Merge "acts: add test case for hotspot"
diff --git a/acts/framework/acts/base_test.py b/acts/framework/acts/base_test.py
index a0002ac..c76d98a 100755
--- a/acts/framework/acts/base_test.py
+++ b/acts/framework/acts/base_test.py
@@ -192,6 +192,7 @@
self.consecutive_failure_limit = self.user_params.get(
'consecutive_failure_limit', -1)
self.size_limit_reached = False
+ self.retryable_exceptions = signals.TestFailure
# Initialize a controller manager (Mobly)
self._controller_manager = controller_manager.ControllerManager(
@@ -514,6 +515,14 @@
begin_time: Logline format timestamp taken when the test started.
"""
+    def on_retry(self):
+        """Hook that runs before a test is retried by get_func_with_retry.
+
+        Invoked as self.on_retry() each time a test is automatically retried.
+        Override it to modify internal test parameters, for example, to
+        retry a test with slightly different input variables.
+        """
+
def _exec_procedure_func(self, func, tr_record):
"""Executes a procedure function like on_pass, on_fail etc.
@@ -647,6 +656,7 @@
Returns: result of the test method
"""
+ exceptions = self.retryable_exceptions
def wrapper(*args, **kwargs):
error_msgs = []
extras = {}
@@ -656,8 +666,9 @@
if retry:
self.teardown_test()
self.setup_test()
+ self.on_retry()
return func(*args, **kwargs)
- except signals.TestFailure as e:
+ except exceptions as e:
retry = True
msg = 'Failure on attempt %d: %s' % (i+1, e.details)
self.log.warning(msg)
@@ -893,6 +904,7 @@
self._block_all_test_cases(tests)
setup_fail = True
except signals.TestAbortClass:
+ self.log.exception('Test class %s aborted' % self.TAG)
setup_fail = True
except Exception as e:
self.log.exception("Failed to setup %s.", self.TAG)
@@ -911,6 +923,7 @@
self.exec_one_testcase(test_name, test_func, self.cli_args)
return self.results
except signals.TestAbortClass:
+ self.log.exception('Test class %s aborted' % self.TAG)
return self.results
except signals.TestAbortAll as e:
# Piggy-back test results on this exception object so we don't lose
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py
index f57d8e8..4bdb924 100755
--- a/acts/framework/acts/controllers/access_point.py
+++ b/acts/framework/acts/controllers/access_point.py
@@ -16,9 +16,7 @@
import collections
import ipaddress
-import logging
import os
-import time
from acts import logger
from acts.controllers.ap_lib import ap_get_interface
@@ -26,7 +24,6 @@
from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import dhcp_server
from acts.controllers.ap_lib import hostapd
-from acts.controllers.ap_lib import hostapd_config
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.utils_lib.commands import ip
from acts.controllers.utils_lib.commands import route
@@ -108,15 +105,14 @@
ssh_settings: The ssh settings being used by the ssh connection.
dhcp_settings: The dhcp server settings being used.
"""
-
def __init__(self, configs):
"""
Args:
configs: configs for the access point from config file.
"""
self.ssh_settings = settings.from_config(configs['ssh_config'])
- self.log = logger.create_logger(lambda msg: '[Access Point|%s] %s' % (
- self.ssh_settings.hostname, msg))
+ self.log = logger.create_logger(lambda msg: '[Access Point|%s] %s' %
+ (self.ssh_settings.hostname, msg))
if 'ap_subnet' in configs:
self._AP_2G_SUBNET_STR = configs['ap_subnet']['2g']
@@ -139,6 +135,8 @@
# A map from network interface name to _ApInstance objects representing
# the hostapd instance running against the interface.
self._aps = dict()
+ self._dhcp = None
+ self._dhcp_bss = dict()
self.bridge = bridge_interface.BridgeInterface(self)
self.interfaces = ap_get_interface.ApInterfaces(self)
@@ -197,8 +195,8 @@
off parameters into the config.
Returns:
- An identifier for the ap being run. This identifier can be used
- later by this controller to control the ap.
+ An identifier for each ssid being started. These identifiers can be
+ used later by this controller to control the ap.
Raises:
Error: When the ap can't be brought up.
@@ -238,12 +236,12 @@
self._aps[interface] = new_instance
# Turn off the DHCP server, we're going to change its settings.
- self._dhcp.stop()
+ self.stop_dhcp()
# Clear all routes to prevent old routes from interfering.
self._route_cmd.clear_routes(net_interface=interface)
if hostapd_config.bss_lookup:
- # The dhcp_bss dictionary is created to hold the key/value
+ # The self._dhcp_bss dictionary is created to hold the key/value
# pair of the interface name and the ip scope that will be
# used for the particular interface. The a, b, c, d
# variables below are the octets for the ip address. The
@@ -251,20 +249,19 @@
# is requested. This part is designed to bring up the
# hostapd interfaces and not the DHCP servers for each
# interface.
- dhcp_bss = {}
+ self._dhcp_bss = dict()
counter = 1
for bss in hostapd_config.bss_lookup:
if interface_mac_orig:
- hostapd_config.bss_lookup[
- bss].bssid = (interface_mac_orig.stdout[:-1]
- + hex(last_octet)[-1:])
+ hostapd_config.bss_lookup[bss].bssid = (
+ interface_mac_orig.stdout[:-1] + hex(last_octet)[-1:])
self._route_cmd.clear_routes(net_interface=str(bss))
if interface is self.wlan_2g:
starting_ip_range = self._AP_2G_SUBNET_STR
else:
starting_ip_range = self._AP_5G_SUBNET_STR
a, b, c, d = starting_ip_range.split('.')
- dhcp_bss[bss] = dhcp_config.Subnet(
+ self._dhcp_bss[bss] = dhcp_config.Subnet(
ipaddress.ip_network('%s.%s.%s.%s' %
(a, b, str(int(c) + counter), d)))
counter = counter + 1
@@ -282,30 +279,112 @@
# hostapd and assigns the DHCP scopes that were defined but
# not used during the hostapd loop above. The k and v
# variables represent the interface name, k, and dhcp info, v.
- for k, v in dhcp_bss.items():
+ for k, v in self._dhcp_bss.items():
bss_interface_ip = ipaddress.ip_interface(
- '%s/%s' % (dhcp_bss[k].router,
- dhcp_bss[k].network.netmask))
+ '%s/%s' % (self._dhcp_bss[k].router,
+ self._dhcp_bss[k].network.netmask))
self._ip_cmd.set_ipv4_address(str(k), bss_interface_ip)
# Restart the DHCP server with our updated list of subnets.
configured_subnets = [x.subnet for x in self._aps.values()]
if hostapd_config.bss_lookup:
- for k, v in dhcp_bss.items():
+ for k, v in self._dhcp_bss.items():
configured_subnets.append(v)
- self._dhcp.start(config=dhcp_config.DhcpConfig(configured_subnets))
+ self.start_dhcp(subnets=configured_subnets)
+ self.start_nat()
- # The following three commands are needed to enable bridging between
+ bss_interfaces = [bss for bss in hostapd_config.bss_lookup]
+ bss_interfaces.append(interface)
+
+ return bss_interfaces
+
+    def start_dhcp(self, subnets):
+        """Bring up the DHCP server so that it serves the given subnets.
+
+        Exposed so that users of the access point object can manage DHCP.
+
+        Args:
+            subnets: A list of Subnets.
+        """
+        server_config = dhcp_config.DhcpConfig(subnets)
+        return self._dhcp.start(config=server_config)
+
+    def stop_dhcp(self):
+        """Shut down the DHCP server that belongs to this AP object.
+
+        Exposed so that users of the access point object can manage DHCP.
+        """
+        dhcp = self._dhcp
+        return dhcp.stop()
+
+ def start_nat(self):
+ """Start NAT on the AP.
+
+ This allows consumers of the access point objects to enable NAT
+ on the AP.
+
+ Note that this is currently a global setting, since we don't
+ have per-interface masquerade rules.
+ """
+ # The following three commands are needed to enable NAT between
# the WAN and LAN/WLAN ports. This means anyone connecting to the
# WLAN/LAN ports will be able to access the internet if the WAN port
# is connected to the internet.
self.ssh.run('iptables -t nat -F')
- self.ssh.run(
- 'iptables -t nat -A POSTROUTING -o %s -j MASQUERADE' % self.wan)
+ self.ssh.run('iptables -t nat -A POSTROUTING -o %s -j MASQUERADE' %
+ self.wan)
self.ssh.run('echo 1 > /proc/sys/net/ipv4/ip_forward')
- return interface
+    def stop_nat(self):
+        """Disable NAT on the AP.
+
+        Exposed so that users of the access point object can turn NAT off.
+
+        This is a global setting for now, because there are no per-interface
+        masquerade rules.
+        """
+        # Flush the nat table first, then switch IPv4 forwarding off.
+        for command in ('iptables -t nat -F',
+                        'echo 0 > /proc/sys/net/ipv4/ip_forward'):
+            self.ssh.run(command)
+
+    def create_bridge(self, bridge_name, interfaces):
+        """Set up a new bridge and attach the given interfaces to it.
+
+        Args:
+            bridge_name: The name of the bridge to create.
+            interfaces: A list of interfaces to add to the bridge.
+        """
+        # The bridge itself is created before any member is attached.
+        create_cmd = 'brctl addbr {bridge_name}'.format(
+            bridge_name=bridge_name)
+        self.ssh.run(create_cmd)
+
+        for member in interfaces:
+            attach_cmd = 'brctl addif {bridge_name} {interface}'.format(
+                bridge_name=bridge_name, interface=member)
+            self.ssh.run(attach_cmd)
+
+    def remove_bridge(self, bridge_name):
+        """Tear down the named bridge if it is present on the AP.
+
+        Args:
+            bridge_name: The name of the bridge to remove.
+        """
+        # The bridge can legitimately be missing: initialization may have
+        # failed, or with both 2.4GHz and 5GHz SSIDs this method is called
+        # once per band and an earlier call already tore the bridge down.
+        show_cmd = 'brctl show {bridge_name}'.format(bridge_name=bridge_name)
+        lookup = self.ssh.run(show_cmd, ignore_status=True)
+
+        # Exit status 0 means the bridge exists; anything else is treated as
+        # "no such bridge" and there is nothing to remove.
+        if lookup.exit_status != 0:
+            return
+
+        # Take the link down first, then delete the bridge.
+        for template in ('ip link set {bridge_name} down',
+                         'brctl delbr {bridge_name}'):
+            self.ssh.run(template.format(bridge_name=bridge_name))
def get_bssid_from_ssid(self, ssid, band):
"""Gets the BSSID from a provided SSID
@@ -350,7 +429,7 @@
instance = self._aps.get(identifier)
instance.hostapd.stop()
- self._dhcp.stop()
+ self.stop_dhcp()
self._ip_cmd.clear_ipv4_addresses(identifier)
# DHCP server needs to refresh in order to tear down the subnet no
@@ -360,7 +439,7 @@
configured_subnets = [x.subnet for x in self._aps.values()]
del self._aps[identifier]
if configured_subnets:
- self._dhcp.start(dhcp_config.DhcpConfig(configured_subnets))
+ self.start_dhcp(subnets=configured_subnets)
def stop_all_aps(self):
"""Stops all running aps on this device."""
@@ -368,7 +447,7 @@
for ap in list(self._aps.keys()):
try:
self.stop_ap(ap)
- except dhcp_server.NoInterfaceError as e:
+ except dhcp_server.NoInterfaceError:
pass
def close(self):
@@ -401,7 +480,7 @@
iface_lan = self.lan
- a, b, c, d = subnet_str.strip('/24').split('.')
+ a, b, c, _ = subnet_str.strip('/24').split('.')
bridge_ip = "%s.%s.%s.%s" % (a, b, c, BRIDGE_IP_LAST)
configs = (iface_wlan, iface_lan, bridge_ip)
@@ -420,10 +499,11 @@
self.ssh.send_file(scapy_path, self.scapy_install_path)
self.ssh.send_file(send_ra_path, self.scapy_install_path)
- scapy = os.path.join(self.scapy_install_path, scapy_path.split('/')[-1])
+ scapy = os.path.join(self.scapy_install_path,
+ scapy_path.split('/')[-1])
- untar_res = self.ssh.run(
- 'tar -xvf %s -C %s' % (scapy, self.scapy_install_path))
+ untar_res = self.ssh.run('tar -xvf %s -C %s' %
+ (scapy, self.scapy_install_path))
instl_res = self.ssh.run(
'cd %s; %s' % (self.scapy_install_path, SCAPY_INSTALL_COMMAND))
@@ -436,8 +516,13 @@
output = self.ssh.run(cmd)
self.scapy_install_path = None
- def send_ra(self, iface, mac=RA_MULTICAST_ADDR, interval=1, count=None,
- lifetime=LIFETIME, rtt=0):
+ def send_ra(self,
+ iface,
+ mac=RA_MULTICAST_ADDR,
+ interval=1,
+ count=None,
+ lifetime=LIFETIME,
+ rtt=0):
"""Invoke scapy and send RA to the device.
Args:
diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py b/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py
index 9bb069a..930d525 100644
--- a/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py
+++ b/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py
@@ -43,7 +43,7 @@
dtim_period=None,
frag_threshold=None,
rts_threshold=None,
- force_wmm = None,
+ force_wmm=None,
beacon_interval=None,
short_preamble=None,
n_capabilities=None,
@@ -110,16 +110,14 @@
if frequency < 5000:
interface = iface_wlan_2g
mode = _get_or_default(mode, hostapd_constants.MODE_11N_MIXED)
- n_capabilities = _get_or_default(
- n_capabilities,
- [
- hostapd_constants.N_CAPABILITY_LDPC,
- hostapd_constants.N_CAPABILITY_SGI20,
- hostapd_constants.N_CAPABILITY_SGI40,
- hostapd_constants.N_CAPABILITY_TX_STBC,
- hostapd_constants.N_CAPABILITY_RX_STBC1,
- hostapd_constants.N_CAPABILITY_DSSS_CCK_40
- ])
+ n_capabilities = _get_or_default(n_capabilities, [
+ hostapd_constants.N_CAPABILITY_LDPC,
+ hostapd_constants.N_CAPABILITY_SGI20,
+ hostapd_constants.N_CAPABILITY_SGI40,
+ hostapd_constants.N_CAPABILITY_TX_STBC,
+ hostapd_constants.N_CAPABILITY_RX_STBC1,
+ hostapd_constants.N_CAPABILITY_DSSS_CCK_40
+ ])
config = hostapd_config.HostapdConfig(
ssid=ssid,
hidden=hidden,
@@ -151,39 +149,32 @@
if not vht_bandwidth:
pass
elif vht_bandwidth >= 40:
- n_capabilities = _get_or_default(
- n_capabilities,
- [
- hostapd_constants.N_CAPABILITY_LDPC,
- extended_channel,
- hostapd_constants.N_CAPABILITY_SGI20,
- hostapd_constants.N_CAPABILITY_SGI40,
- hostapd_constants.N_CAPABILITY_TX_STBC,
- hostapd_constants.N_CAPABILITY_RX_STBC1
- ])
- else:
- n_capabilities = _get_or_default(
- n_capabilities,
- [
- hostapd_constants.N_CAPABILITY_LDPC,
- hostapd_constants.N_CAPABILITY_SGI20,
- hostapd_constants.N_CAPABILITY_SGI40,
- hostapd_constants.N_CAPABILITY_TX_STBC,
- hostapd_constants.N_CAPABILITY_RX_STBC1,
- hostapd_constants.N_CAPABILITY_HT20
- ])
- ac_capabilities = _get_or_default(
- ac_capabilities,
- [
- hostapd_constants.AC_CAPABILITY_MAX_MPDU_11454,
- hostapd_constants.AC_CAPABILITY_RXLDPC,
- hostapd_constants.AC_CAPABILITY_SHORT_GI_80,
- hostapd_constants.AC_CAPABILITY_TX_STBC_2BY1,
- hostapd_constants.AC_CAPABILITY_RX_STBC_1,
- hostapd_constants.AC_CAPABILITY_MAX_A_MPDU_LEN_EXP7,
- hostapd_constants.AC_CAPABILITY_RX_ANTENNA_PATTERN,
- hostapd_constants.AC_CAPABILITY_TX_ANTENNA_PATTERN
+ n_capabilities = _get_or_default(n_capabilities, [
+ hostapd_constants.N_CAPABILITY_LDPC, extended_channel,
+ hostapd_constants.N_CAPABILITY_SGI20,
+ hostapd_constants.N_CAPABILITY_SGI40,
+ hostapd_constants.N_CAPABILITY_TX_STBC,
+ hostapd_constants.N_CAPABILITY_RX_STBC1
])
+ else:
+ n_capabilities = _get_or_default(n_capabilities, [
+ hostapd_constants.N_CAPABILITY_LDPC,
+ hostapd_constants.N_CAPABILITY_SGI20,
+ hostapd_constants.N_CAPABILITY_SGI40,
+ hostapd_constants.N_CAPABILITY_TX_STBC,
+ hostapd_constants.N_CAPABILITY_RX_STBC1,
+ hostapd_constants.N_CAPABILITY_HT20
+ ])
+ ac_capabilities = _get_or_default(ac_capabilities, [
+ hostapd_constants.AC_CAPABILITY_MAX_MPDU_11454,
+ hostapd_constants.AC_CAPABILITY_RXLDPC,
+ hostapd_constants.AC_CAPABILITY_SHORT_GI_80,
+ hostapd_constants.AC_CAPABILITY_TX_STBC_2BY1,
+ hostapd_constants.AC_CAPABILITY_RX_STBC_1,
+ hostapd_constants.AC_CAPABILITY_MAX_A_MPDU_LEN_EXP7,
+ hostapd_constants.AC_CAPABILITY_RX_ANTENNA_PATTERN,
+ hostapd_constants.AC_CAPABILITY_TX_ANTENNA_PATTERN
+ ])
config = hostapd_config.HostapdConfig(
ssid=ssid,
hidden=hidden,
@@ -250,8 +241,13 @@
channel=channel,
ssid=ssid,
security=security)
+ elif profile_name == 'actiontec_mi424wr':
+ config = actiontec.actiontec_mi424wr(iface_wlan_2g=iface_wlan_2g,
+ iface_wlan_5g=iface_wlan_5g,
+ channel=channel,
+ ssid=ssid,
+ security=security)
else:
raise ValueError('Invalid ap model specified (%s)' % profile_name)
return config
-
diff --git a/acts/framework/acts/controllers/ap_lib/third_party_ap_profiles/actiontec.py b/acts/framework/acts/controllers/ap_lib/third_party_ap_profiles/actiontec.py
index 598b2b5..e2452d6 100644
--- a/acts/framework/acts/controllers/ap_lib/third_party_ap_profiles/actiontec.py
+++ b/acts/framework/acts/controllers/ap_lib/third_party_ap_profiles/actiontec.py
@@ -16,6 +16,13 @@
from acts.controllers.ap_lib import hostapd_constants
+def _merge_dicts(*dict_args):
+    """Combine the given dicts into one new dict.
+
+    Arguments are applied left to right, so on a key clash the value from
+    the later argument wins. The inputs themselves are not modified.
+    """
+    merged = {}
+    for mapping in dict_args:
+        merged.update(mapping)
+    return merged
+
+
def actiontec_pk5000(iface_wlan_2g=None,
channel=None,
security=None,
@@ -49,10 +56,10 @@
beacon_interval = 100
dtim_period = 3
# Sets the basic rates and supported rates of the PK5000
- additional_params = {'basic_rates': '10 20 55 110',
- 'supported_rates': '10 20 55 110 60 90 120 180 '
- '240 360 480 540'
- }
+ additional_params = {
+ 'basic_rates': '10 20 55 110',
+ 'supported_rates': '10 20 55 110 60 90 120 180 240 360 480 540'
+ }
if security:
if security.security_mode is hostapd_constants.WPA2:
@@ -65,8 +72,9 @@
additional_params['vendor_elements'] = 'dd0e0050f204104a00011010' \
'44000102'
else:
- raise ValueError('The Actiontec PK5000 only supports WPA2. Invalid '
- 'security mode (%s)' % security.security_mode)
+ raise ValueError(
+ 'The Actiontec PK5000 only supports WPA2. Invalid security '
+ 'mode (%s)' % security.security_mode)
elif security is None:
pass
else:
@@ -88,3 +96,97 @@
return config
+
+def actiontec_mi424wr(iface_wlan_2g=None,
+                      iface_wlan_5g=None,
+                      channel=None,
+                      security=None,
+                      ssid=None):
+    """A simulated implementation of an Actiontec MI424WR AP.
+
+    Args:
+        iface_wlan_2g: The 2.4Ghz interface of the test AP.
+        iface_wlan_5g: The 5Ghz interface of the test AP.
+        channel: What channel to use (2.4Ghz or 5Ghz).
+        security: A security profile.
+        ssid: The network name.
+    Returns:
+        A hostapd config.
+    Raises:
+        ValueError: If an interface is missing or is not a known interface
+            name, or if a security profile is given that is not WPA2 with
+            a CCMP cipher.
+
+    Differences from real MI424WR:
+        ERP Information:
+            MI424WR:
+                Use Protection: Set
+            Simulated:
+                Use Protection: Not Set
+        HT Capabilities:
+            MI424WR: [TX-STBC][DSSS_CCK-40][RX-STBC123]
+            Simulated: [TX-STBC][DSSS_CCK-40][RX-STBC1]
+        HT Information:
+            MI424WR:
+                RIFS: Permitted
+                Reserved (Subset 2): 0x1
+            Simulated:
+                RIFS: Prohibited
+                Reserved (Subset 2): 0x0
+    """
+    # TODO(b/143104825): Permit RIFS once it is supported
+    if not iface_wlan_2g or not iface_wlan_5g:
+        raise ValueError('WLAN interface for 2G and/or 5G is missing.')
+
+    if (iface_wlan_2g not in hostapd_constants.INTERFACE_2G_LIST
+            or iface_wlan_5g not in hostapd_constants.INTERFACE_5G_LIST):
+        raise ValueError('Invalid interface name was passed.')
+
+    rates = {'supported_rates': '10 20 55 110 60 90 120 180 240 360 480 540'}
+
+    # Channels up to 11 go to the 2.4GHz interface, everything else to 5GHz.
+    # NOTE(review): 2.4GHz channels 12-14 would be sent to the 5GHz
+    # interface by this check - confirm that is intended.
+    if channel <= 11:
+        interface = iface_wlan_2g
+        rates['basic_rates'] = '10 20 55 110'
+    else:
+        interface = iface_wlan_5g
+        rates['basic_rates'] = '60 120 240'
+
+    n_capabilities = [
+        hostapd_constants.N_CAPABILITY_TX_STBC,
+        hostapd_constants.N_CAPABILITY_DSSS_CCK_40,
+        hostapd_constants.N_CAPABILITY_RX_STBC1
+    ]
+
+    # Proprietary Atheros Communication: Adv Capability IE
+    # Proprietary Atheros Communication: Unknown IE
+    # Country Info: US Only IE
+    vendor_elements = {
+        'vendor_elements':
+        'dd0900037f01010000ff7f'
+        'dd0a00037f04010000000000'
+        '0706555320010b1b'
+    }
+    additional_params = _merge_dicts(rates, vendor_elements)
+
+    if security:
+        # Compare by value; identity comparison only works by accident when
+        # both sides happen to be the very same constant object.
+        if security.security_mode == hostapd_constants.WPA2:
+            if security.wpa2_cipher != 'CCMP':
+                # The cipher lives directly on the security profile.
+                raise ValueError('The mock Actiontec MI424WR only supports a '
+                                 'WPA2 unicast and multicast cipher of CCMP. '
+                                 'Invalid cipher mode (%s)' %
+                                 security.wpa2_cipher)
+        else:
+            raise ValueError('The mock Actiontec MI424WR only supports WPA2. '
+                             'Invalid security mode (%s)' %
+                             security.security_mode)
+
+    config = hostapd_config.HostapdConfig(
+        ssid=ssid,
+        channel=channel,
+        hidden=False,
+        security=security,
+        interface=interface,
+        mode=hostapd_constants.MODE_11N_MIXED,
+        force_wmm=True,
+        beacon_interval=100,
+        dtim_period=1,
+        short_preamble=True,
+        n_capabilities=n_capabilities,
+        additional_parameters=additional_params)
+
+    return config
diff --git a/acts/framework/acts/controllers/iperf_client.py b/acts/framework/acts/controllers/iperf_client.py
index e2728ea..40c6993 100644
--- a/acts/framework/acts/controllers/iperf_client.py
+++ b/acts/framework/acts/controllers/iperf_client.py
@@ -22,6 +22,7 @@
from acts import context
from acts import utils
from acts.controllers.android_device import AndroidDevice
+from acts.controllers.iperf_server import _AndroidDeviceBridge
from acts.controllers.utils_lib.ssh import connection
from acts.controllers.utils_lib.ssh import settings
from acts.event import event_bus
@@ -172,32 +173,6 @@
return full_out_path
-# TODO(markdr): Remove this after automagic controller creation has been
-# removed.
-class _AndroidDeviceBridge(object):
- """A helper class that bridges the IPerfClientOverAdb to the AndroidDevices.
-
- Using this class, IPerfClientOverAdb can access the AndroidDevices on the
- test
- """
- android_devices = {}
-
- @staticmethod
- @subscribe_static(TestClassBeginEvent)
- def on_test_begin(event):
- for device in getattr(event.test_class, 'android_devices', []):
- _AndroidDeviceBridge.android_devices[device.serial] = device
-
- @staticmethod
- @subscribe_static(TestClassEndEvent)
- def on_test_end(_):
- _AndroidDeviceBridge.android_devices = {}
-
-
-event_bus.register_subscription(_AndroidDeviceBridge.on_test_begin.subscription)
-event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription)
-
-
class IPerfClientOverAdb(IPerfClientBase):
"""Class that handles iperf3 operations over ADB devices."""
@@ -217,7 +192,7 @@
if isinstance(self._android_device_or_serial, AndroidDevice):
return self._android_device_or_serial
else:
- return _AndroidDeviceBridge.android_devices[
+ return _AndroidDeviceBridge.android_devices()[
self._android_device_or_serial]
def start(self, ip, iperf_args, tag, timeout=3600):
diff --git a/acts/framework/acts/controllers/iperf_server.py b/acts/framework/acts/controllers/iperf_server.py
index d6022d6..bedc05a 100755
--- a/acts/framework/acts/controllers/iperf_server.py
+++ b/acts/framework/acts/controllers/iperf_server.py
@@ -439,20 +439,27 @@
class _AndroidDeviceBridge(object):
"""A helper class for connecting serial numbers to AndroidDevices."""
- # A dict of serial -> AndroidDevice, where AndroidDevice is a device found
- # in the current TestClass's controllers.
- android_devices = {}
+ _test_class = None
@staticmethod
@subscribe_static(TestClassBeginEvent)
def on_test_begin(event):
- for device in getattr(event.test_class, 'android_devices', []):
- _AndroidDeviceBridge.android_devices[device.serial] = device
+ _AndroidDeviceBridge._test_class = event.test_class
@staticmethod
@subscribe_static(TestClassEndEvent)
def on_test_end(_):
- _AndroidDeviceBridge.android_devices = {}
+ _AndroidDeviceBridge._test_class = None
+
+    @staticmethod
+    def android_devices():
+        """Return a dict of serial -> AndroidDevice for the active TestClass.
+
+        The dict is empty when no TestClass is active, or when the active
+        TestClass has no android_devices attribute.
+        """
+        test_class = _AndroidDeviceBridge._test_class
+        # Tolerate test classes without an android_devices attribute, as the
+        # previous getattr(..., 'android_devices', []) based lookup did.
+        devices = getattr(test_class, 'android_devices', None) or []
+        return {device.serial: device for device in devices}
event_bus.register_subscription(
@@ -493,7 +500,7 @@
if isinstance(self._android_device_or_serial, AndroidDevice):
return self._android_device_or_serial
else:
- return _AndroidDeviceBridge.android_devices[
+ return _AndroidDeviceBridge.android_devices()[
self._android_device_or_serial]
def _get_device_log_path(self):
diff --git a/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/packet.py b/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/packet.py
index f62975d..2233370 100644
--- a/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/packet.py
+++ b/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/packet.py
@@ -53,8 +53,8 @@
1 │ 2 │ uint16 │ Main │ Fine │ Calibration/Measurement value
2 │ 4 │ uint16 │ USB │ Coarse │ Calibration/Measurement value
3 │ 6 │ uint16 │ USB │ Fine │ Calibration/Measurement value
- 4 │ 8 │ int16 │ Aux │ Coarse │ Calibration/Measurement value
- 5 │ 10 │ int16 │ Aux │ Fine │ Calibration/Measurement value
+ 4 │ 8 │ uint16 │ Aux │ Coarse │ Calibration/Measurement value
+ 5 │ 10 │ uint16 │ Aux │ Fine │ Calibration/Measurement value
6 │ 12 │ uint16 │ Main │ Voltage │ Main V measurement, or Aux V
│ │ │ │ │ if setVoltageChannel == 1
7 │ 14 │ uint16 │ USB │ Voltage │ USB Voltage
@@ -76,7 +76,7 @@
SIZE = 18
def __init__(self, raw_data, sample_time):
- self.values = struct.unpack('>4H2h2H2B', raw_data)
+ self.values = struct.unpack('>8H2B', raw_data)
self._sample_time = sample_time
def __getitem__(self, channel_and_reading_granularity):
diff --git a/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/transformers.py b/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/transformers.py
index e91a89b..5ddc23c 100644
--- a/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/transformers.py
+++ b/acts/framework/acts/controllers/monsoon_lib/sampling/hvpm/transformers.py
@@ -438,10 +438,10 @@
zero_offset += cal_zero
if cal_ref - zero_offset != 0:
slope = scale / (cal_ref - zero_offset)
- if granularity == Granularity.FINE:
- slope /= 1000
else:
slope = 0
+ if granularity == Granularity.FINE:
+ slope /= 1000
index = HvpmMeasurement.get_index(channel, granularity)
calibrated_value[:, granularity] = slope * (
@@ -452,7 +452,7 @@
readings[:, channel] = np.where(
measurements[:, fine_data_position] < self.fine_threshold,
calibrated_value[:, Granularity.FINE],
- calibrated_value[:, Granularity.COARSE])
+ calibrated_value[:, Granularity.COARSE]) / 1000.0 # to mA
main_voltage_index = HvpmMeasurement.get_index(Channel.MAIN,
Reading.VOLTAGE)
diff --git a/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/packet.py b/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/packet.py
index 80c8274..b0f8839 100644
--- a/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/packet.py
+++ b/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/packet.py
@@ -54,9 +54,9 @@
Val │ Byte │ Type │ Monsoon │ Reading │
Pos │ Offset │ Format │ Channel │ Type │ Description
────┼────────┼────────┼─────────┼─────────┼──────────────────────────────
- 0 │ 0 │ uint16 │ Main │ Current │ Calibration value.
- 1 │ 2 │ uint16 │ USB │ Current │ Calibration value.
- 2 │ 4 │ uint16 │ Aux │ Current │ Calibration value.
+ 0 │ 0 │ int16 │ Main │ Current │ Calibration value.
+ 1 │ 2 │ int16 │ USB │ Current │ Calibration value.
+ 2 │ 4 │ int16 │ Aux │ Current │ Calibration value.
3 │ 6 │ uint16 │ Main │ Voltage │ Calibration value.
If the measurement is a power reading:
@@ -64,11 +64,11 @@
Val │ Byte │ Type │ Monsoon │ Reading │
Pos │ Offset │ Format │ Channel │ Type │ Description
────┼────────┼────────┼─────────┼─────────┼──────────────────────────────
- 0 │ 0 │ uint16 │ Main │ Current │ b0: if 1, Coarse, else Fine
+ 0 │ 0 │ int16 │ Main │ Current │ b0: if 1, Coarse, else Fine
│ │ │ │ │ b1-7: Measurement value.
- 1 │ 2 │ uint16 │ USB │ Current │ b0: if 1, Coarse, else Fine
+ 1 │ 2 │ int16 │ USB │ Current │ b0: if 1, Coarse, else Fine
│ │ │ │ │ b1-7: Measurement value.
- 2 │ 4 │ uint16 │ Aux │ Current │ b0: if 1, Coarse, else Fine
+ 2 │ 4 │ int16 │ Aux │ Current │ b0: if 1, Coarse, else Fine
│ │ │ │ │ b1-7: Measurement value.
3 │ 6 │ uint16 │ Main │ Voltage │ Measurement value.
@@ -86,7 +86,7 @@
sample_type: The type of sample that was recorded.
entry_index: The index of the measurement within the packet.
"""
- self.values = struct.unpack('>4H', raw_data)
+ self.values = struct.unpack('>3hH', raw_data)
self._sample_time = sample_time
self._sample_type = sample_type
diff --git a/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/stock_transformers.py b/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/stock_transformers.py
index eaf60b0..becc4ee 100644
--- a/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/stock_transformers.py
+++ b/acts/framework/acts/controllers/monsoon_lib/sampling/lvpm_stock/stock_transformers.py
@@ -340,32 +340,6 @@
return False
return True
- @staticmethod
- def _get_currents(sample, calibration_data):
- """Returns the list of current values for each channel.
-
- Args:
- sample: The Sample object to determine the current values of.
- calibration_data: The CalibrationCollection used to calibrate the
- sample.
-
- Returns:
-
- """
- currents = [0] * 3
- for channel in Channel.values:
- current = sample[channel]
- granularity = Granularity.FINE
- if current & 1:
- current &= ~1
- granularity = Granularity.COARSE
-
- zero = calibration_data.get(channel, Origin.ZERO, granularity)
- scale = calibration_data.get(channel, Origin.SCALE, granularity)
- currents[channel] = (current - zero) * scale
-
- return currents
-
def _transform_buffer(self, buffer):
calibration_data = buffer.calibration_data
@@ -393,7 +367,7 @@
# Monsoon.py algorithm.
readings[:, channel] = np.where(
measurements[:, channel] & 1,
- (measurements[:, channel] - 1 - coarse_zero) * coarse_scale,
+ ((measurements[:, channel] & ~1) - coarse_zero) * coarse_scale,
(measurements[:, channel] - fine_zero) * fine_scale)
for i in range(len(buffer.samples)):
diff --git a/acts/framework/acts/libs/uicd/uicd_cli.py b/acts/framework/acts/libs/uicd/uicd_cli.py
index b542cf0..8388551 100644
--- a/acts/framework/acts/libs/uicd/uicd_cli.py
+++ b/acts/framework/acts/libs/uicd/uicd_cli.py
@@ -45,7 +45,7 @@
containing them.
log_path: Directory for storing logs generated by Uicd.
"""
- self._uicd_zip = uicd_zip
+ self._uicd_zip = uicd_zip[0] if isinstance(uicd_zip, list) else uicd_zip
self._uicd_path = tempfile.mkdtemp(prefix='uicd')
self._log_path = log_path
if self._log_path:
diff --git a/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py b/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
index 669ab0c..bf4d549 100644
--- a/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
+++ b/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
@@ -100,7 +100,7 @@
class PixelBudsBluetoothHandsfreeAbstractDevice(
- BluetoothHandsfreeAbstractDevice):
+ BluetoothHandsfreeAbstractDevice):
CMD_EVENT = 'EvtHex'
@@ -250,6 +250,62 @@
return self.jaybird_controller.press_volume_up()
+class AndroidHeadsetBluetoothHandsfreeAbstractDevice(
+        BluetoothHandsfreeAbstractDevice):
+    """Handsfree device abstraction driven through an Android controller.
+
+    Each operation is forwarded to a call on the controller's `droid` object.
+    """
+
+    def __init__(self, ad_controller):
+        # Controller whose `droid` attribute receives every call below.
+        self.ad_controller = ad_controller
+
+    @property
+    def mac_address(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothGetLocalAddress()
+
+    def accept_call(self):
+        droid = self.ad_controller.droid
+        return droid.telecomAcceptRingingCall(None)
+
+    def end_call(self):
+        droid = self.ad_controller.droid
+        return droid.telecomEndCall()
+
+    def enter_pairing_mode(self):
+        droid = self.ad_controller.droid
+        # Start the pairing helper before making the device discoverable.
+        droid.bluetoothStartPairingHelper(True)
+        return droid.bluetoothMakeDiscoverable()
+
+    def next_track(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothMediaPassthrough("skipNext")
+
+    def pause(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothMediaPassthrough("pause")
+
+    def play(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothMediaPassthrough("play")
+
+    def power_off(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothToggleState(False)
+
+    def power_on(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothToggleState(True)
+
+    def previous_track(self):
+        droid = self.ad_controller.droid
+        return droid.bluetoothMediaPassthrough("skipPrev")
+
+    def reject_call(self):
+        droid = self.ad_controller.droid
+        # Disconnect the first call id reported by the device.
+        call_ids = droid.telecomCallGetCallIds()
+        return droid.telecomCallDisconnect(call_ids[0])
+
+    def volume_down(self):
+        droid = self.ad_controller.droid
+        # One step down, but never below zero.
+        lowered = max(droid.getMediaVolume() - 1, 0)
+        return droid.setMediaVolume(lowered)
+
+    def volume_up(self):
+        droid = self.ad_controller.droid
+        # One step up, capped at the maximum the device reports.
+        raised = droid.getMediaVolume() + 1
+        ceiling = droid.getMaxMediaVolume()
+        return droid.setMediaVolume(min(raised, ceiling))
+
+
class BluetoothHandsfreeAbstractDeviceFactory:
"""Generates a BluetoothHandsfreeAbstractDevice for any device controller.
"""
@@ -257,7 +313,8 @@
_controller_abstract_devices = {
'EarstudioReceiver': EarstudioReceiverBluetoothHandsfreeAbstractDevice,
'JaybirdX3Earbuds': JaybirdX3EarbudsBluetoothHandsfreeAbstractDevice,
- 'ParentDevice': PixelBudsBluetoothHandsfreeAbstractDevice
+ 'ParentDevice': PixelBudsBluetoothHandsfreeAbstractDevice,
+ 'AndroidDevice': AndroidHeadsetBluetoothHandsfreeAbstractDevice
}
def generate(self, controller):
diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py
index 0409d91..57992eb 100644
--- a/acts/framework/acts/test_utils/bt/bt_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py
@@ -1603,4 +1603,3 @@
log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
return False
return True
-
diff --git a/acts/framework/acts/test_utils/bt/pts/fuchsia_pts_ics_lib.py b/acts/framework/acts/test_utils/bt/pts/fuchsia_pts_ics_lib.py
index 5ea32c1..f2f9b2c 100644
--- a/acts/framework/acts/test_utils/bt/pts/fuchsia_pts_ics_lib.py
+++ b/acts/framework/acts/test_utils/bt/pts/fuchsia_pts_ics_lib.py
@@ -108,6 +108,56 @@
b'TSPC_A2DP_7a_3': b'FALSE',
b'TSPC_A2DP_7b_1': b'FALSE',
b'TSPC_A2DP_7b_2': b'FALSE',
+
+ # Not available in Launch Studio Yet
+ b'TSPC_A2DP_10_1': b'FALSE',
+ b'TSPC_A2DP_10_2': b'FALSE',
+ b'TSPC_A2DP_10_3': b'FALSE',
+ b'TSPC_A2DP_10_4': b'FALSE',
+ b'TSPC_A2DP_10_5': b'FALSE',
+ b'TSPC_A2DP_10_6': b'FALSE',
+ b'TSPC_A2DP_11_1': b'FALSE',
+ b'TSPC_A2DP_11_2': b'FALSE',
+ b'TSPC_A2DP_11_3': b'FALSE',
+ b'TSPC_A2DP_11_4': b'FALSE',
+ b'TSPC_A2DP_11_5': b'FALSE',
+ b'TSPC_A2DP_11_6': b'FALSE',
+ b'TSPC_A2DP_12_2': b'FALSE',
+ b'TSPC_A2DP_12_3': b'FALSE',
+ b'TSPC_A2DP_12_3': b'FALSE',
+ b'TSPC_A2DP_12_4': b'FALSE',
+ b'TSPC_A2DP_13_1': b'FALSE',
+ b'TSPC_A2DP_13_2': b'FALSE',
+ b'TSPC_A2DP_13_3': b'FALSE',
+ b'TSPC_A2DP_13_4': b'FALSE',
+ b'TSPC_A2DP_14_1': b'FALSE',
+ b'TSPC_A2DP_14_2': b'FALSE',
+ b'TSPC_A2DP_14_3': b'FALSE',
+ b'TSPC_A2DP_14_4': b'FALSE',
+ b'TSPC_A2DP_14_5': b'FALSE',
+ b'TSPC_A2DP_15_1': b'FALSE',
+ b'TSPC_A2DP_15_2': b'FALSE',
+ b'TSPC_A2DP_15_3': b'FALSE',
+ b'TSPC_A2DP_15_4': b'FALSE',
+ b'TSPC_A2DP_15_5': b'FALSE',
+ b'TSPC_A2DP_15_6': b'FALSE',
+ b'TSPC_A2DP_3_2a': b'FALSE',
+ b'TSPC_A2DP_3_2b': b'FALSE',
+ b'TSPC_A2DP_3_2c': b'FALSE',
+ b'TSPC_A2DP_3_2d': b'FALSE',
+ b'TSPC_A2DP_3_2e': b'FALSE',
+ b'TSPC_A2DP_3_2f': b'FALSE',
+ b'TSPC_A2DP_5_2a': b'FALSE',
+ b'TSPC_A2DP_5_2b': b'FALSE',
+ b'TSPC_A2DP_5_2c': b'FALSE',
+ b'TSPC_A2DP_8_2': b'FALSE',
+ b'TSPC_A2DP_8_3': b'FALSE',
+ b'TSPC_A2DP_8_4': b'FALSE',
+ b'TSPC_A2DP_9_1': b'FALSE',
+ b'TSPC_A2DP_9_2': b'FALSE',
+ b'TSPC_A2DP_9_3': b'FALSE',
+ b'TSPC_A2DP_9_4': b'FALSE',
+
}
diff --git a/acts/framework/acts/test_utils/bt/pts/pts_base_class.py b/acts/framework/acts/test_utils/bt/pts/pts_base_class.py
index 33fa81b..b2c48d6 100644
--- a/acts/framework/acts/test_utils/bt/pts/pts_base_class.py
+++ b/acts/framework/acts/test_utils/bt/pts/pts_base_class.py
@@ -121,8 +121,6 @@
# TODO: Implement MMIs as necessary
}
}
-
- self.pts.setup_pts()
self.pts.bind_to(self.process_next_action)
def teardown_class(self):
@@ -169,7 +167,7 @@
return _safe_wrap_test_case
- def process_next_action(self, profile, action):
+ def process_next_action(self, action):
func = self.pts_action_mapping.get(
self.pts.pts_profile_mmi_request).get(action, "Nothing")
if func is not 'Nothing':
diff --git a/acts/framework/acts/test_utils/bt/simulated_carkit_device.py b/acts/framework/acts/test_utils/bt/simulated_carkit_device.py
index 7279fe6..84fcc5e 100644
--- a/acts/framework/acts/test_utils/bt/simulated_carkit_device.py
+++ b/acts/framework/acts/test_utils/bt/simulated_carkit_device.py
@@ -19,6 +19,10 @@
from acts.controllers import android_device
from acts.test_utils.bt.bt_test_utils import bluetooth_enabled_check
+# TODO: This class is to be deprecated in favor of
+# ../acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
+
+
class SimulatedCarkitDevice():
def __init__(self, serial):
self.ad = android_device.create(serial)[0]
diff --git a/acts/framework/acts/test_utils/gnss/gnss_test_utils.py b/acts/framework/acts/test_utils/gnss/gnss_test_utils.py
index 3b4ed38..dba308c 100644
--- a/acts/framework/acts/test_utils/gnss/gnss_test_utils.py
+++ b/acts/framework/acts/test_utils/gnss/gnss_test_utils.py
@@ -99,6 +99,7 @@
ad.adb.shell("echo DEBUG_LEVEL = 5 >> /vendor/etc/gps.conf")
ad.adb.shell("echo %r >> /data/local.prop" % LOCAL_PROP_FILE_CONTENTS)
ad.adb.shell("chmod 644 /data/local.prop")
+ ad.adb.shell("setprop persist.logd.logpersistd.size 20000")
ad.adb.shell("setprop persist.logd.size 16777216")
ad.adb.shell("setprop persist.vendor.radio.adb_log_on 1")
ad.adb.shell("setprop persist.logd.logpersistd logcatd")
@@ -289,7 +290,8 @@
shutil.make_archive(gnss_log_path, "zip", gnss_log_path)
shutil.rmtree(gnss_log_path)
output_path = os.path.join(DEFAULT_QXDM_LOG_PATH, "logs/.")
- file_count = ad.adb.shell("find %s -type f -iname *.qmdl | wc -l" % output_path)
+ file_count = ad.adb.shell(
+ "find %s -type f -iname *.qmdl | wc -l" % output_path)
if not int(file_count) == 0:
qxdm_log_name = "QXDM_%s_%s" % (ad.model, ad.serial)
qxdm_log_path = os.path.join(log_path, qxdm_log_name)
@@ -333,7 +335,8 @@
ad.log.error("Mobile data is at unknown state and set to %d" % out)
def gnss_trigger_modem_ssr(ad, dwelltime=60):
- """Trigger modem SSR crash and verify if modem crash and recover successfully.
+ """Trigger a modem SSR crash and verify that the modem crashes and
+ recovers successfully.
Args:
ad: An AndroidDevice object.
@@ -535,7 +538,8 @@
for i in range(retries):
begin_time = get_current_epoch_time()
clear_aiding_data_by_gtw_gpstool(ad)
- ad.log.info("Start %s on GTW_GPSTool - attempt %d" % (type.upper(), i+1))
+ ad.log.info("Start %s on GTW_GPSTool - attempt %d" % (type.upper(),
+ i+1))
start_gnss_by_gtw_gpstool(ad, True, type)
for _ in range(10 + criteria):
logcat_results = ad.search_logcat("First fixed", begin_time)
@@ -547,16 +551,16 @@
if (first_fixed/1000) <= criteria:
return True
start_gnss_by_gtw_gpstool(ad, False, type)
- raise signals.TestFailure("Fail to get %s location fixed within "
- "%d seconds criteria." %
- (type.upper(), criteria))
+ raise signals.TestFailure("Fail to get %s location fixed "
+ "within %d seconds criteria."
+ % (type.upper(), criteria))
time.sleep(1)
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
check_currrent_focus_app(ad)
start_gnss_by_gtw_gpstool(ad, False, type)
- raise signals.TestFailure("Fail to get %s location fixed within %d attempts."
- % (type.upper(), retries))
+ raise signals.TestFailure("Fail to get %s location fixed within %d "
+ "attempts." % (type.upper(), retries))
def start_ttff_by_gtw_gpstool(ad, ttff_mode, iteration):
"""Identify which TTFF mode for different test items.
@@ -581,9 +585,9 @@
begin_time):
ad.log.info("Send TTFF start_test_action successfully.")
break
- if i == 3:
- check_currrent_focus_app(ad)
- raise signals.TestFailure("Fail to send TTFF start_test_action.")
+ else:
+ check_currrent_focus_app(ad)
+ raise signals.TestFailure("Fail to send TTFF start_test_action.")
def gnss_tracking_via_gtw_gpstool(ad, criteria, type="gnss", testtime=60):
"""Start GNSS/FLP tracking tests for input testtime on GTW_GPSTool.
@@ -624,6 +628,7 @@
track_data = {}
history_top4_cn = 0
history_cn = 0
+ l5flag = "false"
file_count = int(ad.adb.shell("find %s -type f -iname *.txt | wc -l"
% GNSSSTATUS_LOG_PATH))
if file_count != 1:
@@ -780,7 +785,8 @@
elif any(float(ttff_data[key].ttff_sec) == 0.0 for key in ttff_data.keys()):
ad.log.error("One or more TTFF %s Timeout" % ttff_mode)
return False
- elif any(float(ttff_data[key].ttff_sec) >= criteria for key in ttff_data.keys()):
+ elif any(float(ttff_data[key].ttff_sec) >= criteria for key in
+ ttff_data.keys()):
ad.log.error("One or more TTFF %s are over test criteria %d seconds"
% (ttff_mode, criteria))
return False
@@ -802,7 +808,10 @@
pe_list = [float(ttff_data[key].ttff_pe) for key in ttff_data.keys()]
cn_list = [float(ttff_data[key].ttff_cn) for key in ttff_data.keys()]
timeoutcount = sec_list.count(0.0)
- avgttff = sum(sec_list)/(len(sec_list) - timeoutcount)
+ if len(sec_list) == timeoutcount:
+ avgttff = 9527
+ else:
+ avgttff = sum(sec_list)/(len(sec_list) - timeoutcount)
if timeoutcount != 0:
maxttff = 9527
else:
@@ -865,7 +874,8 @@
ad: An AndroidDevice object.
"""
time.sleep(1)
- current = ad.adb.shell("dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'")
+ current = ad.adb.shell(
+ "dumpsys window | grep -E 'mCurrentFocus|mFocusedApp'")
ad.log.debug("\n"+current)
def check_location_api(ad, retries):
@@ -887,7 +897,8 @@
logcat_results = ad.search_logcat("REPORT_LOCATION", begin_time)
if logcat_results:
ad.log.info("%s" % logcat_results[-1]["log_message"])
- ad.log.info("GnssLocationProvider reports location successfully.")
+ ad.log.info("GnssLocationProvider reports location "
+ "successfully.")
return True
if not ad.is_adb_logcat_on:
ad.start_adb_logcat()
@@ -910,10 +921,12 @@
time.sleep(1)
begin_time = get_current_epoch_time()
ad.log.info("Try to get NLP status - attempt %d" % (i+1))
- ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode nlp")
+ ad.adb.shell(
+ "am start -S -n com.android.gpstool/.GPSTool --es mode nlp")
while get_current_epoch_time() - begin_time <= 30000:
- logcat_results = ad.search_logcat(
- "LocationManagerService: incoming location: Location", begin_time)
+ logcat_results = ad.search_logcat("LocationManagerService: "
+ "incoming location: Location",
+ begin_time)
if logcat_results:
for logcat_result in logcat_results:
if location_type in logcat_result["log_message"]:
@@ -935,7 +948,8 @@
atten_value: attenuation value
"""
try:
- ad.log.info("Set attenuation value to \"%d\" for GNSS signal." % atten_value)
+ ad.log.info(
+ "Set attenuation value to \"%d\" for GNSS signal." % atten_value)
attenuator[0].set_atten(atten_value)
except Exception as e:
ad.log.error(e)
@@ -990,7 +1004,8 @@
ad.log.info("Open an youtube video - attempt %d" % (i+1))
ad.adb.shell("am start -a android.intent.action.VIEW -d \"%s\"" % url)
time.sleep(2)
- out = ad.adb.shell("dumpsys activity | grep NewVersionAvailableActivity")
+ out = ad.adb.shell(
+ "dumpsys activity | grep NewVersionAvailableActivity")
if out:
ad.log.info("Skip Youtube New Version Update.")
ad.send_keycode("BACK")
@@ -1011,12 +1026,72 @@
"""
try:
baseband_version = ad.adb.getprop("gsm.version.baseband")
- gms_version = ad.adb.shell("dumpsys package com.google.android.gms | "
- "grep versionName").split("\n")[0].split("=")[1]
+ gms_version = ad.adb.shell(
+ "dumpsys package com.google.android.gms | grep versionName"
+ ).split("\n")[0].split("=")[1]
+ mpss_version = ad.adb.shell("cat /sys/devices/soc0/images | grep MPSS "
+ "| cut -d ':' -f 3")
if not extra_msg:
ad.log.info("TestResult Baseband_Version %s" % baseband_version)
- ad.log.info("TestResult GMS_Version %s" % gms_version.replace(" ", ""))
+ ad.log.info(
+ "TestResult GMS_Version %s" % gms_version.replace(" ", ""))
+ ad.log.info("TestResult MPSS_Version %s" % mpss_version)
else:
- ad.log.info("%s, Baseband_Version = %s" % (extra_msg, baseband_version))
+ ad.log.info(
+ "%s, Baseband_Version = %s" % (extra_msg, baseband_version))
except Exception as e:
- ad.log.error(e)
\ No newline at end of file
+ ad.log.error(e)
+
+def start_toggle_gnss_by_gtw_gpstool(ad, iteration):
+ """Send toggle gnss off/on start_test_action
+
+ Args:
+ ad: An AndroidDevice object.
+ iteration: Iteration of toggle gnss off/on cycles.
+ """
+ msg_list = []
+ begin_time = get_current_epoch_time()
+ try:
+ for i in range(1, 4):
+ ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool "
+ "--es mode toggle --es cycle %d" % iteration)
+ time.sleep(1)
+ if ad.search_logcat("cmp=com.android.gpstool/.ToggleGPS",
+ begin_time):
+ ad.log.info("Send ToggleGPS start_test_action successfully.")
+ break
+ else:
+ check_currrent_focus_app(ad)
+ raise signals.TestFailure("Fail to send ToggleGPS "
+ "start_test_action within 3 attempts.")
+ time.sleep(2)
+ test_start = ad.search_logcat("GPSTool_ToggleGPS: startService",
+ begin_time)
+ if test_start:
+ ad.log.info(test_start[-1]["log_message"].split(":")[-1].strip())
+ else:
+ raise signals.TestFailure("Fail to start toggle GPS off/on test.")
+ # Every iteration is expected to finish within 4 minutes.
+ while get_current_epoch_time() - begin_time <= iteration * 240000:
+ crash_end = ad.search_logcat("Force finishing activity "
+ "com.android.gpstool/.GPSTool",
+ begin_time)
+ if crash_end:
+ raise signals.TestFailure("GPSTool crashed. Abort test.")
+ toggle_results = ad.search_logcat("GPSTool : msg", begin_time)
+ if toggle_results:
+ for toggle_result in toggle_results:
+ msg = toggle_result["log_message"]
+ if not msg in msg_list:
+ ad.log.info(msg.split(":")[-1].strip())
+ msg_list.append(msg)
+ if "timeout" in msg:
+ raise signals.TestFailure("Fail to get location fixed "
+ "within 60 seconds.")
+ if "Test end" in msg:
+ raise signals.TestPass("Completed quick toggle GNSS "
+ "off/on test.")
+ raise signals.TestFailure("Fail to finish toggle GPS off/on test "
+ "within %d minutes" % (iteration * 4))
+ finally:
+ ad.send_keycode("HOME")
diff --git a/acts/framework/acts/test_utils/instrumentation/adb_commands/common.py b/acts/framework/acts/test_utils/instrumentation/adb_commands/common.py
index 2991600..ffd43dd 100644
--- a/acts/framework/acts/test_utils/instrumentation/adb_commands/common.py
+++ b/acts/framework/acts/test_utils/instrumentation/adb_commands/common.py
@@ -62,6 +62,11 @@
nfc = DeviceState('svc nfc', 'enable', 'disable')
+# Calling
+
+disable_dialing = DeviceSetprop('ro.telephony.disable-call', 'true', 'false')
+
+
# Screen
screen_adaptive_brightness = DeviceSetting(
@@ -115,3 +120,12 @@
disable_doze = 'dumpsys deviceidle disable'
+
+# Miscellaneous
+
+test_harness = DeviceBinaryCommandSeries(
+ [
+ DeviceSetprop('ro.monkey'),
+ DeviceSetprop('ro.test_harness')
+ ]
+)
diff --git a/acts/framework/acts/test_utils/net/NetstackBaseTest.py b/acts/framework/acts/test_utils/net/NetstackBaseTest.py
new file mode 100755
index 0000000..a59a2e0
--- /dev/null
+++ b/acts/framework/acts/test_utils/net/NetstackBaseTest.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+from acts.base_test import BaseTestClass
+from acts import asserts
+
+
+class NetstackBaseTest(BaseTestClass):
+ def __init__(self, controllers):
+ BaseTestClass.__init__(self, controllers)
diff --git a/acts/framework/acts/test_utils/net/connectivity_const.py b/acts/framework/acts/test_utils/net/connectivity_const.py
index f4865ab..946d2c6 100644
--- a/acts/framework/acts/test_utils/net/connectivity_const.py
+++ b/acts/framework/acts/test_utils/net/connectivity_const.py
@@ -64,6 +64,12 @@
MULTIPATH_PREFERENCE_RELIABILITY = 1 << 1
MULTIPATH_PREFERENCE_PERFORMANCE = 1 << 2
+# Private DNS constants
+DNS_GOOGLE = "dns.google"
+PRIVATE_DNS_MODE_OFF = "off"
+PRIVATE_DNS_MODE_OPPORTUNISTIC = "opportunistic"
+PRIVATE_DNS_MODE_STRICT = "hostname"
+
# IpSec constants
SOCK_STREAM = 1
SOCK_DGRAM = 2
diff --git a/acts/framework/acts/test_utils/net/connectivity_test_utils.py b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
index 02167c9..4b7668c 100644
--- a/acts/framework/acts/test_utils/net/connectivity_test_utils.py
+++ b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
@@ -14,6 +14,7 @@
# limitations under the License.
from acts import asserts
+from acts.test_utils.net import connectivity_const as cconst
def start_natt_keepalive(ad, src_ip, src_port, dst_ip, interval = 10):
""" Start NAT-T keep alive on dut """
@@ -61,3 +62,17 @@
ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
return status
+
+def set_private_dns(ad, dns_mode, hostname=None):
+ """ Set private DNS mode on dut """
+ if dns_mode == cconst.PRIVATE_DNS_MODE_OFF:
+ ad.droid.setPrivateDnsMode(False)
+ else:
+ ad.droid.setPrivateDnsMode(True, hostname)
+
+ mode = ad.droid.getPrivateDnsMode()
+ host = ad.droid.getPrivateDnsSpecifier()
+ ad.log.info("DNS mode is %s and DNS server is %s" % (mode, host))
+ asserts.assert_true(dns_mode == mode and host == hostname,
+ "Failed to set DNS mode to %s and DNS to %s" % \
+ (dns_mode, hostname))
diff --git a/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py b/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py
index 7af1e46..14c9597 100644
--- a/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py
@@ -15,6 +15,7 @@
# limitations under the License.
import logging
+import time
import acts.test_utils.power.PowerBaseTest as PBT
@@ -26,15 +27,16 @@
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.plotting import figure, output_file, save
+LOGTIME_RETRY_COUNT = 3
+RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
+RECOVER_MONSOON_RETRY_COUNT = 3
+MONSOON_RETRY_INTERVAL = 300
class PowerGnssBaseTest(PBT.PowerBaseTest):
"""
Base Class for all GNSS Power related tests
"""
- def __init__(self, controllers):
- base_test.BaseTestClass.__init__(self, controllers)
-
def collect_power_data(self):
"""Measure power and plot.
@@ -119,3 +121,30 @@
# Layout the plot and the datatable bar
l = layout([[dt], [plot]])
save(l)
+
+ def disconnect_usb(self, ad, sleeptime):
+ """Disconnect usb while device is on sleep and
+ connect the usb again once the sleep time completes
+
+ sleeptime: time in seconds the dut stays disconnected from usb
+ """
+ self.dut.adb.shell(RESET_BATTERY_STATS)
+ time.sleep(1)
+ for _ in range(LOGTIME_RETRY_COUNT):
+ self.mon_info.dut.disconnect_dut()
+ if not ad.is_connected():
+ time.sleep(sleeptime)
+ self.mon_info.dut.reconnect_dut()
+ break
+ else:
+ self.log.error('Test failed after maximum retry')
+ for _ in range(RECOVER_MONSOON_RETRY_COUNT):
+ if self.monsoon_recover():
+ break
+ else:
+ self.log.warning(
+ 'Wait for {} second then try again'.format(
+ MONSOON_RETRY_INTERVAL))
+ time.sleep(MONSOON_RETRY_INTERVAL)
+ else:
+ self.log.error('Failed to recover monsoon')
diff --git a/acts/framework/acts/test_utils/tel/TelephonyBaseTest.py b/acts/framework/acts/test_utils/tel/TelephonyBaseTest.py
index b4b40ed..6a0895d 100644
--- a/acts/framework/acts/test_utils/tel/TelephonyBaseTest.py
+++ b/acts/framework/acts/test_utils/tel/TelephonyBaseTest.py
@@ -167,6 +167,7 @@
"enable_radio_log_on", False)
self.cbrs_esim = self.user_params.get("cbrs_esim", False)
self.account_util = self.user_params.get("account_util", None)
+ self.save_passing_logs = self.user_params.get("save_passing_logs", False)
if isinstance(self.account_util, list):
self.account_util = self.account_util[0]
self.fi_util = self.user_params.get("fi_util", None)
@@ -189,7 +190,8 @@
"number_of_sims":2
}
break
-
+ if "anritsu_md8475a_ip_address" in self.user_params:
+ return
qxdm_log_mask_cfg = self.user_params.get("qxdm_log_mask_cfg", None)
if isinstance(qxdm_log_mask_cfg, list):
qxdm_log_mask_cfg = qxdm_log_mask_cfg[0]
@@ -332,8 +334,7 @@
if getattr(ad, "telephony_test_setup", None):
return True
- if "enable_wifi_verbose_logging" in self.user_params:
- ad.droid.wifiEnableVerboseLogging(WIFI_VERBOSE_LOGGING_ENABLED)
+ ad.droid.wifiEnableVerboseLogging(WIFI_VERBOSE_LOGGING_ENABLED)
# Disable Emergency alerts
# Set chrome browser start with no-first-run verification and
@@ -411,9 +412,7 @@
force_connectivity_metrics_upload(ad)
time.sleep(30)
try:
- if "enable_wifi_verbose_logging" in self.user_params:
- ad.droid.wifiEnableVerboseLogging(
- WIFI_VERBOSE_LOGGING_DISABLED)
+ ad.droid.wifiEnableVerboseLogging(WIFI_VERBOSE_LOGGING_DISABLED)
except Exception as e:
self.log.error("Failure with %s", e)
try:
@@ -477,6 +476,10 @@
def on_fail(self, test_name, begin_time):
self._take_bug_report(test_name, begin_time)
+ def on_pass(self, test_name, begin_time):
+ if self.save_passing_logs:
+ self._take_bug_report(test_name, begin_time)
+
def _ad_take_extra_logs(self, ad, test_name, begin_time):
ad.adb.wait_for_device()
result = True
diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py
index be9ef84..0607fc1 100644
--- a/acts/framework/acts/test_utils/tel/tel_test_utils.py
+++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py
@@ -3219,7 +3219,7 @@
def phone_switch_to_msim_mode(ad, retries=3, timeout=60):
result = False
if not ad.is_apk_installed("com.google.mdstest"):
- raise signals.TestSkipClass("mdstest is not installed")
+ raise signals.TestAbortClass("mdstest is not installed")
mode = ad.droid.telephonyGetPhoneCount()
if mode == 2:
ad.log.info("Device already in MSIM mode")
@@ -3262,7 +3262,7 @@
def phone_switch_to_ssim_mode(ad, retries=3, timeout=30):
result = False
if not ad.is_apk_installed("com.google.mdstest"):
- raise signals.TestSkipClass("mdstest is not installed")
+ raise signals.TestAbortClass("mdstest is not installed")
mode = ad.droid.telephonyGetPhoneCount()
if mode == 1:
ad.log.info("Device already in SSIM mode")
@@ -7605,6 +7605,14 @@
return True
+def get_host_ip_address(ad):
+ cmd = "|".join(("ifconfig", "grep eno1 -A1", "grep inet", "awk '{$1=$1};1'", "cut -d ' ' -f 2"))
+ destination_ip = exe_cmd(cmd)
+ destination_ip = (destination_ip.decode("utf-8")).split("\n")[0]
+ ad.log.info("Host IP is %s", destination_ip)
+ return destination_ip
+
+
def toggle_connectivity_monitor_setting(ad, state=True):
monitor_setting = ad.adb.getprop("persist.radio.enable_tel_mon")
ad.log.info("radio.enable_tel_mon setting is %s", monitor_setting)
diff --git a/acts/framework/acts/test_utils/wifi/ota_sniffer.py b/acts/framework/acts/test_utils/wifi/ota_sniffer.py
new file mode 100644
index 0000000..264fd36
--- /dev/null
+++ b/acts/framework/acts/test_utils/wifi/ota_sniffer.py
@@ -0,0 +1,370 @@
+from acts import logger
+from acts.controllers.utils_lib import ssh
+import csv
+import io
+import os
+
+
+def create(configs, logging_dir):
+ """Factory method for sniffer
+ Args:
+ configs: list of dicts with sniffer settings; each must contain "type"
+ ("tshark" or "mock"); tshark also needs "ssh_config" and "sniffer_params"
+ """
+ objs = []
+ for config in configs:
+ try:
+ if config["type"] == "tshark":
+ objs.append(TsharkSniffer(config, logging_dir))
+ elif config["type"] == "mock":
+ objs.append(MockSniffer(config, logging_dir))
+ except KeyError:
+ raise KeyError("Invalid sniffer configurations")
+ return objs
+
+
+def destroy(objs):
+ return
+
+
+class OtaSnifferBase(object):
+ """Base class provides functions whose implementation is shared by all sniffers"""
+
+ _log_file_counter = 0
+
+ def start_capture(self):
+ """Starts the sniffer Capture"""
+ raise NotImplementedError
+
+ def stop_capture(self):
+ """Stops the sniffer Capture"""
+ raise NotImplementedError
+
+ def _get_remote_dump_path(self):
+ return "sniffer_dump.csv"
+
+ def _get_full_file_path(self, tag=None):
+ """Returns the full file path for the sniffer capture dump file.
+
+ Returns the full file path (on test machine) for the sniffer capture dump file
+
+ Args:
+ tag: The tag included in the sniffer capture dump file name.
+ """
+ out_dir = os.path.join(self.log_dir, "sniffer_files")
+ if not os.path.exists(out_dir):
+ os.mkdir(out_dir)
+ tags = [tag, "count", OtaSnifferBase._log_file_counter]
+ out_file_name = 'Sniffer_Capture_%s.csv' % ('_'.join(
+ [str(x) for x in tags if x != '' and x is not None]))
+ OtaSnifferBase._log_file_counter += 1
+
+ file_path = os.path.join(out_dir, out_file_name)
+ return file_path
+
+
+class MockSniffer(OtaSnifferBase):
+ """Class that implements mock sniffer for test development and debug"""
+
+ def __init__(self, config, logging_dir):
+ self.log = logger.create_tagged_trace_logger("Mock Sniffer")
+ self.log_dir = logging_dir
+
+ def _ssh_into_sniffer(self):
+ """logs into the sniffer machine"""
+ self.log.info("Logging into the sniffer machine")
+
+ def _connect_to_network(self):
+ """ Connects to a given network
+
+ No network argument is taken; the mock only logs the
+ connection attempt.
+ """
+ self.log.info("Connecting to wireless network ")
+
+ def _run_sniffer(self):
+ """Starts the sniffer"""
+ self.log.info("Starting Sniffer")
+ self.log.info(
+ "Executing sniffer command on the sniffer machine")
+
+ def _stop_sniffer(self):
+ """ Stops the sniffer"""
+ self.log.info("Stopping Sniffer")
+
+ def start_capture(self):
+ """Starts sniffer capture on the specified machine"""
+
+ self._ssh_into_sniffer()
+ self._connect_to_network()
+ self._run_sniffer()
+
+ def stop_capture(self):
+ """Stops the sniffer
+
+ Returns:
+ The name of the processed sniffer dump from the terminated sniffer process
+ """
+
+ self._stop_sniffer()
+ log_file = self._get_full_file_path("Mock")
+ return log_file
+
+
+class TsharkSniffer(OtaSnifferBase):
+ """Class that implements Tshark based Sniffer """
+
+ def __init__(self, config, logging_dir):
+ self.sniffer_proc_pid = None
+ self.log = logger.create_tagged_trace_logger("Tshark Sniffer")
+ self.ssh_config = config["ssh_config"]
+ self.params = config["sniffer_params"]
+ self.log_dir = logging_dir
+ self.type_subtype_dict = {
+ "0": "Association Requests",
+ "1": "Association Responses",
+ "2": "Reassociation Requests",
+ "3": "Resssociation Responses",
+ "4": "Probe Requests",
+ "5": "Probe Responses",
+ "8": "Beacon",
+ "9": "ATIM",
+ "10": "Disassociations",
+ "11": "Authentications",
+ "12": "Deauthentications",
+ "13": "Actions",
+ "24": "Block ACK Requests",
+ "25": "Block ACKs",
+ "26": "PS-Polls",
+ "27": "RTS",
+ "28": "CTS",
+ "29": "ACK",
+ "30": "CF-Ends",
+ "31": "CF-Ends/CF-Acks",
+ "32": "Data",
+ "33": "Data+CF-Ack",
+ "34": "Data+CF-Poll",
+ "35": "Data+CF-Ack+CF-Poll",
+ "36": "Null",
+ "37": "CF-Ack",
+ "38": "CF-Poll",
+ "39": "CF-Ack+CF-Poll",
+ "40": "QoS Data",
+ "41": "QoS Data+CF-Ack",
+ "42": "QoS Data+CF-Poll",
+ "43": "QoS Data+CF-Ack+CF-Poll",
+ "44": "QoS Null",
+ "46": "QoS CF-Poll (Null)",
+ "47": "QoS CF-Ack+CF-Poll (Null)"
+ }
+
+ self.tshark_columns = [
+ "frame_number", "frame_time_relative", "mactime", "frame_len",
+ "rssi", "channel", "ta", "ra", "bssid", "type", "subtype",
+ "duration", "seq", "retry", "pwrmgmt", "moredata", "ds", "phy",
+ "radio_datarate", "vht_datarate", "radiotap_mcs_index", "vht_mcs", "wlan_data_rate",
+ "11n_mcs_index", "11ac_mcs", "11n_bw", "11ac_bw", "vht_nss", "mcs_gi",
+ "vht_gi", "vht_coding", "ba_bm", "fc_status",
+ "bf_report"
+ ]
+
+
+ self._tshark_output_columns = [
+ "frame_number",
+ "frame_time_relative",
+ "mactime",
+ "ta",
+ "ra",
+ "bssid",
+ "rssi",
+ "channel",
+ "frame_len",
+ "Info",
+ "radio_datarate",
+ "radiotap_mcs_index",
+ "pwrmgmt",
+ "phy",
+ "vht_nss",
+ "vht_mcs",
+ "vht_datarate",
+ "11ac_mcs",
+ "11ac_bw",
+ "vht_gi",
+ "vht_coding",
+ "wlan_data_rate",
+ "11n_mcs_index",
+ "11n_bw",
+ "mcs_gi",
+ "type",
+ "subtype",
+ "duration",
+ "seq",
+ "retry",
+ "moredata",
+ "ds",
+ "ba_bm",
+ "fc_status",
+ "bf_report"
+ ]
+
+
+ self.tshark_fields = '-T fields -e frame.number -e frame.time_relative -e radiotap.mactime -e frame.len '+\
+ '-e radiotap.dbm_antsignal -e wlan_radio.channel '+\
+ '-e wlan.ta -e wlan.ra -e wlan.bssid '+\
+ '-e wlan.fc.type -e wlan.fc.type_subtype -e wlan.duration -e wlan.seq -e wlan.fc.retry -e wlan.fc.pwrmgt -e wlan.fc.moredata -e wlan.fc.ds '+\
+ '-e wlan_radio.phy '+\
+ '-e radiotap.datarate -e radiotap.vht.datarate.0 '+\
+ '-e radiotap.mcs.index -e radiotap.vht.mcs.0 '+\
+ '-e wlan_radio.data_rate -e wlan_radio.11n.mcs_index -e wlan_radio.11ac.mcs '+\
+ '-e wlan_radio.11n.bandwidth -e wlan_radio.11ac.bandwidth '+\
+ '-e radiotap.vht.nss.0 -e radiotap.mcs.gi -e radiotap.vht.gi -e radiotap.vht.coding.0 '+\
+ '-e wlan.ba.bm -e wlan.fcs.status -e wlan.vht.compressed_beamforming_report.snr '+ \
+ '-y IEEE802_11_RADIO -E separator="^" '
+
+ @property
+ def _started(self):
+ return self.sniffer_proc_pid is not None
+
+ def _ssh_into_sniffer(self):
+ """logs into the sniffer machine"""
+ self.log.info("Logging into Sniffer")
+ self._sniffer_server = ssh.connection.SshConnection(
+ ssh.settings.from_config(self.ssh_config))
+
+ def _connect_to_network(self, network):
+ """ Connects to a given network
+ Connects to a wireless network using networksetup utility
+
+ Args:
+ network: dictionary of network credentials; SSID and password
+ """
+ self.log.info("Connecting to network {}".format(network["SSID"]))
+
+ #Scan to see if the requested SSID is available
+ scan_result = self._sniffer_server.run("/usr/local/bin/airport -s")
+
+ if network["SSID"] not in scan_result.stdout:
+ self.log.error("{} not found in scan".format(network["SSID"]))
+
+ if "password" not in network.keys():
+ network["password"] = ""
+
+ if set(["SSID", "password"]).issubset(network.keys()):
+ pass
+ else:
+ raise KeyError("Incorrect Network Config")
+
+ connect_command = "networksetup -setairportnetwork en0 {} {}".format(
+ network["SSID"], network["password"])
+ self._sniffer_server.run(connect_command)
+
+ def _run_tshark(self, sniffer_command):
+ """Starts the sniffer"""
+
+ self.log.info("Starting Sniffer")
+ sniffer_job = self._sniffer_server.run_async(sniffer_command)
+ self.sniffer_proc_pid = sniffer_job.stdout
+
+ def _stop_tshark(self):
+ """ Stops the sniffer"""
+
+ self.log.info("Stopping Sniffer")
+
+ # while loop to kill the sniffer process
+ while True:
+ try:
+ #Returns error if process was killed already
+ self._sniffer_server.run("kill -15 {}".format(
+ str(self.sniffer_proc_pid)))
+ except:
+ pass
+ try:
+ #Returns 1 if process was killed
+ self._sniffer_server.run(
+ "/bin/ps aux| grep {} | grep -v grep".format(
+ self.sniffer_proc_pid))
+ except:
+ break
+
+ def _process_tshark_dump(self, dump, sniffer_tag):
+ """ Process tshark dump for better readability
+ Processes tshark dump for better readability and saves it to a file.
+ Adds an Info column to each row.
+ Format of the info columns: subtype of the frame, sequence no and retry status
+
+ Args:
+ dump : string of sniffer capture output
+ sniffer_tag : tag to be appended to the dump file
+
+ Returns:
+ log_file : name of the file where the processed dump is stored
+ """
+ dump = io.StringIO(dump)
+ log_file = self._get_full_file_path(sniffer_tag)
+ with open(log_file, "w") as output_csv:
+ reader = csv.DictReader(
+ dump, fieldnames=self.tshark_columns, delimiter="^")
+ writer = csv.DictWriter(
+ output_csv, fieldnames=self._tshark_output_columns, delimiter="\t")
+ writer.writeheader()
+ for row in reader:
+ if row["subtype"] in self.type_subtype_dict.keys():
+ row["Info"] = "{sub} S={seq} retry={retry_status}".format(
+ sub=self.type_subtype_dict[row["subtype"]],
+ seq=row["seq"],
+ retry_status=row["retry"])
+ else:
+ row["Info"] = "{sub} S={seq} retry={retry_status}\n".format(
+ sub=row["subtype"],
+ seq=row["seq"],
+ retry_status=row["retry"])
+ writer.writerow(row)
+ return log_file
+
+ def start_capture(self, network, duration=30):
+ """Starts sniffer capture on the specified machine"""
+
+ # Checking for existing sniffer processes
+ if self._started:
+ self.log.info("Sniffer already running")
+ return
+
+ self.tshark_command = "/usr/local/bin/tshark -l -I -t u -a duration:{}".format(
+ duration)
+
+ # Logging into the sniffer
+ self._ssh_into_sniffer()
+
+ #Connecting to network
+ self._connect_to_network(network)
+
+ sniffer_command = "{tshark} {fields} > {log_file}".format(
+ tshark=self.tshark_command,
+ fields=self.tshark_fields,
+ log_file=self._get_remote_dump_path())
+
+ #Starting sniffer capture by executing tshark command
+ self._run_tshark(sniffer_command)
+
+ def stop_capture(self, sniffer_tag=""):
+ """Stops the sniffer
+
+ Returns:
+ The name of the processed sniffer dump from the terminated sniffer process
+ """
+ #Checking if there is an ongoing sniffer capture
+ if not self._started:
+ self.log.error("No sniffer process running")
+ return
+ # Killing sniffer process
+ self._stop_tshark()
+
+ sniffer_dump = self._sniffer_server.run('cat {}'.format(
+ self._get_remote_dump_path()))
+
+ # Process the capture output and write it to a file
+ log_file = self._process_tshark_dump(sniffer_dump.stdout, sniffer_tag)
+
+ self.sniffer_proc_pid = None
+
+ return log_file
diff --git a/acts/framework/acts/test_utils/wifi/wifi_performance_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_performance_test_utils.py
index 18c50a5..e1039a0 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_performance_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_performance_test_utils.py
@@ -25,8 +25,6 @@
import time
from acts.controllers.android_device import AndroidDevice
from acts.controllers.utils_lib import ssh
-from acts.metrics.core import ProtoMetric
-from acts.metrics.logger import MetricLogger
from acts import utils
from acts.test_utils.wifi import wifi_test_utils as wutils
from concurrent.futures import ThreadPoolExecutor
@@ -48,7 +46,6 @@
# Threading decorator
def nonblocking(f):
"""Creates a decorator transforming function calls to non-blocking"""
-
def wrap(*args, **kwargs):
executor = ThreadPoolExecutor(max_workers=1)
thread_future = executor.submit(f, *args, **kwargs)
@@ -89,18 +86,16 @@
self.llstats_incremental = self._empty_llstats()
def _empty_llstats(self):
- return collections.OrderedDict(
- mcs_stats=collections.OrderedDict(),
- summary=collections.OrderedDict())
+ return collections.OrderedDict(mcs_stats=collections.OrderedDict(),
+ summary=collections.OrderedDict())
def _empty_mcs_stat(self):
- return collections.OrderedDict(
- txmpdu=0,
- rxmpdu=0,
- mpdu_lost=0,
- retries=0,
- retries_short=0,
- retries_long=0)
+ return collections.OrderedDict(txmpdu=0,
+ rxmpdu=0,
+ mpdu_lost=0,
+ retries=0,
+ retries_short=0,
+ retries_long=0)
def _mcs_id_to_string(self, mcs_id):
mcs_string = '{} {}MHz Nss{} MCS{} {}Mbps'.format(
@@ -140,13 +135,12 @@
return stats_diff
def _generate_stats_summary(self, llstats_dict):
- llstats_summary = collections.OrderedDict(
- common_tx_mcs=None,
- common_tx_mcs_count=0,
- common_tx_mcs_freq=0,
- common_rx_mcs=None,
- common_rx_mcs_count=0,
- common_rx_mcs_freq=0)
+ llstats_summary = collections.OrderedDict(common_tx_mcs=None,
+ common_tx_mcs_count=0,
+ common_tx_mcs_freq=0,
+ common_rx_mcs=None,
+ common_rx_mcs_count=0,
+ common_rx_mcs_freq=0)
txmpdu_count = 0
rxmpdu_count = 0
for mcs_id, mcs_stats in llstats_dict['mcs_stats'].items():
@@ -329,8 +323,8 @@
if y_axis not in ['default', 'secondary']:
raise ValueError('y_axis must be default or secondary')
if color == None:
- color = self.COLORS[self.fig_property['num_lines'] % len(
- self.COLORS)]
+ color = self.COLORS[self.fig_property['num_lines'] %
+ len(self.COLORS)]
if style == 'dashed':
style = [5, 5]
if not hover_text:
@@ -375,11 +369,11 @@
if y_axis not in ['default', 'secondary']:
raise ValueError('y_axis must be default or secondary')
if color == None:
- color = self.COLORS[self.fig_property['num_lines'] % len(
- self.COLORS)]
+ color = self.COLORS[self.fig_property['num_lines'] %
+ len(self.COLORS)]
if marker == None:
- marker = self.MARKERS[self.fig_property['num_lines'] % len(
- self.MARKERS)]
+ marker = self.MARKERS[self.fig_property['num_lines'] %
+ len(self.MARKERS)]
if not hover_text:
hover_text = ['y={}'.format(y) for y in y_data]
self.figure_data.append({
@@ -407,44 +401,40 @@
two_axes = False
for line in self.figure_data:
source = bokeh.models.ColumnDataSource(
- data=dict(
- x=line['x_data'],
- y=line['y_data'],
- hover_text=line['hover_text']))
+ data=dict(x=line['x_data'],
+ y=line['y_data'],
+ hover_text=line['hover_text']))
if line['width'] > 0:
- self.plot.line(
- x='x',
- y='y',
- legend=line['legend'],
- line_width=line['width'],
- color=line['color'],
- line_dash=line['style'],
- name=line['y_axis'],
- y_range_name=line['y_axis'],
- source=source)
+ self.plot.line(x='x',
+ y='y',
+ legend=line['legend'],
+ line_width=line['width'],
+ color=line['color'],
+ line_dash=line['style'],
+ name=line['y_axis'],
+ y_range_name=line['y_axis'],
+ source=source)
if line['shaded_region']:
band_x = line['shaded_region']['x_vector']
band_x.extend(line['shaded_region']['x_vector'][::-1])
band_y = line['shaded_region']['lower_limit']
band_y.extend(line['shaded_region']['upper_limit'][::-1])
- self.plot.patch(
- band_x,
- band_y,
- color='#7570B3',
- line_alpha=0.1,
- fill_alpha=0.1)
+ self.plot.patch(band_x,
+ band_y,
+ color='#7570B3',
+ line_alpha=0.1,
+ fill_alpha=0.1)
if line['marker'] in self.MARKERS:
marker_func = getattr(self.plot, line['marker'])
- marker_func(
- x='x',
- y='y',
- size=line['marker_size'],
- legend=line['legend'],
- line_color=line['color'],
- fill_color=line['color'],
- name=line['y_axis'],
- y_range_name=line['y_axis'],
- source=source)
+ marker_func(x='x',
+ y='y',
+ size=line['marker_size'],
+ legend=line['legend'],
+ line_color=line['color'],
+ fill_color=line['color'],
+ name=line['y_axis'],
+ y_range_name=line['y_axis'],
+ source=source)
if line['y_axis'] == 'secondary':
two_axes = True
@@ -479,11 +469,10 @@
def _save_figure_json(self, output_file):
"""Function to save a json format of a figure"""
- figure_dict = collections.OrderedDict(
- fig_property=self.fig_property,
- figure_data=self.figure_data,
- tools=self.TOOLS,
- tooltips=self.TOOLTIPS)
+ figure_dict = collections.OrderedDict(fig_property=self.fig_property,
+ figure_data=self.figure_data,
+ tools=self.TOOLS,
+ tooltips=self.TOOLTIPS)
output_file = output_file.replace('.html', '_plot_data.json')
with open(output_file, 'w') as outfile:
json.dump(figure_dict, outfile, indent=4)
@@ -533,14 +522,13 @@
ping_interarrivals: A list-like object enumerating the amount of time
between the beginning of each subsequent transmission.
"""
-
def __init__(self, ping_output):
self.packet_loss_percentage = 100
self.transmission_times = []
self.rtts = _ListWrap(self.transmission_times, lambda entry: entry.rtt)
- self.timestamps = _ListWrap(
- self.transmission_times, lambda entry: entry.timestamp)
+ self.timestamps = _ListWrap(self.transmission_times,
+ lambda entry: entry.timestamp)
self.ping_interarrivals = _PingInterarrivals(self.transmission_times)
self.start_time = 0
@@ -585,7 +573,6 @@
rtt: The round trip time for the packet sent.
timestamp: The timestamp the packet started its trip.
"""
-
def __init__(self, timestamp, rtt):
self.rtt = rtt
self.timestamp = timestamp
@@ -593,7 +580,6 @@
class _ListWrap(object):
"""A convenient helper class for treating list iterators as native lists."""
-
def __init__(self, wrapped_list, func):
self.__wrapped_list = wrapped_list
self.__func = func
@@ -611,7 +597,6 @@
class _PingInterarrivals(object):
"""A helper class for treating ping interarrivals as a native list."""
-
def __init__(self, ping_entries):
self.__ping_entries = ping_entries
@@ -653,16 +638,17 @@
)
if isinstance(src_device, AndroidDevice):
ping_cmd = '{} {}'.format(ping_cmd, dest_address)
- ping_output = src_device.adb.shell(
- ping_cmd, timeout=ping_deadline + SHORT_SLEEP, ignore_status=True)
+ ping_output = src_device.adb.shell(ping_cmd,
+ timeout=ping_deadline + SHORT_SLEEP,
+ ignore_status=True)
elif isinstance(src_device, ssh.connection.SshConnection):
ping_cmd = 'sudo {} {}'.format(ping_cmd, dest_address)
- ping_output = src_device.run(
- ping_cmd, timeout=ping_deadline + SHORT_SLEEP,
- ignore_status=True).stdout
+ ping_output = src_device.run(ping_cmd,
+ timeout=ping_deadline + SHORT_SLEEP,
+ ignore_status=True).stdout
else:
- raise TypeError(
- 'Unable to ping using src_device of type %s.' % type(src_device))
+ raise TypeError('Unable to ping using src_device of type %s.' %
+ type(src_device))
return PingResult(ping_output.splitlines())
@@ -688,13 +674,15 @@
def get_connected_rssi(dut,
num_measurements=1,
polling_frequency=SHORT_SLEEP,
- first_measurement_delay=0):
+ first_measurement_delay=0,
+ disconnect_warning=True):
"""Gets all RSSI values reported for the connected access point/BSSID.
Args:
dut: android device object from which to get RSSI
num_measurements: number of scans done, and RSSIs collected
polling_frequency: time to wait between RSSI measurements
+ disconnect_warning: boolean controlling disconnection logging messages
Returns:
connected_rssi: dict containing the measurements results for
all reported RSSI values (signal_poll, per chain, etc.) and their
@@ -709,6 +697,7 @@
('chain_0_rssi', empty_rssi_result()),
('chain_1_rssi', empty_rssi_result())])
# yapf: enable
+ previous_bssid = 'disconnected'
t0 = time.time()
time.sleep(first_measurement_delay)
for idx in range(num_measurements):
@@ -718,10 +707,14 @@
status_output = dut.adb.shell(WPA_CLI_STATUS)
match = re.search('bssid=.*', status_output)
if match:
- bssid = match.group(0).split('=')[1]
- connected_rssi['bssid'].append(bssid)
+ current_bssid = match.group(0).split('=')[1]
+ connected_rssi['bssid'].append(current_bssid)
else:
- connected_rssi['bssid'].append(RSSI_ERROR_VAL)
+ current_bssid = 'disconnected'
+ connected_rssi['bssid'].append(current_bssid)
+ if disconnect_warning and previous_bssid != 'disconnected':
+ logging.warning('WIFI DISCONNECT DETECTED!')
+ previous_bssid = current_bssid
signal_poll_output = dut.adb.shell(SIGNAL_POLL)
match = re.search('FREQUENCY=.*', signal_poll_output)
if match:
@@ -786,7 +779,8 @@
def get_connected_rssi_nb(dut,
num_measurements=1,
polling_frequency=SHORT_SLEEP,
- first_measurement_delay=0):
+ first_measurement_delay=0,
+ disconnect_warning=True):
return get_connected_rssi(dut, num_measurements, polling_frequency,
- first_measurement_delay)
+ first_measurement_delay, disconnect_warning)
@@ -810,8 +804,9 @@
time.sleep(MED_SLEEP)
scan_output = dut.adb.shell(SCAN_RESULTS)
for bssid in tracked_bssids:
- bssid_result = re.search(
- bssid + '.*', scan_output, flags=re.IGNORECASE)
+ bssid_result = re.search(bssid + '.*',
+ scan_output,
+ flags=re.IGNORECASE)
if bssid_result:
bssid_result = bssid_result.group(0).split('\t')
scan_rssi[bssid]['data'].append(int(bssid_result[2]))
@@ -853,7 +848,6 @@
atten.set_atten(atten_level)
-# Miscellaneous Wifi Utilities
def get_current_atten_dut_chain_map(attenuators, dut, ping_server):
"""Function to detect mapping between attenuator ports and DUT chains.
@@ -932,12 +926,11 @@
rf_map_by_atten = [[] for atten in attenuators]
for net_id, net_config in networks.items():
wutils.reset_wifi(dut)
- wutils.wifi_connect(
- dut,
- net_config,
- num_of_tries=1,
- assert_on_fail=False,
- check_connectivity=False)
+ wutils.wifi_connect(dut,
+ net_config,
+ num_of_tries=1,
+ assert_on_fail=False,
+ check_connectivity=False)
rf_map_by_network[net_id] = get_current_atten_dut_chain_map(
attenuators, dut, ping_server)
for idx, chain in enumerate(rf_map_by_network[net_id]):
@@ -952,6 +945,25 @@
return rf_map_by_network, rf_map_by_atten
+# Miscellaneous Wifi Utilities
+def validate_network(dut, ssid):
+    """Check that DUT has a valid internet connection through expected SSID.
+
+    Args:
+        dut: android device of interest
+        ssid: expected ssid
+    Returns:
+        True if wutils.validate_connection returned a non-None result and
+        the connection info reports the expected SSID, False otherwise.
+    """
+    current_network = dut.droid.wifiGetConnectionInfo()
+    try:
+        connected = wutils.validate_connection(dut) is not None
+    except Exception:  # best-effort probe: any failure means not connected
+        connected = False
+    return bool(connected and current_network['SSID'] == ssid)
+
+
def get_server_address(ssh_connection, dut_ip, subnet_mask):
"""Get server address on a specific subnet,
diff --git a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
index 19cfab0..64307e9 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
@@ -95,11 +95,22 @@
self.timeout = timeout
def __enter__(self):
+ """Entry context manager for BlockingBrowser.
+
+ The enter context manager for BlockingBrowser attempts to lock the
+ browser file. If successful, it launches and returns a chromedriver
+ session. If an exception occurs while starting the browser, the lock
+ file is released.
+ """
self.lock_file = open(self.lock_file_path, "r")
start_time = time.time()
while time.time() < start_time + self.timeout:
try:
fcntl.flock(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except BlockingIOError:
+ time.sleep(BROWSER_WAIT_SHORT)
+ continue
+ try:
self.driver = selenium.webdriver.Chrome(
options=self.chrome_options,
desired_capabilities=self.chrome_capabilities)
@@ -109,11 +120,19 @@
super(splinter.driver.webdriver.chrome.WebDriver,
self).__init__(2)
return super(BlockingBrowser, self).__enter__()
- except BlockingIOError:
- time.sleep(BROWSER_WAIT_SHORT)
+ except:
+ fcntl.flock(self.lock_file, fcntl.LOCK_UN)
+ self.lock_file.close()
+ raise RuntimeError("Error starting browser. "
+ "Releasing lock file.")
raise TimeoutError("Could not start chrome browser in time.")
def __exit__(self, exc_type, exc_value, traceback):
+ """Exit context manager for BlockingBrowser.
+
+ The exit context manager simply calls the parent class exit and
+ releases the lock file.
+ """
try:
super(BlockingBrowser, self).__exit__(exc_type, exc_value,
traceback)
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index a44acf0..37426c6 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -44,7 +44,6 @@
SHORT_TIMEOUT = 30
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
-WIFI_ABNORMAL_CONNECTION_TIME = 10
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458
@@ -1167,9 +1166,7 @@
if id is None and ssid is None:
for i in range(tries):
try:
- start = time.time()
conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
- _assert_connection_time(start)
break
except Empty:
pass
@@ -1177,25 +1174,16 @@
# If ssid or network id is specified, wait for specific connect event.
for i in range(tries):
try:
- start = time.time()
conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
if id and conn_result['data'][WifiEnums.NETID_KEY] == id:
- _assert_connection_time(start)
break
elif ssid and conn_result['data'][WifiEnums.SSID_KEY] == ssid:
- _assert_connection_time(start)
break
except Empty:
pass
return conn_result
-def _assert_connection_time(start):
- duration = time.time() - start
- asserts.assert_true(
- duration < WIFI_ABNORMAL_CONNECTION_TIME,
- "Took " + str(duration) + "s to connect to network, " +
- " expected " + str(WIFI_ABNORMAL_CONNECTION_TIME))
def wait_for_disconnect(ad, timeout=10):
"""Wait for a disconnect event within the specified timeout.
@@ -1436,6 +1424,7 @@
finally:
ad.droid.wifiStopTrackingStateChange()
+
def wifi_connect_using_network_request(ad, network, network_specifier,
num_of_tries=3, assert_on_fail=True):
"""Connect an Android device to a wifi network using network request.
diff --git a/acts/tests/google/ble/bt5/AdvertisingSetTest.py b/acts/tests/google/ble/bt5/AdvertisingSetTest.py
index da54b1d..de4192f 100644
--- a/acts/tests/google/ble/bt5/AdvertisingSetTest.py
+++ b/acts/tests/google/ble/bt5/AdvertisingSetTest.py
@@ -67,7 +67,7 @@
self.adv_ad = self.android_devices[0]
if not self.adv_ad.droid.bluetoothIsLeExtendedAdvertisingSupported():
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Advertiser does not support LE Extended Advertising")
def teardown_test(self):
diff --git a/acts/tests/google/ble/bt5/Bt5ScanTest.py b/acts/tests/google/ble/bt5/Bt5ScanTest.py
index 4caa376..e2c9c83 100644
--- a/acts/tests/google/ble/bt5/Bt5ScanTest.py
+++ b/acts/tests/google/ble/bt5/Bt5ScanTest.py
@@ -66,11 +66,11 @@
self.adv_ad = self.android_devices[1]
if not self.scn_ad.droid.bluetoothIsLeExtendedAdvertisingSupported():
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Scanner does not support LE Extended Advertising")
if not self.adv_ad.droid.bluetoothIsLeExtendedAdvertisingSupported():
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Advertiser does not support LE Extended Advertising")
def teardown_test(self):
diff --git a/acts/tests/google/ble/bt5/PhyTest.py b/acts/tests/google/ble/bt5/PhyTest.py
index 9b95ead..0b1ecfa 100644
--- a/acts/tests/google/ble/bt5/PhyTest.py
+++ b/acts/tests/google/ble/bt5/PhyTest.py
@@ -42,11 +42,11 @@
def setup_class(self):
super(PhyTest, self).setup_class()
if not self.cen_ad.droid.bluetoothIsLe2MPhySupported():
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Central device does not support LE 2M PHY")
if not self.per_ad.droid.bluetoothIsLe2MPhySupported():
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Peripheral device does not support LE 2M PHY")
# Some controllers auto-update PHY to 2M, and both client and server
diff --git a/acts/tests/google/bt/ota/BtOtaTest.py b/acts/tests/google/bt/ota/BtOtaTest.py
index 91e51bb..86e3098 100644
--- a/acts/tests/google/bt/ota/BtOtaTest.py
+++ b/acts/tests/google/bt/ota/BtOtaTest.py
@@ -32,14 +32,14 @@
# Pairing devices
if not pair_pri_to_sec(self.dut, self.android_devices[1]):
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed to bond devices prior to update")
#Run OTA below, if ota fails then abort all tests
try:
ota_updater.update(self.dut)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
@BluetoothBaseTest.bt_test_wrap
diff --git a/acts/tests/google/bt/pts/A2dpPtsTest.py b/acts/tests/google/bt/pts/A2dpPtsTest.py
index 25ed3ce..2c2ffee 100644
--- a/acts/tests/google/bt/pts/A2dpPtsTest.py
+++ b/acts/tests/google/bt/pts/A2dpPtsTest.py
@@ -29,11 +29,10 @@
pts_action_mapping = None
def setup_class(self):
- super(A2dpPtsTest, self).setup_class()
+ super().setup_class()
self.dut.initialize_bluetooth_controller()
# self.dut.set_bluetooth_local_name(self.dut_bluetooth_local_name)
local_dut_mac_address = self.dut.get_local_bluetooth_address()
- self.pts.set_profile_under_test("A2DP")
ics = None
ixit = None
@@ -56,7 +55,11 @@
ics = f_ics_lib.A2DP_ICS
ixit = fuchsia_ixit
+ ### PTS SETUP: Required after ICS, IXIT, and profile is setup ###
+ self.pts.set_profile_under_test("A2DP")
self.pts.set_ics_and_ixit(ics, ixit)
+ self.pts.setup_pts()
+ ### End PTS Setup ###
self.dut.unbond_all_known_devices()
self.dut.start_pairing_helper()
diff --git a/acts/tests/google/bt/pts/GattPtsTest.py b/acts/tests/google/bt/pts/GattPtsTest.py
index 7883f37..4166f8e 100644
--- a/acts/tests/google/bt/pts/GattPtsTest.py
+++ b/acts/tests/google/bt/pts/GattPtsTest.py
@@ -33,12 +33,11 @@
pts_action_mapping = None
def setup_class(self):
- super(GattPtsTest, self).setup_class()
+ super().setup_class()
self.dut_bluetooth_local_name = "fs_test"
self.dut.initialize_bluetooth_controller()
self.dut.set_bluetooth_local_name(self.dut_bluetooth_local_name)
local_dut_mac_address = self.dut.get_local_bluetooth_address()
- self.pts.set_profile_under_test("GATT")
ics = None
ixit = None
@@ -72,7 +71,11 @@
"Unable to run PTS tests on unsupported hardare {}.".format(
type(self.dut)))
+ ### PTS SETUP: Required after ICS, IXIT, and profile is setup ###
+ self.pts.set_profile_under_test("GATT")
self.pts.set_ics_and_ixit(ics, ixit)
+ self.pts.setup_pts()
+ ### End PTS Setup ###
self.dut.unbond_all_known_devices()
self.dut.start_pairing_helper()
diff --git a/acts/tests/google/bt/pts/SdpPtsTest.py b/acts/tests/google/bt/pts/SdpPtsTest.py
index 84c7ab7..1dd5d66 100644
--- a/acts/tests/google/bt/pts/SdpPtsTest.py
+++ b/acts/tests/google/bt/pts/SdpPtsTest.py
@@ -97,12 +97,12 @@
class SdpPtsTest(PtsBaseClass):
+
def setup_class(self):
- super(SdpPtsTest, self).setup_class()
+ super().setup_class()
self.dut.initialize_bluetooth_controller()
# self.dut.set_bluetooth_local_name(self.dut_bluetooth_local_name)
local_dut_mac_address = self.dut.get_local_bluetooth_address()
- self.pts.set_profile_under_test("SDP")
ics = None
ixit = None
@@ -125,7 +125,11 @@
ics = f_ics_lib.SDP_ICS
ixit = fuchsia_ixit
+ ### PTS SETUP: Required after ICS, IXIT, and profile is setup ###
+ self.pts.set_profile_under_test("SDP")
self.pts.set_ics_and_ixit(ics, ixit)
+ self.pts.setup_pts()
+ ### End PTS Setup ###
self.dut.unbond_all_known_devices()
self.dut.set_discoverable(True)
diff --git a/acts/tests/google/fuchsia/examples/Sl4fSanityTest.py b/acts/tests/google/fuchsia/examples/Sl4fSanityTest.py
index a66e1f2..d2116a9 100644
--- a/acts/tests/google/fuchsia/examples/Sl4fSanityTest.py
+++ b/acts/tests/google/fuchsia/examples/Sl4fSanityTest.py
@@ -39,7 +39,7 @@
if len(self.fuchsia_devices) > 0:
self.log.info(success_str)
else:
- raise signals.TestSkipClass("err_str")
+ raise signals.TestAbortClass("err_str")
def test_example(self):
self.log.info("Congratulations! You've run your first test.")
diff --git a/acts/tests/google/fuchsia/netstack/NetstackIxiaTest.py b/acts/tests/google/fuchsia/netstack/NetstackIxiaTest.py
new file mode 100644
index 0000000..81d69bf
--- /dev/null
+++ b/acts/tests/google/fuchsia/netstack/NetstackIxiaTest.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+from acts import asserts
+from acts.controllers.ap_lib import hostapd_ap_preset
+from acts.controllers.ap_lib import hostapd_bss_settings
+from acts.controllers.ap_lib import hostapd_constants
+from acts.controllers.ap_lib import hostapd_security
+
+from acts.test_utils.net.NetstackBaseTest import NetstackBaseTest
+
+from acts.utils import rand_ascii_str
+
+
+class NetstackIxiaTest(NetstackBaseTest):
+    """Sets up bridged Wi-Fi BSSes without DHCP or NAT for IxANVL testing."""
+
+    def __init__(self, controllers):
+        NetstackBaseTest.__init__(self, controllers)
+
+    def setup_class(self):
+        """Start one AP per Wi-Fi band and bridge all BSS interfaces to eth1.
+
+        Aborts the class when the testbed provides no fuchsia device. One
+        entry of self.access_points is used per band, indexed by band.
+        """
+        self.log.info('Setup {cls}'.format(cls=type(self)))
+
+        # Abort cleanly rather than hit an IndexError on the [0] below.
+        asserts.abort_class_if(
+            not self.fuchsia_devices,
+            "NetstackFuchsiaTest Init: Not enough fuchsia devices.")
+        self.log.info("Running testbed setup with one fuchsia devices")
+        self.fuchsia_dev = self.fuchsia_devices[0]
+
+        # We want to bring up several 2GHz and 5GHz BSSes.
+        wifi_bands = ['2g', '5g']
+
+        # Currently AP_DEFAULT_CHANNEL_2G is 6 and AP_DEFAULT_CHANNEL_5G is 36.
+        wifi_channels = [
+            hostapd_constants.AP_DEFAULT_CHANNEL_2G,
+            hostapd_constants.AP_DEFAULT_CHANNEL_5G
+        ]
+
+        # Each band will start up an Open BSS (security_mode=None)
+        # and a WPA2 BSS (security_mode=hostapd_constants.WPA2_STRING)
+        security_modes = [None, hostapd_constants.WPA2_STRING]
+
+        # All secure BSSes will use the same password.
+        wifi_password = rand_ascii_str(10)
+        self.log.info('Wi-Fi password for this test: {wifi_password}'.format(
+            wifi_password=wifi_password))
+        hostapd_configs = []
+        wifi_interfaces = {}
+        bss_settings = {}
+
+        # Build a configuration for each sub-BSSID
+        for band_index, wifi_band in enumerate(wifi_bands):
+            ssid_name = 'Ixia_{wifi_band}_#{bss_number}_{security_mode}'
+            bss_settings[wifi_band] = []
+
+            # Prepare the extra SSIDs. The first security mode is skipped:
+            # that SSID is configured separately below because of the way
+            # the APIs work, so sub-BSSIDs are numbered starting from 2.
+            for bss_number, security_mode in enumerate(security_modes[1:],
+                                                       start=2):
+                bss_name = ssid_name.format(wifi_band=wifi_band,
+                                            security_mode=security_mode,
+                                            bss_number=bss_number)
+
+                bss_setting = hostapd_bss_settings.BssSettings(
+                    name=bss_name,
+                    ssid=bss_name,
+                    security=hostapd_security.Security(
+                        security_mode=security_mode, password=wifi_password))
+                bss_settings[wifi_band].append(bss_setting)
+
+            # This is the configuration for the first SSID.
+            ssid_name = ssid_name.format(wifi_band=wifi_band,
+                                         security_mode=security_modes[0],
+                                         bss_number=1)
+
+            hostapd_configs.append(
+                hostapd_ap_preset.create_ap_preset(
+                    profile_name='whirlwind',
+                    iface_wlan_2g='wlan0',
+                    iface_wlan_5g='wlan1',
+                    ssid=ssid_name,
+                    channel=wifi_channels[band_index],
+                    security=hostapd_security.Security(
+                        security_mode=security_modes[0],
+                        password=wifi_password),
+                    bss_settings=bss_settings[wifi_band]))
+
+            access_point = self.access_points[band_index]
+
+            # Now bring up the AP and track the interfaces we're using for
+            # each BSSID. All BSSIDs are now beaconing.
+            wifi_interfaces[wifi_band] = access_point.start_ap(
+                hostapd_configs[band_index])
+
+            # Disable DHCP on this Wi-Fi band.
+            # Note: This also disables DHCP on each sub-BSSID due to how
+            # the APIs are built.
+            #
+            # We need to do this in order to enable IxANVL testing across
+            # Wi-Fi, which needs to configure the IP addresses per-interface
+            # on the client device.
+            access_point.stop_dhcp()
+
+            # Disable NAT.
+            # NAT config in access_point.py is global at the moment, but
+            # calling it twice (once per band) won't hurt anything. This is
+            # easier than trying to conditionalize per band.
+            #
+            # Note that we could make this per-band, but it would require
+            # refactoring the access_point.py code that turns on NAT, however
+            # if that ever does happen then this code will work as expected
+            # without modification.
+            #
+            # This is also required for IxANVL testing. NAT would interfere
+            # with IxANVL because IxANVL needs to see the raw frames
+            # sourcing/sinking from/to the DUT for protocols such as ARP and
+            # DHCP, but it also needs the MAC/IP of the source and destination
+            # frames and packets to be from the DUT, so we want the AP to act
+            # like a bridge for these tests.
+            access_point.stop_nat()
+
+        # eth1 is the LAN port, which will always be a part of the bridge.
+        bridge_interfaces = ['eth1']
+
+        # This adds each bssid interface to the bridge.
+        for wifi_band in wifi_bands:
+            bridge_interfaces.extend(wifi_interfaces[wifi_band])
+
+        # Each interface can only be a member of 1 bridge, so we're going to use
+        # the last access_point object to set the bridge up for all interfaces.
+        access_point.create_bridge(bridge_name='ixia_bridge0',
+                                   interfaces=bridge_interfaces)
+
+    def setup_test(self):
+        """Nothing to do per test."""
+
+    def teardown_test(self):
+        """Nothing to do per test."""
+
+    def teardown_class(self):
+        """Remove the ixia_bridge0 bridge from every access point."""
+        self.log.info('Teardown {cls}'.format(cls=type(self)))
+        for access_point in self.access_points:
+            access_point.remove_bridge(bridge_name='ixia_bridge0')
+
+    # Tests
+    def test_do_nothing(self):
+        """Placeholder test that always returns True."""
+        return True
diff --git a/acts/tests/google/gnss/FlpTtffTest.py b/acts/tests/google/gnss/FlpTtffTest.py
index 77e8cf0..0a36923 100644
--- a/acts/tests/google/gnss/FlpTtffTest.py
+++ b/acts/tests/google/gnss/FlpTtffTest.py
@@ -56,7 +56,6 @@
for network in self.pixel_lab_network:
SSID = network['SSID']
self.ssid_map[SSID] = network
-
if int(self.ad.adb.shell("settings get global airplane_mode_on")) != 0:
self.ad.log.info("Force airplane mode off")
force_airplane_mode(self.ad, False)
diff --git a/acts/tests/google/gnss/GnssSanityTest.py b/acts/tests/google/gnss/GnssSanityTest.py
index 4d707d5..28c2ccf 100644
--- a/acts/tests/google/gnss/GnssSanityTest.py
+++ b/acts/tests/google/gnss/GnssSanityTest.py
@@ -69,6 +69,8 @@
from acts.test_utils.gnss.gnss_test_utils import check_xtra_download
from acts.test_utils.gnss.gnss_test_utils import gnss_tracking_via_gtw_gpstool
from acts.test_utils.gnss.gnss_test_utils import parse_gtw_gpstool_log
+from acts.test_utils.gnss.gnss_test_utils import enable_supl_mode
+from acts.test_utils.gnss.gnss_test_utils import start_toggle_gnss_by_gtw_gpstool
class GnssSanityTest(BaseTestClass):
@@ -77,8 +79,8 @@
super().setup_class()
self.ad = self.android_devices[0]
req_params = ["pixel_lab_network", "standalone_cs_criteria",
- "supl_cs_criteria", "xtra_ws_criteria", "xtra_cs_criteria",
- "weak_signal_supl_cs_criteria",
+ "supl_cs_criteria", "xtra_ws_criteria",
+ "xtra_cs_criteria", "weak_signal_supl_cs_criteria",
"weak_signal_xtra_ws_criteria",
"weak_signal_xtra_cs_criteria",
"default_gnss_signal_attenuation",
@@ -98,7 +100,6 @@
else:
self.wifi_xtra_cs_criteria = self.xtra_cs_criteria
self.flash_new_radio_or_mbn()
-
set_attenuator_gnss_signal(self.ad, self.attenuators,
self.default_gnss_signal_attenuation)
_init_device(self.ad)
@@ -259,7 +260,8 @@
self.ad.log.error("\n%s" % error)
else:
self.ad.log.info("NO \"%s\" initialization error found." % attr)
- asserts.assert_true(error_mismatch, "Error message found after GNSS init")
+ asserts.assert_true(error_mismatch, "Error message found after GNSS "
+ "init")
@test_tracker_info(uuid="ff318483-411c-411a-8b1a-422bd54f4a3f")
def test_supl_capabilities(self):
@@ -436,7 +438,8 @@
4. DUT hang up call.
Expected Results:
- All SUPL TTFF Cold Start results should be less than supl_cs_criteria.
+ All SUPL TTFF Cold Start results should be less than
+ supl_cs_criteria.
"""
begin_time = get_current_epoch_time()
start_qxdm_logger(self.ad, begin_time)
@@ -539,9 +542,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=3)
ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- supl_ssr_test_result = check_ttff_data(self.ad, ttff_data,
- ttff_mode="Cold Start",
- criteria=self.supl_cs_criteria)
+ supl_ssr_test_result = check_ttff_data(
+ self.ad, ttff_data, ttff_mode="Cold Start",
+ criteria=self.supl_cs_criteria)
self.ad.log.info("SUPL after Modem SSR test %d times -> %s"
% (times, supl_ssr_test_result))
supl_ssr_test_result_all.append(supl_ssr_test_result)
@@ -713,7 +716,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="ws", iteration=10)
ws_ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- ws_result = check_ttff_data(self.ad, ws_ttff_data, ttff_mode="Warm Start",
+ ws_result = check_ttff_data(self.ad,
+ ws_ttff_data,
+ ttff_mode="Warm Start",
criteria=self.xtra_ws_criteria)
xtra_result.append(ws_result)
begin_time = get_current_epoch_time()
@@ -721,7 +726,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=10)
cs_ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- cs_result = check_ttff_data(self.ad, cs_ttff_data, ttff_mode="Cold Start",
+ cs_result = check_ttff_data(self.ad,
+ cs_ttff_data,
+ ttff_mode="Cold Start",
criteria=self.xtra_cs_criteria)
xtra_result.append(cs_result)
asserts.assert_true(all(xtra_result),
@@ -752,7 +759,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="ws", iteration=10)
ws_ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- ws_result = check_ttff_data(self.ad, ws_ttff_data, ttff_mode="Warm Start",
+ ws_result = check_ttff_data(self.ad,
+ ws_ttff_data,
+ ttff_mode="Warm Start",
criteria=self.weak_signal_xtra_ws_criteria)
xtra_result.append(ws_result)
begin_time = get_current_epoch_time()
@@ -760,7 +769,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=10)
cs_ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- cs_result = check_ttff_data(self.ad, cs_ttff_data, ttff_mode="Cold Start",
+ cs_result = check_ttff_data(self.ad,
+ cs_ttff_data,
+ ttff_mode="Cold Start",
criteria=self.weak_signal_xtra_cs_criteria)
xtra_result.append(cs_result)
asserts.assert_true(all(xtra_result),
@@ -787,13 +798,15 @@
self.ad.log.info("Turn airplane mode on")
force_airplane_mode(self.ad, True)
wifi_toggle_state(self.ad, True)
- connect_to_wifi_network(self.ad,
- self.ssid_map[self.pixel_lab_network[0]["SSID"]])
+ connect_to_wifi_network(
+ self.ad, self.ssid_map[self.pixel_lab_network[0]["SSID"]])
process_gnss_by_gtw_gpstool(self.ad, self.standalone_cs_criteria)
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="ws", iteration=10)
ws_ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- ws_result = check_ttff_data(self.ad, ws_ttff_data, ttff_mode="Warm Start",
+ ws_result = check_ttff_data(self.ad,
+ ws_ttff_data,
+ ttff_mode="Warm Start",
criteria=self.xtra_ws_criteria)
xtra_result.append(ws_result)
begin_time = get_current_epoch_time()
@@ -801,7 +814,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=10)
cs_ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- cs_result = check_ttff_data(self.ad, cs_ttff_data, ttff_mode="Cold Start",
+ cs_result = check_ttff_data(self.ad,
+ cs_ttff_data,
+ ttff_mode="Cold Start",
criteria=self.wifi_xtra_cs_criteria)
xtra_result.append(cs_result)
asserts.assert_true(all(xtra_result),
@@ -833,9 +848,9 @@
start_ttff_by_gtw_gpstool(self.ad, ttff_mode="cs", iteration=3)
ttff_data = process_ttff_by_gtw_gpstool(self.ad, begin_time,
self.pixel_lab_location)
- xtra_ssr_test_result = check_ttff_data(self.ad, ttff_data,
- ttff_mode="Cold Start",
- criteria=self.xtra_cs_criteria)
+ xtra_ssr_test_result = check_ttff_data(
+ self.ad, ttff_data, ttff_mode="Cold Start",
+ criteria=self.xtra_cs_criteria)
self.ad.log.info("XTRA after Modem SSR test %d times -> %s"
% (times, xtra_ssr_test_result))
xtra_ssr_test_result_all.append(xtra_ssr_test_result)
@@ -889,8 +904,8 @@
self.ad.log.info("Turn airplane mode on")
force_airplane_mode(self.ad, True)
wifi_toggle_state(self.ad, True)
- connect_to_wifi_network(self.ad,
- self.ssid_map[self.pixel_lab_network[0]["SSID"]])
+ connect_to_wifi_network(
+ self.ad, self.ssid_map[self.pixel_lab_network[0]["SSID"]])
for i in range(1, 6):
begin_time = get_current_epoch_time()
process_gnss_by_gtw_gpstool(self.ad, self.standalone_cs_criteria)
@@ -901,3 +916,23 @@
self.ad.log.info("Iteraion %d => %s" % (i, wifi_xtra_result))
asserts.assert_true(all(wifi_xtra_result_all),
"Fail to Download XTRA file")
+
+ @test_tracker_info(uuid="2a9f2890-3c0a-48b8-821d-bf97e36355e9")
+ def test_quick_toggle_gnss_state(self):
+ """Verify GNSS still works properly after quickly toggling GNSS off
+ and on.
+
+ Steps:
+ 1. Launch GTW_GPSTool.
+ 2. Go to "Advance setting"
+ 3. Set Cycle = 10 & Time-out = 60
+ 4. Go to "Toggle GPS" tab
+ 5. Execute "Start"
+
+ Expected Results:
+ No single Timeout is seen in 10 iterations.
+ """
+ enable_supl_mode(self.ad)
+ reboot(self.ad)
+ start_qxdm_logger(self.ad, get_current_epoch_time())
+ start_toggle_gnss_by_gtw_gpstool(self.ad, iteration=10)
diff --git a/acts/tests/google/net/CaptivePortalTest.py b/acts/tests/google/net/CaptivePortalTest.py
new file mode 100644
index 0000000..10240e4
--- /dev/null
+++ b/acts/tests/google/net/CaptivePortalTest.py
@@ -0,0 +1,209 @@
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+from acts import asserts
+from acts import base_test
+from acts import signals
+from acts.libs.uicd.uicd_cli import UicdCli
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.net import connectivity_const as cconst
+from acts.test_utils.net import connectivity_test_utils as cutils
+from acts.test_utils.wifi import wifi_test_utils as wutils
+
+WifiEnums = wutils.WifiEnums
+IFACE = "InterfaceName"
+TIME_OUT = 20
+WLAN = "wlan0"
+
+
+class CaptivePortalTest(base_test.BaseTestClass):
+ """ Tests for Captive portal """
+
+ def setup_class(self):
+ """Setup devices for tests and unpack params
+
+ Required params:
+ 1. rk_captive_portal: SSID of ruckus captive portal network in dict
+ 2. gg_captive_portal: SSID of guestgate network in dict
+ 3. uicd_workflows: uicd workflow that specify click actions to accept
+ a captive portal connection. Ex: Click on SignIn, Accept & Continue
+ //wireless/android/platform/testing/wifi/configs/uicd/
+ 4. uicd_zip: Zip file location of UICD application
+ """
+ self.dut = self.android_devices[0]
+ wutils.wifi_test_device_init(self.dut)
+ wutils.wifi_toggle_state(self.dut, True)
+ req_params = ["rk_captive_portal",
+ "gg_captive_portal",
+ "uicd_workflows",
+ "uicd_zip"]
+ self.unpack_userparams(req_param_names=req_params,)
+ self.ui = UicdCli(self.uicd_zip, self.uicd_workflows)
+ self.rk_workflow_config = "rk_captive_portal_%s" % self.dut.model
+ self.gg_workflow_config = "gg_captive_portal_%s" % self.dut.model
+
+ def teardown_class(self):
+ """ Reset devices """
+ cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
+
+ def setup_test(self):
+ """ Setup device """
+ self.dut.unlock_screen()
+
+ def teardown_test(self):
+ """ Reset to default state after each test """
+ wutils.reset_wifi(self.dut)
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+
+ ### Helper methods ###
+
+ def _verify_captive_portal(self, network, uicd_workflow):
+ """Connect to captive portal network using uicd workflow
+
+ Steps:
+ 1. Connect to captive portal network
+ 2. Run uicd workflow to accept connection
+ 3. Verify internet connectivity
+
+ Args:
+ 1. network: captive portal network to connect to
+ 2. uicd_workflow: ui workflow to accept captive portal conn
+ """
+ # connect to captive portal wifi network
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, network[WifiEnums.SSID_KEY])
+ wutils.wifi_connect(self.dut, network, check_connectivity=False)
+
+ # run uicd
+ self.ui.run(self.dut.serial, uicd_workflow)
+
+ # wait for some time for the captive portal connection to go through
+ curr_time = time.time()
+ while time.time() < curr_time + TIME_OUT:
+ link_prop = self.dut.droid.connectivityGetActiveLinkProperties()
+ self.log.debug("Link properties %s" % link_prop)
+ if link_prop and link_prop[IFACE] == WLAN:
+ break
+ time.sleep(2)
+
+ # verify connectivity
+ internet = wutils.validate_connection(self.dut,
+ wutils.DEFAULT_PING_ADDR)
+ if not internet:
+ raise signals.TestFailure("Failed to connect to internet on %s" %
+ network[WifiEnums.SSID_KEY])
+
+ ### Test Cases ###
+
+ @test_tracker_info(uuid="b035b4f9-40f7-42f6-9941-ec27afe15040")
+ def test_ruckus_captive_portal_default(self):
+ """Verify captive portal network
+
+ Steps:
+ 1. Set default private dns mode
+ 2. Connect to ruckus captive portal network
+ 3. Verify connectivity
+ """
+ # set private dns to opportunistic
+ cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
+
+ # verify connection to captive portal network
+ self._verify_captive_portal(self.rk_captive_portal,
+ self.rk_workflow_config)
+
+ @test_tracker_info(uuid="8ea18d80-0170-41b1-8945-fe14bcd4feab")
+ def test_ruckus_captive_portal_private_dns_off(self):
+ """Verify captive portal network
+
+ Steps:
+ 1. Turn off private dns mode
+ 2. Connect to ruckus captive portal network
+ 3. Verify connectivity
+ """
+ # turn off private dns
+ cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OFF)
+
+ # verify connection to captive portal network
+ self._verify_captive_portal(self.rk_captive_portal,
+ self.rk_workflow_config)
+
+ @test_tracker_info(uuid="e8e05907-55f7-40e5-850c-b3111ceb31a4")
+ def test_ruckus_captive_portal_private_dns_strict(self):
+ """Verify captive portal network
+
+ Steps:
+ 1. Set strict private dns mode
+ 2. Connect to ruckus captive portal network
+ 3. Verify connectivity
+ """
+ # set private dns to strict mode
+ cutils.set_private_dns(self.dut,
+ cconst.PRIVATE_DNS_MODE_STRICT,
+ cconst.DNS_GOOGLE)
+
+ # verify connection to captive portal network
+ self._verify_captive_portal(self.rk_captive_portal,
+ self.rk_workflow_config)
+
+ @test_tracker_info(uuid="76e49800-f141-4fd2-9969-562585eb1e7a")
+ def test_guestgate_captive_portal_default(self):
+ """Verify captive portal network
+
+ Steps:
+ 1. Set default private dns mode
+ 2. Connect to guestgate captive portal network
+ 3. Verify connectivity
+ """
+ # set private dns to opportunistic
+ cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
+
+ # verify connection to captive portal network
+ self._verify_captive_portal(self.gg_captive_portal, "gg_captive_portal")
+
+ @test_tracker_info(uuid="0aea0cac-0f42-406b-84ba-62c1ef74adfc")
+ def test_guestgate_captive_portal_private_dns_off(self):
+ """Verify captive portal network
+
+ Steps:
+ 1. Turn off private dns mode
+ 2. Connect to guestgate captive portal network
+ 3. Verify connectivity
+ """
+ # turn off private dns
+ cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OFF)
+
+ # verify connection to captive portal network
+ self._verify_captive_portal(self.gg_captive_portal, "gg_captive_portal")
+
+ @test_tracker_info(uuid="39124dcc-2fd3-4d33-b129-a1c8150b7f2a")
+ def test_guestgate_captive_portal_private_dns_strict(self):
+ """Verify captive portal network
+
+ Steps:
+ 1. Set strict private dns mode
+ 2. Connect to guestgate captive portal network
+ 3. Verify connectivity
+ """
+ # set private dns to strict mode
+ cutils.set_private_dns(self.dut,
+ cconst.PRIVATE_DNS_MODE_STRICT,
+ cconst.DNS_GOOGLE)
+
+ # verify connection to captive portal network
+ self._verify_captive_portal(self.gg_captive_portal, "gg_captive_portal")
diff --git a/acts/tests/google/net/CoreNetworkingOTATest.py b/acts/tests/google/net/CoreNetworkingOTATest.py
index 2444971..5b350f8 100755
--- a/acts/tests/google/net/CoreNetworkingOTATest.py
+++ b/acts/tests/google/net/CoreNetworkingOTATest.py
@@ -84,7 +84,7 @@
for ad in self.android_devices:
ota_updater.update(ad)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
def on_fail(self, test_name, begin_time):
diff --git a/acts/tests/google/net/DnsOverTlsTest.py b/acts/tests/google/net/DnsOverTlsTest.py
index e8a883d..84646d7 100644
--- a/acts/tests/google/net/DnsOverTlsTest.py
+++ b/acts/tests/google/net/DnsOverTlsTest.py
@@ -95,7 +95,7 @@
"""
try:
packets = rdpcap(pcap_file)
- except Scapy_Excaption:
+ except Scapy_Exception:
asserts.fail("Not a valid pcap file")
for pkt in packets:
summary = "%s" % pkt.summary()
diff --git a/acts/tests/google/net/LegacyVpnTest.py b/acts/tests/google/net/LegacyVpnTest.py
index 1e037de..a4bfc52 100644
--- a/acts/tests/google/net/LegacyVpnTest.py
+++ b/acts/tests/google/net/LegacyVpnTest.py
@@ -48,17 +48,15 @@
"""
self.dut = self.android_devices[0]
required_params = dir(VPN_PARAMS)
- required_params = [x for x in required_params if not x.startswith('__')]
- optional_params = ["reference_networks", "wpa_networks",]
- self.unpack_userparams(req_param_names=required_params,
- opt_param_names=optional_params)
- if "AccessPoint" in self.user_params:
- self.legacy_configure_ap_and_start(wpa_network=True)
- asserts.assert_true(len(self.reference_networks) > 0,
- "Need at least one reference network with psk.")
- self.wifi_network = self.reference_networks[0]["2g"]
+ required_params = [
+ x for x in required_params if not x.startswith('__')
+ ] + ["wifi_network"]
+ self.unpack_userparams(req_param_names=required_params)
+
wutils.wifi_test_device_init(self.dut)
wutils.wifi_toggle_state(self.dut, True)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, self.wifi_network["SSID"])
wutils.wifi_connect(self.dut, self.wifi_network)
time.sleep(3)
diff --git a/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py b/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py
index 204beb0..09b47a6 100644
--- a/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py
+++ b/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py
@@ -19,7 +19,7 @@
import time
import os
from acts import utils
-MDLOG_RUNNING_TIME = 300
+MDLOG_RUNNING_TIME = 120
class PowerGnssDpoSimTest(GBT.PowerGnssBaseTest):
"""Power baseline tests for rockbottom state.
@@ -42,7 +42,7 @@
self.dut.send_keycode("SLEEP")
self.measure_gnsspower_test_func()
diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile)
- time.sleep(MDLOG_RUNNING_TIME)
+ self.disconnect_usb(self.dut, MDLOG_RUNNING_TIME)
qxdm_log_path = os.path.join(self.log_path, 'QXDM')
diaglog.stop_background_diagmdlog(self.dut, qxdm_log_path)
@@ -51,6 +51,11 @@
self.dut.send_keycode("SLEEP")
self.measure_gnsspower_test_func()
diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile)
- time.sleep(MDLOG_RUNNING_TIME)
+ self.disconnect_usb(self.dut, MDLOG_RUNNING_TIME)
qxdm_log_path = os.path.join(self.log_path, 'QXDM')
diaglog.stop_background_diagmdlog(self.dut, qxdm_log_path)
+
+ def test_gnss_rockbottom(self):
+ self.dut.send_keycode("SLEEP")
+ time.sleep(120)
+ self.measure_gnsspower_test_func()
diff --git a/acts/tests/google/tel/lab/TelLabDataTest.py b/acts/tests/google/tel/lab/TelLabDataTest.py
index 39f171a..c48446c 100644
--- a/acts/tests/google/tel/lab/TelLabDataTest.py
+++ b/acts/tests/google/tel/lab/TelLabDataTest.py
@@ -59,6 +59,7 @@
from acts.test_utils.tel.tel_test_utils import ensure_network_rat
from acts.test_utils.tel.tel_test_utils import ensure_phones_idle
from acts.test_utils.tel.tel_test_utils import ensure_network_generation
+from acts.test_utils.tel.tel_test_utils import get_host_ip_address
from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode
from acts.test_utils.tel.tel_test_utils import iperf_test_by_adb
from acts.test_utils.tel.tel_test_utils import start_qxdm_loggers
@@ -180,11 +181,7 @@
time.sleep(self.SETTLING_TIME)
# Fetch IP address of the host machine
- cmd = "|".join(("ifconfig", "grep eth0 -A1", "grep inet",
- "cut -d ':' -f2", "cut -d ' ' -f 1"))
- destination_ip = exe_cmd(cmd)
- destination_ip = (destination_ip.decode("utf-8")).split("\n")[0]
- self.log.info("Dest IP is %s", destination_ip)
+ destination_ip = get_host_ip_address(self)
if not adb_shell_ping(self.ad, DEFAULT_PING_DURATION,
destination_ip):
diff --git a/acts/tests/google/tel/lab/TelLabMobilityTest.py b/acts/tests/google/tel/lab/TelLabMobilityTest.py
index dd5c2e6..f620826 100644
--- a/acts/tests/google/tel/lab/TelLabMobilityTest.py
+++ b/acts/tests/google/tel/lab/TelLabMobilityTest.py
@@ -49,6 +49,7 @@
from acts.test_utils.tel.tel_defines import WAIT_TIME_IN_CALL_FOR_IMS
from acts.test_utils.tel.tel_test_utils import ensure_network_rat
from acts.test_utils.tel.tel_test_utils import ensure_phones_idle
+from acts.test_utils.tel.tel_test_utils import get_host_ip_address
from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
from acts.test_utils.tel.tel_test_utils import toggle_volte
from acts.test_utils.tel.tel_test_utils import run_multithread_func
@@ -246,11 +247,7 @@
def iperf_setup(self):
# Fetch IP address of the host machine
- cmd = "|".join(("ifconfig", "grep eth0 -A1", "grep inet",
- "cut -d ':' -f2", "cut -d ' ' -f 1"))
- destination_ip = exe_cmd(cmd)
- destination_ip = (destination_ip.decode("utf-8")).split("\n")[0]
- self.log.info("Dest IP is %s", destination_ip)
+ destination_ip = get_host_ip_address(self)
if not adb_shell_ping(self.ad, DEFAULT_PING_DURATION, destination_ip):
self.log.error("Pings failed to Destination.")
diff --git a/acts/tests/google/tel/live/TelLiveConnectivityMonitorBaseTest.py b/acts/tests/google/tel/live/TelLiveConnectivityMonitorBaseTest.py
index c9378e4..135f583 100644
--- a/acts/tests/google/tel/live/TelLiveConnectivityMonitorBaseTest.py
+++ b/acts/tests/google/tel/live/TelLiveConnectivityMonitorBaseTest.py
@@ -37,6 +37,7 @@
from acts.test_utils.tel.tel_test_utils import get_device_epoch_time
from acts.test_utils.tel.tel_test_utils import get_model_name
from acts.test_utils.tel.tel_test_utils import get_operator_name
+from acts.test_utils.tel.tel_test_utils import get_outgoing_voice_sub_id
from acts.test_utils.tel.tel_test_utils import hangup_call
from acts.test_utils.tel.tel_test_utils import last_call_drop_reason
from acts.test_utils.tel.tel_test_utils import reboot_device
@@ -125,9 +126,11 @@
self.ad_reference = self.android_devices[1]
self.dut_model = get_model_name(self.dut)
self.dut_operator = get_operator_name(self.log, self.dut)
- self.dut_capabilities = self.dut.telephony.get("capabilities", [])
- self.dut_wfc_modes = self.dut.telephony.get("wfc_modes", [])
- self.reference_capabilities = self.ad_reference.telephony.get(
+ self.dut_subID = get_outgoing_voice_sub_id(self.dut)
+ self.dut_capabilities = self.dut.telephony["subscription"][self.dut_subID].get("capabilities", [])
+ self.dut_wfc_modes = self.dut.telephony["subscription"][self.dut_subID].get("wfc_modes", [])
+ self.ad_reference_subID = get_outgoing_voice_sub_id(self.ad_reference)
+ self.reference_capabilities = self.ad_reference.telephony["subscription"][self.ad_reference_subID].get(
"capabilities", [])
self.dut.log.info("DUT capabilities: %s", self.dut_capabilities)
self.skip_reset_between_cases = False
diff --git a/acts/tests/google/tel/live/TelLiveDataTest.py b/acts/tests/google/tel/live/TelLiveDataTest.py
index df59891..3d967cc 100644
--- a/acts/tests/google/tel/live/TelLiveDataTest.py
+++ b/acts/tests/google/tel/live/TelLiveDataTest.py
@@ -3224,6 +3224,52 @@
resume_internet_with_sl4a_port(dut, sl4a_port)
+ def _test_airplane_mode_stress(self):
+ ad = self.android_devices[0]
+ total_iteration = self.stress_test_number
+ fail_count = collections.defaultdict(int)
+ current_iteration = 1
+ for i in range(1, total_iteration + 1):
+ msg = "Airplane mode test Iteration: <%s> / <%s>" % (i, total_iteration)
+ self.log.info(msg)
+ if not toggle_airplane_mode(ad.log, ad, True):
+ ad.log.error("Toggle APM on failed")
+ fail_count["apm_on"] += 1
+ ad.log.error(">----Iteration : %d/%d failed.----<",
+ i, total_iteration)
+ if not toggle_airplane_mode(ad.log, ad, False):
+ ad.log.error("Toggle APM off failed")
+ fail_count["apm_off"] += 1
+ ad.log.error(">----Iteration : %d/%d failed.----<",
+ i, total_iteration)
+ ad.log.info(">----Iteration : %d/%d succeeded.----<",
+ i, total_iteration)
+ current_iteration += 1
+ test_result = True
+ for failure, count in fail_count.items():
+ if count:
+ ad.log.error("%s: %s %s failures in %s iterations",
+ self.test_name, count, failure,
+ total_iteration)
+ test_result = False
+ return test_result
+
+ @test_tracker_info(uuid="3a82728f-18b5-4a35-9eab-4e6cf55271d9")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_apm_toggle_stress(self):
+ """ Test airplane mode toggle
+
+ 1. Start with airplane mode off
+ 2. Toggle airplane mode on
+ 3. Toggle airplane mode off
+ 4. Repeat above steps
+
+ Returns:
+ True if pass; False if fail.
+ """
+ return self._test_airplane_mode_stress()
+
+
@test_tracker_info(uuid="fda33416-698a-408f-8ddc-b5cde13b1f83")
@TelephonyBaseTest.tel_test_wrap
def test_data_stall_detection_cellular(self):
diff --git a/acts/tests/google/tel/live/TelLiveImsSettingsTest.py b/acts/tests/google/tel/live/TelLiveImsSettingsTest.py
index 64f81ca..7a6d384 100644
--- a/acts/tests/google/tel/live/TelLiveImsSettingsTest.py
+++ b/acts/tests/google/tel/live/TelLiveImsSettingsTest.py
@@ -86,9 +86,9 @@
subid].get("capabilities", [])
self.dut.log.info("DUT capabilities: %s", self.dut_capabilities)
if CAPABILITY_VOLTE not in self.dut_capabilities:
- raise signals.TestSkipClass("VoLTE is not supported")
+ raise signals.TestAbortClass("VoLTE is not supported")
if CAPABILITY_WFC not in self.dut_capabilities:
- raise signals.TestSkipClass("WFC is not supported")
+ raise signals.TestAbortClass("WFC is not supported")
self.default_volte = (CAPABILITY_VOLTE in self.dut_capabilities) and (
self.carrier_configs[CarrierConfigs.
diff --git a/acts/tests/google/tel/live/TelLiveRebootStressTest.py b/acts/tests/google/tel/live/TelLiveRebootStressTest.py
index 70677e1..3b6482a 100644
--- a/acts/tests/google/tel/live/TelLiveRebootStressTest.py
+++ b/acts/tests/google/tel/live/TelLiveRebootStressTest.py
@@ -95,12 +95,11 @@
self.user_params["check_crash"] = False
self.skip_reset_between_cases = False
- self.dut_capabilities = self.dut.telephony.get("capabilities", [])
- self.dut_wfc_modes = self.dut.telephony.get("wfc_modes", [])
+ self.dut_subID = get_outgoing_voice_sub_id(self.dut)
+ self.dut_capabilities = self.dut.telephony["subscription"][self.dut_subID].get("capabilities", [])
+ self.dut_wfc_modes = self.dut.telephony["subscription"][self.dut_subID].get("wfc_modes", [])
self.default_testing_func_names = []
- for method in ("_check_volte", "_check_vt", "_check_csfb",
- "_check_tethering", "_check_wfc_apm",
- "_check_wfc_nonapm", "_check_3g"):
+ for method in ("_check_volte", "_check_3g"):
func = getattr(self, method)
try:
check_result = func()
@@ -119,8 +118,7 @@
def feature_validator(self, *args):
failed_tests = []
for method in ("_check_subscription", "_check_data",
- "_check_sms_mt", "_check_call_setup_teardown",
- "_check_sms"):
+ "_check_call_setup_teardown", "_check_sms"):
func = getattr(self, method)
if not func():
self.log.error("%s failed", method)
diff --git a/acts/tests/google/tel/live/TelLiveSettingsTest.py b/acts/tests/google/tel/live/TelLiveSettingsTest.py
index 791e9e7..10b045f 100644
--- a/acts/tests/google/tel/live/TelLiveSettingsTest.py
+++ b/acts/tests/google/tel/live/TelLiveSettingsTest.py
@@ -51,7 +51,8 @@
self.number_of_devices = 1
self.stress_test_number = self.get_stress_test_number()
self.carrier_configs = dumpsys_carrier_config(self.dut)
- self.dut_capabilities = self.dut.telephony.get("capabilities", [])
+ self.dut_subID = get_outgoing_voice_sub_id(self.dut)
+ self.dut_capabilities = self.dut.telephony["subscription"][self.dut_subID].get("capabilities", [])
@test_tracker_info(uuid="c6149bd6-7080-453d-af37-1f9bd350a764")
@TelephonyBaseTest.tel_test_wrap
diff --git a/acts/tests/google/tel/live/TelLiveStressTest.py b/acts/tests/google/tel/live/TelLiveStressTest.py
index 28b03b0..ec364ce 100644
--- a/acts/tests/google/tel/live/TelLiveStressTest.py
+++ b/acts/tests/google/tel/live/TelLiveStressTest.py
@@ -251,6 +251,19 @@
ad.log.info("RAT 2G is enabled successfully.")
return True
+ def _get_network_rat(self, slot_id):
+ rat = self.dut.adb.getprop("gsm.network.type")
+ if "," in rat:
+ if self.dsds_esim:
+ rat = rat.split(',')[slot_id]
+ else:
+ (rat1, rat2) = rat.split(',')
+ if rat1 == "Unknown":
+ rat = rat2
+ else:
+ rat = rat1
+ return rat
+
def _send_message(self, max_wait_time=2 * MAX_WAIT_TIME_SMS_RECEIVE):
slot_id_rx = None
if self.single_phone_test:
@@ -276,12 +289,7 @@
0: sms_send_receive_verify,
1: mms_send_receive_verify
}
- rat = self.dut.adb.getprop("gsm.network.type")
- if "," in rat:
- if self.dsds_esim:
- rat = rat.split(',')[slot_id]
- else:
- rat = rat.split(',')[0]
+ rat = self._get_network_rat(slot_id)
self.dut.log.info("Network in RAT %s", rat)
if self.dut_incall and not is_rat_svd_capable(rat.upper()):
self.dut.log.info("In call data not supported, test SMS only")
@@ -336,12 +344,7 @@
self.log.error("%s fails", log_msg)
self.result_info["%s Failure" % message_type] += 1
else:
- rat = self.dut.adb.getprop("gsm.network.type")
- if "," in rat:
- if self.dsds_esim:
- rat = rat.split(',')[slot_id]
- else:
- rat = rat.split(',')[0]
+ rat = self._get_network_rat(slot_id)
self.dut.log.info("Network in RAT %s", rat)
if self.dut_incall and not is_rat_svd_capable(rat.upper()):
self.dut.log.info(
@@ -782,13 +785,8 @@
file_name = file_names[selection]
self.result_info["Internet Connection Check Total"] += 1
+ rat = self._get_network_rat(slot_id)
if not self.internet_connection_check_method(self.log, self.dut):
- rat = self.dut.adb.getprop("gsm.network.type")
- if "," in rat:
- if self.dsds_esim:
- rat = rat.split(',')[slot_id]
- else:
- rat = rat.split(',')[0]
self.dut.log.info("Network in RAT %s", rat)
if self.dut_incall and not is_rat_svd_capable(rat.upper()):
self.result_info[
@@ -1103,7 +1101,7 @@
def test_lte_volte_parallel_stress(self):
""" VoLTE on stress test"""
if CAPABILITY_VOLTE not in self.dut_capabilities:
- raise signals.TestSkipClass("VoLTE is not supported")
+ raise signals.TestAbortClass("VoLTE is not supported")
return self.parallel_tests(
setup_func=self._setup_lte_volte_enabled,
call_verification_func=is_phone_in_call_volte)
@@ -1121,7 +1119,7 @@
def test_wfc_parallel_stress(self):
""" Wifi calling APM mode off stress test"""
if CAPABILITY_WFC not in self.dut_capabilities:
- raise signals.TestSkipClass("WFC is not supported")
+ raise signals.TestAbortClass("WFC is not supported")
if WFC_MODE_WIFI_PREFERRED not in self.dut_wfc_modes:
raise signals.TestSkip("WFC_MODE_WIFI_PREFERRED is not supported")
return self.parallel_tests(
@@ -1133,7 +1131,7 @@
def test_wfc_apm_parallel_stress(self):
""" Wifi calling in APM mode on stress test"""
if CAPABILITY_WFC not in self.dut_capabilities:
- raise signals.TestSkipClass("WFC is not supported")
+ raise signals.TestAbortClass("WFC is not supported")
return self.parallel_tests(
setup_func=self._setup_wfc_apm,
call_verification_func=is_phone_in_call_iwlan)
@@ -1159,7 +1157,7 @@
def test_volte_modeprefchange_parallel_stress(self):
""" VoLTE Mode Pref call stress test"""
if CAPABILITY_VOLTE not in self.dut_capabilities:
- raise signals.TestSkipClass("VoLTE is not supported")
+ raise signals.TestAbortClass("VoLTE is not supported")
return self.parallel_with_network_change_tests(
setup_func=self._setup_lte_volte_enabled)
diff --git a/acts/tests/google/wifi/WifiAutoUpdateTest.py b/acts/tests/google/wifi/WifiAutoUpdateTest.py
index 04fb850..33369a2 100755
--- a/acts/tests/google/wifi/WifiAutoUpdateTest.py
+++ b/acts/tests/google/wifi/WifiAutoUpdateTest.py
@@ -93,7 +93,7 @@
try:
ota_updater.update(self.dut)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
def setup_test(self):
diff --git a/acts/tests/google/wifi/WifiChaosTest.py b/acts/tests/google/wifi/WifiChaosTest.py
index aa36588..0fa77b3 100755
--- a/acts/tests/google/wifi/WifiChaosTest.py
+++ b/acts/tests/google/wifi/WifiChaosTest.py
@@ -213,15 +213,13 @@
Steps:
1. Send a few link probes.
- 2. Verify that at least one link probe succeeded.
- 3. Ensure that the device and AP did not crash (by checking that the
+ 2. Ensure that the device and AP did not crash (by checking that the
device remains connected to the expected network).
"""
results = wutils.send_link_probes(
self.dut, NUM_LINK_PROBES, PROBE_DELAY_SEC)
- asserts.assert_true(any(result.is_success for result in results),
- "Expect at least 1 probe success: " + str(results))
+ self.log.info("Link Probe results: %s" % (results,))
wifi_info = self.dut.droid.wifiGetConnectionInfo()
expected = network[WifiEnums.SSID_KEY]
@@ -269,12 +267,13 @@
self.log.info("Connecting to %s" % ssid)
self.scan_and_connect_by_id(network, net_id)
self.run_ping(10)
- self.send_link_probes(network)
+ # TODO(b/133369482): uncomment once bug is resolved
+ # self.send_link_probes(network)
wutils.wifi_forget_network(self.dut, ssid)
time.sleep(WAIT_BEFORE_CONNECTION)
- except:
+ except Exception as e:
self.log.error("Connection to %s network failed on the %d "
- "attempt." % (ssid, attempt))
+ "attempt with exception %s." % (ssid, attempt, e))
# TODO:(bmahadev) Uncomment after scan issue is fixed.
# self.dut.take_bug_report(ssid, begin_time)
# self.dut.cat_adb_log(ssid, begin_time)
diff --git a/acts/tests/google/wifi/WifiCrashStressTest.py b/acts/tests/google/wifi/WifiCrashStressTest.py
index bf17ada..837112a 100644
--- a/acts/tests/google/wifi/WifiCrashStressTest.py
+++ b/acts/tests/google/wifi/WifiCrashStressTest.py
@@ -41,7 +41,7 @@
wutils.wifi_test_device_init(self.dut)
wutils.wifi_test_device_init(self.dut_client)
if not self.dut.is_apk_installed("com.google.mdstest"):
- raise signals.TestSkipClass("mdstest is not installed")
+ raise signals.TestAbortClass("mdstest is not installed")
req_params = ["dbs_supported_models", "stress_count"]
opt_param = ["reference_networks"]
self.unpack_userparams(
diff --git a/acts/tests/google/wifi/WifiMacRandomizationTest.py b/acts/tests/google/wifi/WifiMacRandomizationTest.py
index 67fd99e..5ad2188 100644
--- a/acts/tests/google/wifi/WifiMacRandomizationTest.py
+++ b/acts/tests/google/wifi/WifiMacRandomizationTest.py
@@ -271,14 +271,15 @@
@test_tracker_info(uuid="2dd0a05e-a318-45a6-81cd-962e098fa242")
def test_set_mac_randomization_to_none(self):
self.pcap_procs = wutils.start_pcap(
- self.packet_capture, 'dual', self.log_path, self.test_name)
+ self.packet_capture, 'dual', self.test_name)
network = self.wpapsk_2g
# Set macRandomizationSetting to RANDOMIZATION_NONE.
network["macRand"] = RANDOMIZATION_NONE
self.connect_to_network_and_verify_mac_randomization(network,
status=RANDOMIZATION_NONE)
- pcap_fname = os.path.join(self.log_path, self.test_name,
- (self.test_name + '_2G.pcap'))
+ pcap_fname = '%s_%s.pcap' % \
+ (self.pcap_procs[hostapd_constants.BAND_2G][1],
+ hostapd_constants.BAND_2G.upper())
time.sleep(SHORT_TIMEOUT)
wutils.stop_pcap(self.packet_capture, self.pcap_procs, False)
packets = rdpcap(pcap_fname)
@@ -460,14 +461,15 @@
if not result:
raise ValueError("Failed to configure channel for 2G band")
self.pcap_procs = wutils.start_pcap(
- self.packet_capture, 'dual', self.log_path, self.test_name)
+ self.packet_capture, 'dual', self.test_name)
# re-connect to the softAp network after sniffer is started
wutils.connect_to_wifi_network(self.dut_client, self.wpapsk_2g)
wutils.connect_to_wifi_network(self.dut_client, softap)
time.sleep(SHORT_TIMEOUT)
wutils.stop_pcap(self.packet_capture, self.pcap_procs, False)
- pcap_fname = os.path.join(self.log_path, self.test_name,
- (self.test_name + '_2G.pcap'))
+ pcap_fname = '%s_%s.pcap' % \
+ (self.pcap_procs[hostapd_constants.BAND_2G][1],
+ hostapd_constants.BAND_2G.upper())
packets = rdpcap(pcap_fname)
self.verify_mac_not_found_in_pcap(self.soft_ap_factory_mac, packets)
self.verify_mac_not_found_in_pcap(self.sta_factory_mac, packets)
@@ -519,13 +521,14 @@
"""
self.pcap_procs = wutils.start_pcap(
- self.packet_capture, 'dual', self.log_path, self.test_name)
+ self.packet_capture, 'dual', self.test_name)
time.sleep(SHORT_TIMEOUT)
network = self.wpapsk_5g
rand_mac = self.connect_to_network_and_verify_mac_randomization(network)
wutils.send_link_probes(self.dut, 3, 3)
- pcap_fname = os.path.join(self.log_path, self.test_name,
- (self.test_name + '_5G.pcap'))
+ pcap_fname = '%s_%s.pcap' % \
+ (self.pcap_procs[hostapd_constants.BAND_5G][1],
+ hostapd_constants.BAND_5G.upper())
wutils.stop_pcap(self.packet_capture, self.pcap_procs, False)
time.sleep(SHORT_TIMEOUT)
packets = rdpcap(pcap_fname)
@@ -545,11 +548,12 @@
"""
self.pcap_procs = wutils.start_pcap(
- self.packet_capture, 'dual', self.log_path, self.test_name)
+ self.packet_capture, 'dual', self.test_name)
wutils.start_wifi_connection_scan(self.dut)
time.sleep(SHORT_TIMEOUT)
wutils.stop_pcap(self.packet_capture, self.pcap_procs, False)
- pcap_fname = os.path.join(self.log_path, self.test_name,
- (self.test_name + '_2G.pcap'))
+ pcap_fname = '%s_%s.pcap' % \
+ (self.pcap_procs[hostapd_constants.BAND_2G][1],
+ hostapd_constants.BAND_2G.upper())
packets = rdpcap(pcap_fname)
self.verify_mac_not_found_in_pcap(self.sta_factory_mac, packets)
diff --git a/acts/tests/google/wifi/WifiNetworkSuggestionTest.py b/acts/tests/google/wifi/WifiNetworkSuggestionTest.py
index 885a31d..19a3ae5 100644
--- a/acts/tests/google/wifi/WifiNetworkSuggestionTest.py
+++ b/acts/tests/google/wifi/WifiNetworkSuggestionTest.py
@@ -178,11 +178,15 @@
def remove_suggestions_disconnect_and_ensure_no_connection_back(self,
network_suggestions,
expected_ssid):
- # Remove suggestion trigger disconnect and wait for the disconnect.
self.dut.log.info("Removing network suggestions")
asserts.assert_true(
self.dut.droid.wifiRemoveNetworkSuggestions(network_suggestions),
"Failed to remove suggestions")
+ # Ensure we did not disconnect
+ wutils.ensure_no_disconnect(self.dut)
+
+ # Trigger a disconnect and wait for the disconnect.
+ self.dut.droid.wifiDisconnect()
wutils.wait_for_disconnect(self.dut)
self.dut.ed.clear_all_events()
diff --git a/acts/tests/google/wifi/WifiPasspointTest.py b/acts/tests/google/wifi/WifiPasspointTest.py
index 5ca9a49..b867faa 100755
--- a/acts/tests/google/wifi/WifiPasspointTest.py
+++ b/acts/tests/google/wifi/WifiPasspointTest.py
@@ -73,11 +73,13 @@
self.unknown_fqdn = UNKNOWN_FQDN
# Setup Uicd cli object for UI interation.
self.ui = UicdCli(self.uicd_zip[0], self.uicd_workflows)
+ self.passpoint_workflow = "passpoint-login_%s" % self.dut.model
def setup_test(self):
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
+ self.dut.unlock_screen()
def teardown_test(self):
@@ -180,7 +182,7 @@
"Passpoint Provisioning status %s" % dut_event['data'][
'status'])
if int(dut_event['data']['status']) == 7:
- self.ui.run(self.dut.serial, "passpoint-login")
+ self.ui.run(self.dut.serial, self.passpoint_workflow)
# Clear all previous events.
self.dut.ed.clear_all_events()
diff --git a/acts/tests/google/wifi/WifiPingTest.py b/acts/tests/google/wifi/WifiPingTest.py
index 8abe9eb..8e9bcee 100644
--- a/acts/tests/google/wifi/WifiPingTest.py
+++ b/acts/tests/google/wifi/WifiPingTest.py
@@ -110,9 +110,8 @@
# Turn WiFi ON
if self.testclass_params.get('airplane_mode', 1):
self.log.info('Turning on airplane mode.')
- asserts.assert_true(
- utils.force_airplane_mode(self.dut, True),
- "Can not turn on airplane mode.")
+ asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ "Can not turn on airplane mode.")
wutils.wifi_toggle_state(self.dut, True)
def teardown_class(self):
@@ -149,12 +148,10 @@
sorted(x['rtt'][round(ignored_fraction * len(x['rtt'])):])
for x in result['ping_results']
]
- try:
- mean_rtt = [statistics.mean(x) for x in sorted_rtt]
- std_rtt = [statistics.stdev(x) for x in sorted_rtt]
- except statistics.StatisticsError:
- self.log.debug("Ping Result: {}".format(result['ping_results']))
- self.log.debug("Sorted RTT: {}".format(sorted_rtt))
+ disconnected = any([len(x) == 0 for x in sorted_rtt])
+ if disconnected:
+ asserts.fail('Test failed. DUT disconnected at least once.')
+
rtt_at_test_percentile = [
x[int((1 - self.testclass_params['rtt_test_percentile'] / 100) *
len(x))] for x in sorted_rtt
@@ -164,15 +161,13 @@
self.testcase_metric_logger.add_metric('ping_rtt',
max(rtt_at_test_percentile))
# Evaluate test pass/fail
- test_failed = False
- for idx, rtt in enumerate(rtt_at_test_percentile):
- if rtt > self.testclass_params['rtt_threshold'] * 1000:
- test_failed = True
- self.log.info(
- 'RTT Failed. Test %ile RTT = {}ms. Mean = {}ms. Stdev = {}'
- .format(rtt, mean_rtt[idx], std_rtt[idx]))
- if test_failed:
- asserts.fail('RTT above threshold')
+ rtt_failed = any([
+ rtt > self.testclass_params['rtt_threshold'] * 1000
+ for rtt in rtt_at_test_percentile
+ ])
+ if rtt_failed:
+ asserts.fail('Test failed. RTTs at test percentile = {}'.format(
+ rtt_at_test_percentile))
else:
asserts.explicit_pass(
'Test Passed. RTTs at test percentile = {}'.format(
@@ -431,11 +426,10 @@
self.dut.droid.wifiSetCountryCode(
self.testclass_params['country_code'])
self.main_network[band]['channel'] = testcase_params['channel']
- wutils.wifi_connect(
- self.dut,
- self.main_network[band],
- num_of_tries=5,
- check_connectivity=False)
+ wutils.wifi_connect(self.dut,
+ self.main_network[band],
+ num_of_tries=5,
+ check_connectivity=False)
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
def setup_ping_test(self, testcase_params):
@@ -564,7 +558,6 @@
setting turntable orientation and other chamber parameters to study
performance in varying channel conditions
"""
-
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
self.testcase_metric_logger = (
@@ -614,11 +607,10 @@
primary_y_label='Range (dB)',
)
for channel, channel_data in range_vs_angle.items():
- figure.add_line(
- x_data=channel_data['position'],
- y_data=channel_data['range'],
- hover_text=channel_data['llstats_at_range'],
- legend='Channel {}'.format(channel))
+ figure.add_line(x_data=channel_data['position'],
+ y_data=channel_data['range'],
+ hover_text=channel_data['llstats_at_range'],
+ legend='Channel {}'.format(channel))
average_range = sum(channel_data['range']) / len(
channel_data['range'])
self.log.info('Average range for Channel {} is: {}dB'.format(
@@ -714,12 +706,12 @@
class WifiOtaPing_TenDegree_Test(WifiOtaPingTest):
def __init__(self, controllers):
WifiOtaPingTest.__init__(self, controllers)
- self.tests = self.generate_test_cases(
- ap_power='standard',
- channels=[6, 36, 149],
- modes=['VHT20'],
- chamber_mode='orientation',
- positions=list(range(0, 360, 10)))
+ self.tests = self.generate_test_cases(ap_power='standard',
+ channels=[6, 36, 149],
+ modes=['VHT20'],
+ chamber_mode='orientation',
+ positions=list(range(0, 360,
+ 10)))
class WifiOtaPing_45Degree_Test(WifiOtaPingTest):
@@ -736,23 +728,22 @@
class WifiOtaPing_SteppedStirrers_Test(WifiOtaPingTest):
def __init__(self, controllers):
WifiOtaPingTest.__init__(self, controllers)
- self.tests = self.generate_test_cases(
- ap_power='standard',
- channels=[6, 36, 149],
- modes=['VHT20'],
- chamber_mode='stepped stirrers',
- positions=list(range(100)))
+ self.tests = self.generate_test_cases(ap_power='standard',
+ channels=[6, 36, 149],
+ modes=['VHT20'],
+ chamber_mode='stepped stirrers',
+ positions=list(range(100)))
class WifiOtaPing_LowPowerAP_TenDegree_Test(WifiOtaPingTest):
def __init__(self, controllers):
WifiOtaPingTest.__init__(self, controllers)
- self.tests = self.generate_test_cases(
- ap_power='low_power',
- channels=[6, 36, 149],
- modes=['VHT20'],
- chamber_mode='orientation',
- positions=list(range(0, 360, 10)))
+ self.tests = self.generate_test_cases(ap_power='low_power',
+ channels=[6, 36, 149],
+ modes=['VHT20'],
+ chamber_mode='orientation',
+ positions=list(range(0, 360,
+ 10)))
class WifiOtaPing_LowPowerAP_45Degree_Test(WifiOtaPingTest):
@@ -769,9 +760,8 @@
class WifiOtaPing_LowPowerAP_SteppedStirrers_Test(WifiOtaPingTest):
def __init__(self, controllers):
WifiOtaPingTest.__init__(self, controllers)
- self.tests = self.generate_test_cases(
- ap_power='low_power',
- channels=[6, 36, 149],
- modes=['VHT20'],
- chamber_mode='stepped stirrers',
- positions=list(range(100)))
+ self.tests = self.generate_test_cases(ap_power='low_power',
+ channels=[6, 36, 149],
+ modes=['VHT20'],
+ chamber_mode='stepped stirrers',
+ positions=list(range(100)))
diff --git a/acts/tests/google/wifi/WifiRvrTest.py b/acts/tests/google/wifi/WifiRvrTest.py
index 43c9bbb..cd2ee76 100644
--- a/acts/tests/google/wifi/WifiRvrTest.py
+++ b/acts/tests/google/wifi/WifiRvrTest.py
@@ -43,9 +43,7 @@
example_connectivity_performance_ap_sta.json.
"""
- TEST_TIMEOUT = 5
- SHORT_SLEEP = 1
- RSSI_POLL_INTERVAL = 1
+ TEST_TIMEOUT = 6
MAX_CONSECUTIVE_ZEROS = 3
def __init__(self, controllers):
@@ -102,9 +100,8 @@
# Turn WiFi ON
if self.testclass_params.get('airplane_mode', 1):
self.log.info('Turning on airplane mode.')
- asserts.assert_true(
- utils.force_airplane_mode(self.dut, True),
- "Can not turn on airplane mode.")
+ asserts.assert_true(utils.force_airplane_mode(self.dut, True),
+ "Can not turn on airplane mode.")
wutils.wifi_toggle_state(self.dut, True)
def teardown_test(self):
@@ -131,11 +128,10 @@
result['testcase_params']['traffic_type']),
x_label='Attenuation (dB)',
primary_y_label='Throughput (Mbps)')
- plots[plot_id].add_line(
- result['total_attenuation'],
- result['throughput_receive'],
- result['test_name'],
- marker='circle')
+ plots[plot_id].add_line(result['total_attenuation'],
+ result['throughput_receive'],
+ result['test_name'],
+ marker='circle')
figure_list = []
for plot_id, plot in plots.items():
plot.generate_figure()
@@ -214,8 +210,8 @@
abs(current_att - golden_att)
for golden_att in golden_attenuation
]
- sorted_distances = sorted(
- enumerate(att_distances), key=lambda x: x[1])
+ sorted_distances = sorted(enumerate(att_distances),
+ key=lambda x: x[1])
closest_indeces = [dist[0] for dist in sorted_distances[0:3]]
closest_throughputs = [
golden_results['throughput_receive'][index]
@@ -254,10 +250,9 @@
with open(results_file_path, 'w') as results_file:
json.dump(rvr_result, results_file, indent=4)
# Plot and save
- figure = wputils.BokehFigure(
- title=test_name,
- x_label='Attenuation (dB)',
- primary_y_label='Throughput (Mbps)')
+ figure = wputils.BokehFigure(title=test_name,
+ x_label='Attenuation (dB)',
+ primary_y_label='Throughput (Mbps)')
try:
golden_path = next(file_name
for file_name in self.golden_files_list
@@ -274,13 +269,12 @@
'lower_limit': throughput_limits['lower_limit'],
'upper_limit': throughput_limits['upper_limit']
}
- figure.add_line(
- golden_attenuation,
- golden_results['throughput_receive'],
- 'Golden Results',
- color='green',
- marker='circle',
- shaded_region=shaded_region)
+ figure.add_line(golden_attenuation,
+ golden_results['throughput_receive'],
+ 'Golden Results',
+ color='green',
+ marker='circle',
+ shaded_region=shaded_region)
except:
self.log.warning('ValueError: Golden file not found')
@@ -293,13 +287,12 @@
curr_llstats['summary']['common_rx_mcs_freq'] * 100)
for curr_llstats in rvr_result['llstats']
]
- figure.add_line(
- rvr_result['total_attenuation'],
- rvr_result['throughput_receive'],
- 'Test Results',
- hover_text=hover_text,
- color='red',
- marker='circle')
+ figure.add_line(rvr_result['total_attenuation'],
+ rvr_result['throughput_receive'],
+ 'Test Results',
+ hover_text=hover_text,
+ color='red',
+ marker='circle')
output_file_path = os.path.join(self.log_path,
'{}.html'.format(test_name))
@@ -367,8 +360,8 @@
llstats = []
rssi = []
for atten in testcase_params['atten_range']:
- if not wputils.health_check(self.dut, 5):
- asserts.skip('Batter low or DUT overheating. Skipping test.')
+ if not wputils.health_check(self.dut, 5, 50):
+ asserts.skip('Battery low or DUT overheating. Skipping test.')
# Set Attenuation
for attenuator in self.attenuators:
attenuator.set_atten(atten, strict=False)
@@ -414,7 +407,7 @@
atten, curr_throughput, current_rssi['signal_poll_rssi'],
current_rssi['chain_0_rssi'],
current_rssi['chain_1_rssi']))
- if curr_throughput == 0:
+ if curr_throughput == 0 and current_rssi['signal_poll_rssi'] < -80:
zero_counter = zero_counter + 1
else:
zero_counter = 0
@@ -481,24 +474,17 @@
self.dut.go_to_sleep()
band = self.access_point.band_lookup_by_channel(
testcase_params['channel'])
- current_network = self.dut.droid.wifiGetConnectionInfo()
- try:
- connected = wutils.validate_connection(self.dut) is not None
- except:
- connected = False
- if connected and current_network['SSID'] == self.main_network[band][
- 'SSID']:
+ if wputils.validate_network(self.dut, self.main_network[band]['SSID']):
self.log.info('Already connected to desired network')
else:
wutils.reset_wifi(self.dut)
self.dut.droid.wifiSetCountryCode(
self.testclass_params['country_code'])
self.main_network[band]['channel'] = testcase_params['channel']
- wutils.wifi_connect(
- self.dut,
- self.main_network[band],
- num_of_tries=5,
- check_connectivity=False)
+ wutils.wifi_connect(self.dut,
+ self.main_network[band],
+ num_of_tries=5,
+ check_connectivity=True)
self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
def setup_rvr_test(self, testcase_params):
@@ -604,11 +590,10 @@
class WifiRvr_2GHz_Test(WifiRvrTest):
def __init__(self, controllers):
super().__init__(controllers)
- self.tests = self.generate_test_cases(
- channels=[1, 6, 11],
- modes=['VHT20'],
- traffic_types=['TCP'],
- traffic_directions=['DL', 'UL'])
+ self.tests = self.generate_test_cases(channels=[1, 6, 11],
+ modes=['VHT20'],
+ traffic_types=['TCP'],
+ traffic_directions=['DL', 'UL'])
class WifiRvr_UNII1_Test(WifiRvrTest):
@@ -689,7 +674,6 @@
setting turntable orientation and other chamber parameters to study
performance in varying channel conditions
"""
-
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
self.testcase_metric_logger = (
@@ -747,13 +731,12 @@
compiled_data[test_id]['metrics'][metric_key].append(
metric_value)
# Add test id to plots
- plots[test_id].add_line(
- result['total_attenuation'],
- result['throughput_receive'],
- result['test_name'],
- width=1,
- style='dashed',
- marker='circle')
+ plots[test_id].add_line(result['total_attenuation'],
+ result['throughput_receive'],
+ result['test_name'],
+ width=1,
+ style='dashed',
+ marker='circle')
# Compute average RvRs and compount metrics over orientations
for test_id, test_data in compiled_data.items():
@@ -773,16 +756,14 @@
metric_key, metric_value)
test_data['avg_rvr'] = numpy.mean(test_data['throughput'], 0)
test_data['median_rvr'] = numpy.median(test_data['throughput'], 0)
- plots[test_id].add_line(
- test_data['total_attenuation'],
- test_data['avg_rvr'],
- legend='Average Throughput',
- marker='circle')
- plots[test_id].add_line(
- test_data['total_attenuation'],
- test_data['median_rvr'],
- legend='Median Throughput',
- marker='square')
+ plots[test_id].add_line(test_data['total_attenuation'],
+ test_data['avg_rvr'],
+ legend='Average Throughput',
+ marker='circle')
+ plots[test_id].add_line(test_data['total_attenuation'],
+ test_data['median_rvr'],
+ legend='Median Throughput',
+ marker='square')
figure_list = []
for test_id, plot in plots.items():
@@ -814,12 +795,11 @@
continue
testcase_name = 'test_rvr_{}_{}_ch{}_{}_{}deg'.format(
traffic_type, direction, channel, mode, angle)
- test_params = collections.OrderedDict(
- channel=channel,
- mode=mode,
- traffic_type=traffic_type,
- traffic_direction=direction,
- orientation=angle)
+ test_params = collections.OrderedDict(channel=channel,
+ mode=mode,
+ traffic_type=traffic_type,
+ traffic_direction=direction,
+ orientation=angle)
setattr(self, testcase_name, partial(self._test_rvr, test_params))
test_cases.append(testcase_name)
return test_cases
diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py
index a603e01..0716158 100755
--- a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py
+++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py
@@ -52,7 +52,7 @@
try:
ota_updater.update(self.hotspot_device)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
def on_fail(self, test_name, begin_time):
diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py
index e9fedcd..7399e32 100755
--- a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py
+++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py
@@ -52,7 +52,7 @@
try:
ota_updater.update(self.hotspot_device)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
def on_fail(self, test_name, begin_time):
diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py
index 6648d0e..985e7a7 100755
--- a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py
+++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py
@@ -52,7 +52,7 @@
try:
ota_updater.update(self.hotspot_device)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
def on_fail(self, test_name, begin_time):
diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py
index 7450578..9e68f22 100755
--- a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py
+++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py
@@ -52,7 +52,7 @@
try:
ota_updater.update(self.hotspot_device)
except Exception as err:
- raise signals.TestSkipClass(
+ raise signals.TestAbortClass(
"Failed up apply OTA update. Aborting tests")
def on_fail(self, test_name, begin_time):
diff --git a/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py b/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py
index 5962d24..2e53c6d 100644
--- a/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py
+++ b/acts/tests/google/wifi/rtt/functional/RangeAwareTest.py
@@ -270,22 +270,22 @@
"Missing (timed-out) results",
extras=extras)
asserts.assert_false(
- stats['any_lci_mismatch'], "LCI mismatch", extras=extras)
+ stats_reverse_direction['any_lci_mismatch'], "LCI mismatch", extras=extras)
asserts.assert_false(
- stats['any_lcr_mismatch'], "LCR mismatch", extras=extras)
+ stats_reverse_direction['any_lcr_mismatch'], "LCR mismatch", extras=extras)
asserts.assert_equal(
- stats['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
+ stats_reverse_direction['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
asserts.assert_true(
stats_reverse_direction['num_failures'] <=
self.rtt_max_failure_rate_two_sided_rtt_percentage *
- stats['num_results'] / 100,
+ stats_reverse_direction['num_results'] / 100,
"Failure rate is too high",
extras=extras)
if accuracy_evaluation:
asserts.assert_true(
stats_reverse_direction['num_range_out_of_margin'] <=
self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
- stats['num_success_results'] / 100,
+ stats_reverse_direction['num_success_results'] / 100,
"Results exceeding error margin rate is too high",
extras=extras)