Merge "Change aosp/1912773 moved a few functions to a new file and broke a support function for our test. Updating the test to point to the new location in code." into sc-dev am: 0d168698b3
Original change: https://googleplex-android-review.googlesource.com/c/platform/tools/test/connectivity/+/17192385
Change-Id: I8fa5eb44575be1b5684a0230fa6b2a3de9cb625b
diff --git a/acts/framework/acts/controllers/openwrt_ap.py b/acts/framework/acts/controllers/openwrt_ap.py
index ac0712d..8e8619a 100644
--- a/acts/framework/acts/controllers/openwrt_ap.py
+++ b/acts/framework/acts/controllers/openwrt_ap.py
@@ -29,7 +29,6 @@
WIFI_2G = "wifi2g"
WIFI_5G = "wifi5g"
WAIT_TIME = 20
-DEFAULT_RADIOS = ("radio0", "radio1")
def create(configs):
@@ -110,7 +109,7 @@
lambda msg: "[OpenWrtAP|%s] %s" % (self.ssh_settings.hostname, msg))
self.wireless_setting = None
self.network_setting = network_settings.NetworkSettings(
- self.ssh, self.ssh_settings, self.log)
+ self.ssh, config["ssh_config"]["host"], self.log)
def configure_ap(self, wifi_configs, channel_2g, channel_5g):
"""Configure AP with the required settings.
@@ -588,40 +587,6 @@
return wifi_network
return None
- def get_wifi_status(self, radios=DEFAULT_RADIOS):
- """Check if radios are up. Default are 2G and 5G bands.
-
- Args:
- radios: Wifi interfaces for check status.
- Returns:
- True if both radios are up. False if not.
- """
- status = True
- for radio in radios:
- str_output = self.ssh.run("wifi status %s" % radio).stdout
- wifi_status = yaml.load(str_output.replace("\t", "").replace("\n", ""),
- Loader=yaml.FullLoader)
- status = wifi_status[radio]["up"] and status
- return status
-
- def verify_wifi_status(self, radios=DEFAULT_RADIOS, timeout=20):
- """Ensure wifi interfaces are ready.
-
- Args:
- radios: Wifi interfaces for check status.
- timeout: An integer that is the number of times to try
- wait for interface ready.
- Returns:
- True if both radios are up. False if not.
- """
- start_time = time.time()
- end_time = start_time + timeout
- while time.time() < end_time:
- if self.get_wifi_status(radios):
- return True
- time.sleep(1)
- return False
-
def close(self):
"""Reset wireless and network settings to default and stop AP."""
if self.network_setting.config:
diff --git a/acts/framework/acts/controllers/openwrt_lib/network_const.py b/acts/framework/acts/controllers/openwrt_lib/network_const.py
index 0a3fb42..76dd34b 100644
--- a/acts/framework/acts/controllers/openwrt_lib/network_const.py
+++ b/acts/framework/acts/controllers/openwrt_lib/network_const.py
@@ -42,55 +42,9 @@
}
}
-IPSEC_HYBRID_RSA = {
- "conn HYBRID_RSA": {
- "keyexchange": "ikev1",
- "left": "192.168.1.1",
- "leftsubnet": "0.0.0.0/0",
- "leftauth": "pubkey",
- "leftcert": "serverCert.der",
- "leftsendcert": "always",
- "right": "%any",
- "rightsubnet": "0.0.0.0/0",
- "rightauth": "pubkey",
- "rightauth2": "xauth",
- "xauth": "server",
- "auto": "add",
- }
-}
-
-IPSEC_XAUTH_PSK = {
- "conn XAUTH_PSK": {
- "keyexchange": "ikev1",
- "left": "192.168.1.1",
- "leftsubnet": "0.0.0.0/0",
- "leftauth": "psk",
- "right": "%any",
- "rightsubnet": "0.0.0.0/0",
- "rightauth": "psk",
- "rightauth2": "xauth",
- "auto": "add",
- }
-}
-
-IPSEC_XAUTH_RSA = {
- "conn XAUTH_RSA": {
- "keyexchange": "ikev1",
- "left": "192.168.1.1",
- "leftsubnet": "0.0.0.0/0",
- "leftcert": "serverCert.der",
- "leftsendcert": "always",
- "right": "%any",
- "rightsubnet": "0.0.0.0/0",
- "rightauth": "xauth",
- "xauth": "server",
- "auto": "add",
- }
-}
-
# parmas for lx2tpd
-XL2TPD_CONF_GLOBAL = (
+XL2TPD_CONF_GLOBAL = [
"[global]",
"ipsec saref = no",
"debug tunnel = no",
@@ -100,9 +54,9 @@
"access control = no",
"rand source = dev",
"port = 1701",
-)
+]
-XL2TPD_CONF_INS = (
+XL2TPD_CONF_INS = [
"[lns default]",
"require authentication = yes",
"pass peer = yes",
@@ -110,9 +64,9 @@
"length bit = yes",
"refuse pap = yes",
"refuse chap = yes",
-)
+]
-XL2TPD_OPTION = (
+XL2TPD_OPTION = [
"require-mschap-v2",
"refuse-mschap",
"ms-dns 8.8.8.8",
@@ -133,17 +87,17 @@
"lcp-echo-interval 30",
"lcp-echo-failure 4",
"nomppe"
-)
+]
# iptable rules for vpn_pptp
-FIREWALL_RULES_FOR_PPTP = (
+FIREWALL_RULES_FOR_PPTP = [
"iptables -A input_rule -i ppp+ -j ACCEPT",
"iptables -A output_rule -o ppp+ -j ACCEPT",
"iptables -A forwarding_rule -i ppp+ -j ACCEPT"
-)
+]
# iptable rules for vpn_l2tp
-FIREWALL_RULES_FOR_L2TP = (
+FIREWALL_RULES_FOR_L2TP = [
"iptables -I INPUT -m policy --dir in --pol ipsec --proto esp -j ACCEPT",
"iptables -I FORWARD -m policy --dir in --pol ipsec --proto esp -j ACCEPT",
"iptables -I FORWARD -m policy --dir out --pol ipsec --proto esp -j ACCEPT",
@@ -157,14 +111,7 @@
"iptables -A INPUT -p udp --dport 500 -j ACCEPT",
"iptables -A INPUT -p udp --dport 4500 -j ACCEPT",
"iptables -A INPUT -p udp -m policy --dir in --pol ipsec -m udp --dport 1701 -j ACCEPT"
-)
-
-FIREWALL_RULES_DISABLE_DNS_RESPONSE = (
- "iptables -I OUTPUT -p udp --sport 53 -j DROP",
- "iptables -I OUTPUT -p tcp --sport 53 -j DROP",
- "ip6tables -I OUTPUT -p udp --sport 53 -j DROP",
- "ip6tables -I OUTPUT -p tcp --sport 53 -j DROP",
-)
+]
# Object for vpn profile
diff --git a/acts/framework/acts/controllers/openwrt_lib/network_settings.py b/acts/framework/acts/controllers/openwrt_lib/network_settings.py
index efbd590..91cdb03 100644
--- a/acts/framework/acts/controllers/openwrt_lib/network_settings.py
+++ b/acts/framework/acts/controllers/openwrt_lib/network_settings.py
@@ -13,13 +13,8 @@
# limitations under the License.
import re
-import time
-
-from acts import signals
-from acts import utils
from acts.controllers.openwrt_lib import network_const
-
SERVICE_DNSMASQ = "dnsmasq"
SERVICE_STUNNEL = "stunnel"
SERVICE_NETWORK = "network"
@@ -27,13 +22,8 @@
SERVICE_FIREWALL = "firewall"
SERVICE_IPSEC = "ipsec"
SERVICE_XL2TPD = "xl2tpd"
-SERVICE_ODHCPD = "odhcpd"
-SERVICE_NODOGSPLASH = "nodogsplash"
PPTP_PACKAGE = "pptpd kmod-nf-nathelper-extra"
L2TP_PACKAGE = "strongswan-full openssl-util xl2tpd"
-NAT6_PACKAGE = "ip6tables kmod-ipt-nat6"
-CAPTIVE_PORTAL_PACKAGE = "nodogsplash"
-MDNS_PACKAGE = "avahi-utils avahi-daemon-service-http avahi-daemon-service-ssh libavahi-client avahi-dbus-daemon"
STUNNEL_CONFIG_PATH = "/etc/stunnel/DoTServer.conf"
HISTORY_CONFIG_PATH = "/etc/dirty_configs"
PPTPD_OPTION_PATH = "/etc/ppp/options.pptpd"
@@ -41,7 +31,6 @@
XL2TPD_OPTION_CONFIG_PATH = "/etc/ppp/options.xl2tpd"
FIREWALL_CUSTOM_OPTION_PATH = "/etc/firewall.user"
PPP_CHAP_SECRET_PATH = "/etc/ppp/chap-secrets"
-TCPDUMP_DIR = "/tmp/tcpdump/"
LOCALHOST = "192.168.1.1"
DEFAULT_PACKAGE_INSTALL_TIMEOUT = 200
@@ -51,30 +40,26 @@
Attributes:
ssh: ssh connection object.
- ssh_settings: ssh settings for AccessPoint.
- service_manager: Object manage service configuration.
- user: username for ssh.
+ service_manager: Object manage service configuration
ip: ip address for AccessPoint.
log: Logging object for AccessPoint.
config: A list to store changes on network settings.
- firewall_rules_list: A list of firewall rule name list.
+ firewall_rules_list: A list of firewall rule name list
cleanup_map: A dict for compare oppo functions.
l2tp: profile for vpn l2tp server.
"""
- def __init__(self, ssh, ssh_settings, logger):
+ def __init__(self, ssh, ip, logger):
"""Initialize wireless settings.
Args:
ssh: ssh connection object.
- ssh_settings: ssh settings for AccessPoint.
+ ip: ip address for AccessPoint.
logger: Logging object for AccessPoint.
"""
self.ssh = ssh
self.service_manager = ServiceManager(ssh)
- self.ssh_settings = ssh_settings
- self.user = self.ssh_settings.username
- self.ip = self.ssh_settings.hostname
+ self.ip = ip
self.log = logger
self.config = set()
self.firewall_rules_list = []
@@ -82,14 +67,7 @@
"setup_dns_server": self.remove_dns_server,
"setup_vpn_pptp_server": self.remove_vpn_pptp_server,
"setup_vpn_l2tp_server": self.remove_vpn_l2tp_server,
- "disable_ipv6": self.enable_ipv6,
- "setup_ipv6_bridge": self.remove_ipv6_bridge,
- "default_dns": self.del_default_dns,
- "default_v6_dns": self.del_default_v6_dns,
- "ipv6_prefer_option": self.remove_ipv6_prefer_option,
- "block_dns_response": self.unblock_dns_response,
- "setup_mdns": self.remove_mdns,
- "setup_captive_portal": self.remove_cpative_portal
+ "disable_ipv6": self.enable_ipv6
}
# This map contains cleanup functions to restore the configuration to
# its default state. We write these keys to HISTORY_CONFIG_PATH prior to
@@ -97,7 +75,6 @@
# This makes it easier to recover after an aborted test.
self.update_firewall_rules_list()
self.cleanup_network_settings()
- self.clear_tcpdump()
def cleanup_network_settings(self):
"""Reset all changes on Access point."""
@@ -111,11 +88,7 @@
if self.config:
temp = self.config.copy()
for change in temp:
- change_list = change.split()
- if len(change_list) > 1:
- self.cleanup_map[change_list[0]](*change_list[1:])
- else:
- self.cleanup_map[change]()
+ self.cleanup_map[change]()
self.config = set()
if self.file_exists(HISTORY_CONFIG_PATH):
@@ -424,11 +397,11 @@
org: Organization name for generate cert keys.
"""
self.l2tp = network_const.VpnL2tp(vpn_server_hostname,
- vpn_server_address,
- vpn_username,
- vpn_password,
- psk_secret,
- server_name)
+ vpn_server_address,
+ vpn_username,
+ vpn_password,
+ psk_secret,
+ server_name)
self.package_install(L2TP_PACKAGE)
self.config.add("setup_vpn_l2tp_server")
@@ -443,8 +416,6 @@
self.setup_ppp_secret()
# /etc/config/firewall & /etc/firewall.user
self.setup_firewall_rules_for_l2tp()
- # setup vpn server local ip
- self.setup_vpn_local_ip()
# generate cert and key for rsa
self.generate_vpn_cert_keys(country, org)
# restart service
@@ -457,7 +428,6 @@
"""Remove l2tp vpn server on OpenWrt."""
self.config.discard("setup_vpn_l2tp_server")
self.restore_firewall_rules_for_l2tp()
- self.remove_vpn_local_ip()
self.service_manager.need_restart(SERVICE_IPSEC)
self.service_manager.need_restart(SERVICE_XL2TPD)
self.service_manager.need_restart(SERVICE_FIREWALL)
@@ -481,35 +451,28 @@
def setup_ipsec(self):
"""Setup ipsec config."""
- def load_ipsec_config(data, rightsourceip=False):
+ def load_config(data):
for i in data.keys():
config.append(i)
for j in data[i].keys():
config.append("\t %s=%s" % (j, data[i][j]))
- if rightsourceip:
- config.append("\t rightsourceip=%s.16/26" % self.l2tp.address.rsplit(".", 1)[0])
config.append("")
config = []
- load_ipsec_config(network_const.IPSEC_CONF)
- load_ipsec_config(network_const.IPSEC_L2TP_PSK)
- load_ipsec_config(network_const.IPSEC_L2TP_RSA)
- load_ipsec_config(network_const.IPSEC_HYBRID_RSA, True)
- load_ipsec_config(network_const.IPSEC_XAUTH_PSK, True)
- load_ipsec_config(network_const.IPSEC_XAUTH_RSA, True)
+ load_config(network_const.IPSEC_CONF)
+ load_config(network_const.IPSEC_L2TP_PSK)
+ load_config(network_const.IPSEC_L2TP_RSA)
self.create_config_file("\n".join(config), "/etc/ipsec.conf")
ipsec_secret = []
ipsec_secret.append(r": PSK \"%s\"" % self.l2tp.psk_secret)
ipsec_secret.append(r": RSA \"%s\"" % "serverKey.der")
- ipsec_secret.append(r"%s : XAUTH \"%s\"" % (self.l2tp.username,
- self.l2tp.password))
self.create_config_file("\n".join(ipsec_secret), "/etc/ipsec.secrets")
def setup_xl2tpd(self, ip_range=20):
"""Setup xl2tpd config."""
net_id, host_id = self.l2tp.address.rsplit(".", 1)
- xl2tpd_conf = list(network_const.XL2TPD_CONF_GLOBAL)
+ xl2tpd_conf = network_const.XL2TPD_CONF_GLOBAL
xl2tpd_conf.append("auth file = %s" % PPP_CHAP_SECRET_PATH)
xl2tpd_conf.extend(network_const.XL2TPD_CONF_INS)
xl2tpd_conf.append("ip range = %s.%s-%s.%s" %
@@ -520,7 +483,7 @@
xl2tpd_conf.append("pppoptfile = %s" % XL2TPD_OPTION_CONFIG_PATH)
self.create_config_file("\n".join(xl2tpd_conf), XL2TPD_CONFIG_PATH)
- xl2tpd_option = list(network_const.XL2TPD_OPTION)
+ xl2tpd_option = network_const.XL2TPD_OPTION
xl2tpd_option.append("name %s" % self.l2tp.name)
self.create_config_file("\n".join(xl2tpd_option),
XL2TPD_OPTION_CONFIG_PATH)
@@ -611,7 +574,7 @@
self.ssh.run("uci set firewall.@rule[-1].src='wan'")
self.ssh.run("uci set firewall.@rule[-1].proto='47'")
- iptable_rules = list(network_const.FIREWALL_RULES_FOR_PPTP)
+ iptable_rules = network_const.FIREWALL_RULES_FOR_PPTP
self.add_custom_firewall_rules(iptable_rules)
self.service_manager.need_restart(SERVICE_FIREWALL)
@@ -654,7 +617,7 @@
self.ssh.run("uci set firewall.@rule[-1].proto='ah'")
net_id = self.l2tp.address.rsplit(".", 1)[0]
- iptable_rules = list(network_const.FIREWALL_RULES_FOR_L2TP)
+ iptable_rules = network_const.FIREWALL_RULES_FOR_L2TP
iptable_rules.append("iptables -A FORWARD -s %s.0/24"
" -j ACCEPT" % net_id)
iptable_rules.append("iptables -t nat -A POSTROUTING"
@@ -707,24 +670,6 @@
"""Disable pptp service."""
self.package_remove(PPTP_PACKAGE)
- def setup_vpn_local_ip(self):
- """Setup VPN Server local ip on OpenWrt for client ping verify."""
- self.ssh.run("uci set network.lan2=interface")
- self.ssh.run("uci set network.lan2.type=bridge")
- self.ssh.run("uci set network.lan2.ifname=eth1.2")
- self.ssh.run("uci set network.lan2.proto=static")
- self.ssh.run("uci set network.lan2.ipaddr=\"%s\"" % self.l2tp.address)
- self.ssh.run("uci set network.lan2.netmask=255.255.255.0")
- self.ssh.run("uci set network.lan2=interface")
- self.service_manager.reload(SERVICE_NETWORK)
- self.commit_changes()
-
- def remove_vpn_local_ip(self):
- """Discard vpn local ip on OpenWrt."""
- self.ssh.run("uci delete network.lan2")
- self.service_manager.reload(SERVICE_NETWORK)
- self.commit_changes()
-
def enable_ipv6(self):
"""Enable ipv6 on OpenWrt."""
self.ssh.run("uci set network.lan.ipv6=1")
@@ -743,194 +688,6 @@
self.service_manager.reload(SERVICE_NETWORK)
self.commit_changes()
- def setup_ipv6_bridge(self):
- """Setup ipv6 bridge for client have ability to access network."""
- self.config.add("setup_ipv6_bridge")
-
- self.ssh.run("uci set dhcp.lan.dhcpv6=relay")
- self.ssh.run("uci set dhcp.lan.ra=relay")
- self.ssh.run("uci set dhcp.lan.ndp=relay")
-
- self.ssh.run("uci set dhcp.wan6=dhcp")
- self.ssh.run("uci set dhcp.wan6.dhcpv6=relay")
- self.ssh.run("uci set dhcp.wan6.ra=relay")
- self.ssh.run("uci set dhcp.wan6.ndp=relay")
- self.ssh.run("uci set dhcp.wan6.master=1")
- self.ssh.run("uci set dhcp.wan6.interface=wan6")
-
- # Enable service
- self.service_manager.need_restart(SERVICE_ODHCPD)
- self.commit_changes()
-
- def remove_ipv6_bridge(self):
- """Discard ipv6 bridge on OpenWrt."""
- if "setup_ipv6_bridge" in self.config:
- self.config.discard("setup_ipv6_bridge")
-
- self.ssh.run("uci set dhcp.lan.dhcpv6=server")
- self.ssh.run("uci set dhcp.lan.ra=server")
- self.ssh.run("uci delete dhcp.lan.ndp")
-
- self.ssh.run("uci delete dhcp.wan6")
-
- self.service_manager.need_restart(SERVICE_ODHCPD)
- self.commit_changes()
-
- def _add_dhcp_option(self, args):
- self.ssh.run("uci add_list dhcp.lan.dhcp_option=\"%s\"" % args)
-
- def _remove_dhcp_option(self, args):
- self.ssh.run("uci del_list dhcp.lan.dhcp_option=\"%s\"" % args)
-
- def add_default_dns(self, addr_list):
- """Add default dns server for client.
-
- Args:
- addr_list: dns ip address for Openwrt client.
- """
- self._add_dhcp_option("6,%s" % ",".join(addr_list))
- self.config.add("default_dns %s" % addr_list)
- self.service_manager.need_restart(SERVICE_DNSMASQ)
- self.commit_changes()
-
- def del_default_dns(self, addr_list):
- """Remove default dns server for client.
-
- Args:
- addr_list: list of dns ip address for Openwrt client.
- """
- self._remove_dhcp_option("6,%s" % addr_list)
- self.config.discard("default_dns %s" % addr_list)
- self.service_manager.need_restart(SERVICE_DNSMASQ)
- self.commit_changes()
-
- def add_default_v6_dns(self, addr_list):
- """Add default v6 dns server for client.
-
- Args:
- addr_list: dns ip address for Openwrt client.
- """
- self.ssh.run("uci add_list dhcp.lan.dns=\"%s\"" % addr_list)
- self.config.add("default_v6_dns %s" % addr_list)
- self.service_manager.need_restart(SERVICE_ODHCPD)
- self.commit_changes()
-
- def del_default_v6_dns(self, addr_list):
- """Del default v6 dns server for client.
-
- Args:
- addr_list: dns ip address for Openwrt client.
- """
- self.ssh.run("uci del_list dhcp.lan.dns=\"%s\"" % addr_list)
- self.config.add("default_v6_dns %s" % addr_list)
- self.service_manager.need_restart(SERVICE_ODHCPD)
- self.commit_changes()
-
- def add_ipv6_prefer_option(self):
- self._add_dhcp_option("108,1800i")
- self.config.add("ipv6_prefer_option")
- self.service_manager.need_restart(SERVICE_DNSMASQ)
- self.commit_changes()
-
- def remove_ipv6_prefer_option(self):
- self._remove_dhcp_option("108,1800i")
- self.config.discard("ipv6_prefer_option")
- self.service_manager.need_restart(SERVICE_DNSMASQ)
- self.commit_changes()
-
- def start_tcpdump(self, test_name, args="", interface="br-lan"):
- """"Start tcpdump on OpenWrt.
-
- Args:
- test_name: Test name for create tcpdump file name.
- args: Option args for tcpdump.
- interface: Interface to logging.
- Returns:
- tcpdump_file_name: tcpdump file name on OpenWrt.
- pid: tcpdump process id.
- """
- if not self.path_exists(TCPDUMP_DIR):
- self.ssh.run("mkdir %s" % TCPDUMP_DIR)
- tcpdump_file_name = "openwrt_%s_%s.pcap" % (test_name,
- time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time())))
- tcpdump_file_path = "".join([TCPDUMP_DIR, tcpdump_file_name])
- cmd = "tcpdump -i %s -s0 %s -w %s" % (interface, args, tcpdump_file_path)
- self.ssh.run_async(cmd)
- pid = self._get_tcpdump_pid(tcpdump_file_name)
- if not pid:
- raise signals.TestFailure("Fail to start tcpdump on OpenWrt.")
- # Set delay to prevent tcpdump fail to capture target packet.
- time.sleep(15)
- return tcpdump_file_name
-
- def stop_tcpdump(self, tcpdump_file_name, pull_dir=None):
- """Stop tcpdump on OpenWrt and pull the pcap file.
-
- Args:
- tcpdump_file_name: tcpdump file name on OpenWrt.
- pull_dir: Keep none if no need to pull.
- Returns:
- tcpdump abs_path on host.
- """
- # Set delay to prevent tcpdump fail to capture target packet.
- time.sleep(15)
- pid = self._get_tcpdump_pid(tcpdump_file_name)
- self.ssh.run("kill -9 %s" % pid, ignore_status=True)
- if self.path_exists(TCPDUMP_DIR) and pull_dir:
- tcpdump_path = "".join([TCPDUMP_DIR, tcpdump_file_name])
- tcpdump_remote_path = "/".join([pull_dir, tcpdump_file_name])
- tcpdump_local_path = "%s@%s:%s" % (self.user, self.ip, tcpdump_path)
- utils.exe_cmd("scp %s %s" % (tcpdump_local_path, tcpdump_remote_path))
-
- if self._get_tcpdump_pid(tcpdump_file_name):
- raise signals.TestFailure("Failed to stop tcpdump on OpenWrt.")
- if self.file_exists(tcpdump_path):
- self.ssh.run("rm -f %s" % tcpdump_path)
- return tcpdump_remote_path if pull_dir else None
-
- def clear_tcpdump(self):
- self.ssh.run("killall tpcdump", ignore_status=True)
- if self.ssh.run("pgrep tpcdump", ignore_status=True).stdout:
- raise signals.TestFailure("Failed to clean up tcpdump process.")
-
- def _get_tcpdump_pid(self, tcpdump_file_name):
- """Check tcpdump process on OpenWrt."""
- return self.ssh.run("pgrep -f %s" % (tcpdump_file_name), ignore_status=True).stdout
-
- def setup_mdns(self):
- self.config.add("setup_mdns")
- self.package_install(MDNS_PACKAGE)
- self.commit_changes()
-
- def remove_mdns(self):
- self.config.discard("setup_mdns")
- self.package_remove(MDNS_PACKAGE)
- self.commit_changes()
-
- def block_dns_response(self):
- self.config.add("block_dns_response")
- iptable_rules = list(network_const.FIREWALL_RULES_DISABLE_DNS_RESPONSE)
- self.add_custom_firewall_rules(iptable_rules)
- self.service_manager.need_restart(SERVICE_FIREWALL)
- self.commit_changes()
-
- def unblock_dns_response(self):
- self.config.discard("block_dns_response")
- self.remove_custom_firewall_rules()
- self.service_manager.need_restart(SERVICE_FIREWALL)
- self.commit_changes()
-
- def setup_captive_portal(self):
- self.package_install(CAPTIVE_PORTAL_PACKAGE)
- self.config.add("setup_captive_portal")
- self.service_manager.need_restart(SERVICE_NODOGSPLASH)
- self.commit_changes()
-
- def remove_cpative_portal(self):
- self.package_remove(CAPTIVE_PORTAL_PACKAGE)
- self.config.discard("setup_captive_portal")
- self.commit_changes()
-
class ServiceManager(object):
"""Class for service on OpenWrt.
@@ -963,8 +720,6 @@
def restart_services(self):
"""Restart all services need to restart."""
for service in self._need_restart:
- if service == SERVICE_NETWORK:
- self.reload(service)
self.restart(service)
self._need_restart = set()
diff --git a/acts_tests/acts_contrib/test_utils/net/connectivity_const.py b/acts_tests/acts_contrib/test_utils/net/connectivity_const.py
index 591c83f..49996c6 100644
--- a/acts_tests/acts_contrib/test_utils/net/connectivity_const.py
+++ b/acts_tests/acts_contrib/test_utils/net/connectivity_const.py
@@ -74,29 +74,13 @@
MULTIPATH_PREFERENCE_PERFORMANCE = 1 << 2
# Private DNS constants
-DNS_GOOGLE_HOSTNAME = "dns.google"
-DNS_QUAD9_HOSTNAME = "dns.quad9.net"
-DNS_CLOUDFLARE_HOSTNAME = "1dot1dot1dot1.cloudflare-dns.com"
-DOH_CLOUDFLARE_HOSTNAME = "cloudflare-dns.com"
+DNS_GOOGLE = "dns.google"
+DNS_QUAD9 = "dns.quad9.net"
+DNS_CLOUDFLARE = "1dot1dot1dot1.cloudflare-dns.com"
PRIVATE_DNS_MODE_OFF = "off"
PRIVATE_DNS_MODE_OPPORTUNISTIC = "opportunistic"
PRIVATE_DNS_MODE_STRICT = "hostname"
-DNS_SUPPORT_TYPE = {
- DNS_GOOGLE_HOSTNAME: ["Do53", "DoT", "DoH"],
- DNS_CLOUDFLARE_HOSTNAME: ["Do53","DoT"],
- DOH_CLOUDFLARE_HOSTNAME: ["DoH"]
-}
-
-DNS_GOOGLE_ADDR_V4 = ["8.8.4.4", "8.8.8.8"]
-DNS_GOOGLE_ADDR_V6 = ["2001:4860:4860::8888",
- "2001:4860:4860::8844"]
-DNS_CLOUDFLARE_ADDR_V4 = ["1.1.1.1", "1.0.0.1"]
-DOH_CLOUDFLARE_ADDR_V4 = ["104.16.248.249", "104.16.249.249"]
-DOH_CLOUDFLARE_ADDR_V6 = ["2606:4700::6810:f8f9",
- "2606:4700::6810:f9f9"]
-
-
# IpSec constants
SOCK_STREAM = 1
SOCK_DGRAM = 2
diff --git a/acts_tests/acts_contrib/test_utils/tel/loggers/protos/telephony_stress_metric.proto b/acts_tests/acts_contrib/test_utils/tel/loggers/protos/telephony_stress_metric.proto
deleted file mode 100644
index 0a0b377..0000000
--- a/acts_tests/acts_contrib/test_utils/tel/loggers/protos/telephony_stress_metric.proto
+++ /dev/null
@@ -1,18 +0,0 @@
-/* Note: If making any changes to this file be sure to generate a new
- compiled *_pb2.py file by running the following command from this
- directory:
- $ protoc -I=. --python_out=. telephony_stress_metric.proto
-
- Be sure that you are compiling with protoc 3.4.0
-
- More info can be found at:
- https://developers.google.com/protocol-buffers/docs/pythontutorial
-*/
-
-syntax = "proto2";
-
-package wireless.android.platform.testing.telephony.metrics;
-
-message TelephonyStressTestResult {
- map<string,int32> results_dict = 1;
-}
diff --git a/acts_tests/acts_contrib/test_utils/tel/loggers/protos/telephony_stress_metric_pb2.py b/acts_tests/acts_contrib/test_utils/tel/loggers/protos/telephony_stress_metric_pb2.py
deleted file mode 100644
index 6ccec95..0000000
--- a/acts_tests/acts_contrib/test_utils/tel/loggers/protos/telephony_stress_metric_pb2.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# -*- coding: utf-8 -*-
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: telephony_stress_metric.proto
-
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import message as _message
-from google.protobuf import reflection as _reflection
-from google.protobuf import symbol_database as _symbol_database
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-
-
-DESCRIPTOR = _descriptor.FileDescriptor(
- name='telephony_stress_metric.proto',
- package='wireless.android.platform.testing.telephony.metrics',
- syntax='proto2',
- serialized_options=None,
- create_key=_descriptor._internal_create_key,
- serialized_pb=b'\n\x1dtelephony_stress_metric.proto\x12\x33wireless.android.platform.testing.telephony.metrics\"\xc6\x01\n\x19TelephonyStressTestResult\x12u\n\x0cresults_dict\x18\x01 \x03(\x0b\x32_.wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult.ResultsDictEntry\x1a\x32\n\x10ResultsDictEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01'
-)
-
-
-
-
-_TELEPHONYSTRESSTESTRESULT_RESULTSDICTENTRY = _descriptor.Descriptor(
- name='ResultsDictEntry',
- full_name='wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult.ResultsDictEntry',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- create_key=_descriptor._internal_create_key,
- fields=[
- _descriptor.FieldDescriptor(
- name='key', full_name='wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult.ResultsDictEntry.key', index=0,
- number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=b"".decode('utf-8'),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
- _descriptor.FieldDescriptor(
- name='value', full_name='wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult.ResultsDictEntry.value', index=1,
- number=2, type=5, cpp_type=1, label=1,
- has_default_value=False, default_value=0,
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- serialized_options=b'8\001',
- is_extendable=False,
- syntax='proto2',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=235,
- serialized_end=285,
-)
-
-_TELEPHONYSTRESSTESTRESULT = _descriptor.Descriptor(
- name='TelephonyStressTestResult',
- full_name='wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- create_key=_descriptor._internal_create_key,
- fields=[
- _descriptor.FieldDescriptor(
- name='results_dict', full_name='wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult.results_dict', index=0,
- number=1, type=11, cpp_type=10, label=3,
- has_default_value=False, default_value=[],
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
- ],
- extensions=[
- ],
- nested_types=[_TELEPHONYSTRESSTESTRESULT_RESULTSDICTENTRY, ],
- enum_types=[
- ],
- serialized_options=None,
- is_extendable=False,
- syntax='proto2',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=87,
- serialized_end=285,
-)
-
-_TELEPHONYSTRESSTESTRESULT_RESULTSDICTENTRY.containing_type = _TELEPHONYSTRESSTESTRESULT
-_TELEPHONYSTRESSTESTRESULT.fields_by_name['results_dict'].message_type = _TELEPHONYSTRESSTESTRESULT_RESULTSDICTENTRY
-DESCRIPTOR.message_types_by_name['TelephonyStressTestResult'] = _TELEPHONYSTRESSTESTRESULT
-_sym_db.RegisterFileDescriptor(DESCRIPTOR)
-
-TelephonyStressTestResult = _reflection.GeneratedProtocolMessageType('TelephonyStressTestResult', (_message.Message,), {
-
- 'ResultsDictEntry' : _reflection.GeneratedProtocolMessageType('ResultsDictEntry', (_message.Message,), {
- 'DESCRIPTOR' : _TELEPHONYSTRESSTESTRESULT_RESULTSDICTENTRY,
- '__module__' : 'telephony_stress_metric_pb2'
- # @@protoc_insertion_point(class_scope:wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult.ResultsDictEntry)
- })
- ,
- 'DESCRIPTOR' : _TELEPHONYSTRESSTESTRESULT,
- '__module__' : 'telephony_stress_metric_pb2'
- # @@protoc_insertion_point(class_scope:wireless.android.platform.testing.telephony.metrics.TelephonyStressTestResult)
- })
-_sym_db.RegisterMessage(TelephonyStressTestResult)
-_sym_db.RegisterMessage(TelephonyStressTestResult.ResultsDictEntry)
-
-
-_TELEPHONYSTRESSTESTRESULT_RESULTSDICTENTRY._options = None
-# @@protoc_insertion_point(module_scope)
diff --git a/acts_tests/acts_contrib/test_utils/tel/loggers/telephony_stress_metric_logger.py b/acts_tests/acts_contrib/test_utils/tel/loggers/telephony_stress_metric_logger.py
deleted file mode 100644
index 456e8b2..0000000
--- a/acts_tests/acts_contrib/test_utils/tel/loggers/telephony_stress_metric_logger.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# /usr/bin/env python3
-#
-# Copyright (C) 2021 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-import base64
-import os
-import time
-
-from acts.metrics.core import ProtoMetric
-from acts.metrics.logger import MetricLogger
-from acts_contrib.test_utils.tel.loggers.protos.telephony_stress_metric_pb2 import TelephonyStressTestResult
-
-# Initializes the path to the protobuf
-PROTO_PATH = os.path.join(os.path.dirname(__file__),
- 'protos',
- 'telephony_stress_metric.proto')
-
-
-class TelephonyStressMetricLogger(MetricLogger):
- """A logger for gathering Telephony Stress test metrics
-
- Attributes:
- proto: Module used to store Telephony metrics in a proto
- """
-
- def __init__(self, event):
- super().__init__(event=event)
- self.proto = TelephonyStressTestResult()
-
- def set_result(self, result_dict):
- self.proto.results_dict = result_dict
-
- def end(self, event):
- metric = ProtoMetric(name='telephony_stress_test_result',
- data=self.proto)
- return self.publisher.publish(metric)
-
diff --git a/acts_tests/acts_contrib/test_utils/wifi/wifi_test_utils.py b/acts_tests/acts_contrib/test_utils/wifi/wifi_test_utils.py
index 2e3f336..420afb8 100755
--- a/acts_tests/acts_contrib/test_utils/wifi/wifi_test_utils.py
+++ b/acts_tests/acts_contrib/test_utils/wifi/wifi_test_utils.py
@@ -909,18 +909,13 @@
True: if network_ssid is found in scan results.
False: if network_ssid is not found in scan results.
"""
- start_time = time.time()
for num_tries in range(max_tries):
if start_wifi_connection_scan_and_return_status(ad):
scan_results = ad.droid.wifiGetScanResults()
match_results = match_networks({WifiEnums.SSID_KEY: network_ssid},
scan_results)
if len(match_results) > 0:
- ad.log.debug("Found network in %s seconds." %
- (time.time() - start_time))
return True
- ad.log.debug("Did not find network in %s seconds." %
- (time.time() - start_time))
return False
@@ -1471,9 +1466,11 @@
ad.droid.wifiStopTrackingStateChange()
-def connect_to_wifi_network(ad, network, assert_on_fail=True,
- check_connectivity=True, hidden=False,
- num_of_scan_tries=3, num_of_connect_tries=3):
+def connect_to_wifi_network(ad,
+ network,
+ assert_on_fail=True,
+ check_connectivity=True,
+ hidden=False):
"""Connection logic for open and psk wifi networks.
Args:
@@ -1482,20 +1479,16 @@
assert_on_fail: If true, errors from wifi_connect will raise
test failure signals.
hidden: Is the Wifi network hidden.
- num_of_scan_tries: The number of times to try scan
- interface before declaring failure.
- num_of_connect_tries: The number of times to try
- connect wifi before declaring failure.
"""
if hidden:
start_wifi_connection_scan_and_ensure_network_not_found(
- ad, network[WifiEnums.SSID_KEY], max_tries=num_of_scan_tries)
+ ad, network[WifiEnums.SSID_KEY])
else:
start_wifi_connection_scan_and_ensure_network_found(
- ad, network[WifiEnums.SSID_KEY], max_tries=num_of_scan_tries)
+ ad, network[WifiEnums.SSID_KEY])
wifi_connect(ad,
network,
- num_of_tries=num_of_connect_tries,
+ num_of_tries=3,
assert_on_fail=assert_on_fail,
check_connectivity=check_connectivity)
diff --git a/acts_tests/tests/google/net/CaptivePortalTest.py b/acts_tests/tests/google/net/CaptivePortalTest.py
index 7542ee9..eaafa25 100644
--- a/acts_tests/tests/google/net/CaptivePortalTest.py
+++ b/acts_tests/tests/google/net/CaptivePortalTest.py
@@ -16,7 +16,6 @@
import time
from acts import asserts
-from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
from acts import base_test
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.net import connectivity_const as cconst
@@ -34,8 +33,8 @@
SIGN_IN_NOTIFICATION = "Sign in to network"
-class CaptivePortalTest(WifiBaseTest):
- """Check device can access the network after pass captive portal check."""
+class CaptivePortalTest(base_test.BaseTestClass):
+ """Tests for Captive portal."""
def setup_class(self):
"""Setup devices for tests and unpack params.
@@ -49,20 +48,10 @@
4. uic_zip: Zip file location of UICD application
"""
self.dut = self.android_devices[0]
- opt_params = ["rk_captive_portal", "gg_captive_portal",
- "configure_OpenWrt", "wifi_network"]
- self.unpack_userparams(opt_param_names=opt_params,)
+ req_params = ["rk_captive_portal", "gg_captive_portal"]
+ self.unpack_userparams(req_param_names=req_params,)
wutils.wifi_test_device_init(self.dut)
- if OPENWRT in self.user_params:
- self.openwrt = self.access_points[0]
- if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
- self.dut.log.info("Skip configure Wifi interface due to config setup.")
- else:
- self.configure_openwrt_ap_and_start(wpa_network=True)
- self.wifi_network = self.openwrt.get_wifi_network()
- self.openwrt.network_setting.setup_captive_portal()
-
def teardown_class(self):
"""Reset devices."""
cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
@@ -87,7 +76,7 @@
uutils.has_element(self.dut, text="Network & internet"),
"Failed to find 'Network & internet' icon")
uutils.wait_and_click(self.dut, text="Network & internet")
- uutils.wait_and_click(self.dut, text="Internet")
+ uutils.wait_and_click(self.dut, text="Not connected")
def _verify_sign_in_notification(self):
"""Verify sign in notification shows for captive portal."""
@@ -236,44 +225,3 @@
# verify connection to captive portal network
self._verify_captive_portal(self.gg_captive_portal)
-
- @test_tracker_info(uuid="c25a1be7-f202-41c4-ac95-bed1720833ab")
- def test_openwrt_captive_portal_default(self):
- """Verify captive portal network.
-
- Steps:
- 1. Set default private dns mode
- 2. Connect to openwrt captive portal network
- 3. Verify connectivity
- """
- cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
- self.openwrt.network_setting.service_manager.restart("nodogsplash")
- self._verify_captive_portal(self.wifi_network, click_accept="Continue")
-
- @test_tracker_info(uuid="1419e36d-0303-44ba-bc60-4d707b45ef48")
- def test_openwrt_captive_portal_private_dns_off(self):
- """Verify captive portal network.
-
- Steps:
- 1. Turn off private dns mode
- 2. Connect to openwrt captive portal network
- 3. Verify connectivity
- """
- cutils.set_private_dns(self.dut, cconst.PRIVATE_DNS_MODE_OFF)
- self.openwrt.network_setting.service_manager.restart("nodogsplash")
- self._verify_captive_portal(self.wifi_network, click_accept="Continue")
-
- @test_tracker_info(uuid="5aae44ee-fa62-47b9-9b3d-8121f9f92da1")
- def test_openwrt_captive_portal_private_dns_strict(self):
- """Verify captive portal network.
-
- Steps:
- 1. Set strict private dns mode
- 2. Connect to openwrt captive portal network
- 3. Verify connectivity
- """
- cutils.set_private_dns(self.dut,
- cconst.PRIVATE_DNS_MODE_STRICT,
- cconst.DNS_GOOGLE)
- self.openwrt.network_setting.service_manager.restart("nodogsplash")
- self._verify_captive_portal(self.wifi_network, click_accept="Continue")
diff --git a/acts_tests/tests/google/net/DNSTest.py b/acts_tests/tests/google/net/DNSTest.py
deleted file mode 100644
index d0f99d9..0000000
--- a/acts_tests/tests/google/net/DNSTest.py
+++ /dev/null
@@ -1,132 +0,0 @@
-#
-# Copyright 2021 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import random
-
-from acts import asserts
-from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
-from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
-from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
-from scapy.all import rdpcap, DNSRR, DNSQR, IP, IPv6
-
-
-WLAN = "wlan0"
-PING_ADDR = "google.com"
-
-
-class DNSTest(WifiBaseTest):
- """DNS related test for Android."""
-
- def setup_class(self):
- self.dut = self.android_devices[0]
- wutils.wifi_test_device_init(self.dut)
-
- req_params = []
- opt_param = ["wifi_network", "configure_OpenWrt"]
- self.unpack_userparams(
- req_param_names=req_params, opt_param_names=opt_param)
-
- asserts.assert_true(OPENWRT in self.user_params,
- "OpenWrtAP is not in testbed.")
- self.openwrt = self.access_points[0]
- if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
- self.dut.log.info("Skip configure Wifi interface due to config setup.")
- else:
- self.configure_openwrt_ap_and_start(wpa_network=True)
- self.wifi_network = self.openwrt.get_wifi_network()
-
- asserts.assert_true(self.openwrt.verify_wifi_status(),
- "OpenWrt Wifi interface is not ready.")
-
- def teardown_class(self):
- """Reset wifi to make sure VPN tears down cleanly."""
- wutils.reset_wifi(self.dut)
-
- def teardown_test(self):
- """Reset wifi to make sure VPN tears down cleanly."""
- wutils.reset_wifi(self.dut)
-
- def ping(self, addr, ignore_status=True, timeout=60):
- """Start a ping from DUT and return ping result.
-
- Args:
- addr: Address to ping.
- ignore_status: ignore non zero return.
- timeout: cmd timeout.
- Returns:
- Boolean for ping result.
- """
- return "100%" not in self.dut.adb.shell("ping -c 1 %s" % addr,
- ignore_status=ignore_status,
- timeout=timeout)
-
- def generate_query_qname(self):
- """Return a random query name."""
- return "%s-ds.metric.gstatic.com" % random.randint(0, 99999999)
-
- def test_dns_query(self):
- # Setup environment
- wutils.connect_to_wifi_network(self.dut, self.wifi_network)
- # Start tcpdump on OpenWrt
- remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name)
- # Generate query name
- test_qname = self.generate_query_qname()
- self.dut.log.info("Test query name = %s" % test_qname)
- # Start send a query
- ping_result = self.ping(test_qname)
- local_pcap_path = self.openwrt.network_setting.stop_tcpdump(remote_pcap_path,
- self.dut.device_log_path)
- # Check DNSRR.rrname in tcpdump to verify DNS response
- packets = rdpcap(local_pcap_path)
- self.dut.log.info("pcap file path : %s" % local_pcap_path)
- pkt_count = 0
- for pkt in packets:
- if pkt.haslayer(DNSRR) and pkt[DNSRR].rrname.decode().strip(".") == test_qname:
- pkt_count = pkt_count + 1
- self.dut.log.info("DNS query response count : %s" % pkt_count)
- if not ping_result:
- asserts.assert_true(pkt_count > 0,
- "Did not find match standard query response in tcpdump.")
- asserts.assert_true(ping_result, "Device ping fail.")
-
- def test_dns_query_retransmit(self):
- # Setup environment
- wutils.connect_to_wifi_network(self.dut, self.wifi_network)
- self.openwrt.network_setting.block_dns_response()
- # Start tcpdump on OpenWrt
- remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name)
- # Generate query name
- test_qname = self.generate_query_qname()
- self.dut.log.info("Test query name = %s" % test_qname)
- # Start send a query
- self.ping(test_qname)
- local_pcap_path = self.openwrt.network_setting.stop_tcpdump(remote_pcap_path,
- self.dut.device_log_path)
- # Check DNSQR.qname in tcpdump to verify device retransmit the query
- packets = rdpcap(local_pcap_path)
- self.dut.log.info("pcap file path : %s" % local_pcap_path)
- pkt_count = 0
- pkt6_count = 0
- for pkt in packets:
- if pkt.haslayer(DNSQR) and pkt[DNSQR].qname.decode().strip(".") == test_qname:
- if pkt.haslayer(IP):
- pkt_count = pkt_count + 1
- if pkt.haslayer(IPv6):
- pkt6_count = pkt6_count + 1
- self.dut.log.info("IPv4 DNS query count : %s" % pkt_count)
- self.dut.log.info("IPv6 DNS query count : %s" % pkt6_count)
- self.openwrt.network_setting.unblock_dns_response()
- asserts.assert_true(pkt_count >= 2 or pkt6_count >= 2,
- "Did not find match standard query in tcpdump.")
diff --git a/acts_tests/tests/google/net/DhcpTest.py b/acts_tests/tests/google/net/DhcpTest.py
deleted file mode 100644
index 739f6ca..0000000
--- a/acts_tests/tests/google/net/DhcpTest.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Copyright 2021 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import time
-
-from acts import asserts
-from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
-from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
-from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
-
-WLAN = "wlan0"
-PING_ADDR = "google.com"
-
-
-class DhcpTest(WifiBaseTest):
- """DHCP related test for Android."""
-
- def setup_class(self):
- self.dut = self.android_devices[0]
-
- wutils.wifi_test_device_init(self.dut)
- asserts.assert_true(OPENWRT in self.user_params,
- "OpenWrtAP is not in testbed.")
- self.openwrt = self.access_points[0]
- self.configure_openwrt_ap_and_start(wpa_network=True)
- self.wifi_network = self.openwrt.get_wifi_network()
- self.openwrt.network_setting.setup_ipv6_bridge()
- asserts.assert_true(self.openwrt.verify_wifi_status(),
- "OpenWrt Wifi interface is not ready.")
- def teardown_class(self):
- """Reset wifi to make sure VPN tears down cleanly."""
- wutils.reset_wifi(self.dut)
-
- def teardown_test(self):
- """Reset wifi to make sure VPN tears down cleanly."""
- wutils.reset_wifi(self.dut)
-
- def _verify_ping(self, option="", dest=PING_ADDR):
- try:
- out = self.dut.adb.shell("ping%s -c1 %s" % (option, dest))
- return "100%" not in out
- except Exception as e:
- self.dut.log.debug(e)
- return False
-
- def _verify_device_address(self, ipv4=True, ipv6=True, timeout=15):
- """Verify device get assign address on wireless interface."""
- current_time = time.time()
- while time.time() < current_time + timeout:
- try:
- if ipv4:
- ipv4_addr = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0]
- self.dut.log.info("ipv4_address is %s" % ipv4_addr)
- if ipv6:
- ipv6_addr = self.dut.droid.connectivityGetIPv6Addresses(WLAN)[0]
- self.dut.log.info("ipv6_address is %s" % ipv6_addr)
- return True
- except:
- time.sleep(1)
- return False
-
- def test_ipv4_ipv6_network(self):
- """Verify device can get both ipv4 ipv6 address."""
- wutils.connect_to_wifi_network(self.dut, self.wifi_network)
-
- asserts.assert_true(self._verify_device_address(),
- "Fail to get ipv4/ipv6 address.")
- asserts.assert_true(self._verify_ping(), "Fail to ping on ipv4.")
- asserts.assert_true(self._verify_ping("6"), "Fail to ping on ipv6.")
-
- def test_ipv6_only_prefer_option(self):
- """Verify DUT can only get ipv6 address and ping out."""
- self.openwrt.network_setting.add_ipv6_prefer_option()
- wutils.connect_to_wifi_network(self.dut, self.wifi_network)
-
- asserts.assert_true(self._verify_device_address(ipv4=False),
- "Fail to get ipv6 address.")
- asserts.assert_false(self._verify_ping(),
- "Should not ping on success on ipv4.")
- asserts.assert_true(self._verify_ping("6"),
- "Fail to ping on ipv6.")
diff --git a/acts_tests/tests/google/net/DnsOverHttpsTest.py b/acts_tests/tests/google/net/DnsOverHttpsTest.py
deleted file mode 100644
index 56f3792..0000000
--- a/acts_tests/tests/google/net/DnsOverHttpsTest.py
+++ /dev/null
@@ -1,295 +0,0 @@
-#
-# Copyright 2021 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-
-from acts import asserts
-from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
-from acts.test_decorators import test_tracker_info
-from acts_contrib.test_utils.net import connectivity_const as cconst
-from acts_contrib.test_utils.net import connectivity_test_utils as cutils
-from acts_contrib.test_utils.net.net_test_utils import start_tcpdump
-from acts_contrib.test_utils.net.net_test_utils import stop_tcpdump
-from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
-from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
-from scapy.all import rdpcap
-from scapy.all import Scapy_Exception
-from scapy.all import TCP
-from scapy.all import UDP
-
-
-DEFAULT_DNS_TIMEOUT = 5
-
-
-class DnsOverHttpsTest(WifiBaseTest):
- """Tests for DnsoverHttps feature."""
-
- def setup_class(self):
- """Setup devices and OpenWrt for DnsoverHttps test and unpack params."""
-
- self.dut = self.android_devices[0]
- if len(self.android_devices) > 1:
- self.dut_b = self.android_devices[1]
- for ad in self.android_devices:
- wutils.reset_wifi(ad)
- ad.droid.setPrivateDnsMode(True)
- req_params = ("ping_hosts",)
- opt_params = ("wifi_network", "configure_OpenWrt",
- "ipv4_only_network", "ipv4_ipv6_network")
- self.unpack_userparams(req_param_names=req_params,
- opt_param_names=opt_params)
- if OPENWRT in self.user_params:
- self.openwrt = self.access_points[0]
- if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
- self.dut.log.info("Skip configure Wifi interface due to config setup.")
- else:
- self.configure_openwrt_ap_and_start(wpa_network=True)
- self.tcpdump_pid = None
- self.default_dns = None
- self.default_dns_v6 = None
- self._setup_doh(self.dut, self.get_wifi_network())
-
- def teardown_test(self):
- wutils.reset_wifi(self.dut)
- if OPENWRT in self.user_params:
- self.openwrt.network_setting.del_default_dns(self.default_dns)
- self.default_dns = None
- self.default_dns_v6 = None
-
- def teardown_class(self):
- for ad in self.android_devices:
- ad.droid.setPrivateDnsMode(True)
- self._setup_doh(ad, self.get_wifi_network(), enable=False)
-
- def on_fail(self, test_name, begin_time):
- self.dut.take_bug_report(test_name, begin_time)
-
- def get_wifi_network(self, ipv6_supported=False):
- """Return fit network for conditions.
-
- Args:
- ipv6_supported: Boolean for select network.
- Returns:
- A dict for network object for connect wifi.
- """
- if OPENWRT in self.user_params:
- if ipv6_supported:
- self.openwrt.network_setting.enable_ipv6()
- self.openwrt.network_setting.setup_ipv6_bridge()
- else:
- self.openwrt.network_setting.disable_ipv6()
- self.openwrt.network_setting.remove_ipv6_bridge()
- if self.default_dns:
- self.openwrt.network_setting.add_default_dns(self.default_dns)
- if self.default_dns_v6:
- for ipv6_dns in self.default_dns_v6:
- self.openwrt.network_setting.add_default_v6_dns(ipv6_dns)
- if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
- return self.wifi_network
- return self.openwrt.get_wifi_network()
- if ipv6_supported:
- return self.ipv4_ipv6_network
- return self.ipv4_only_network
-
- def _verify_doh_queries(self, pcap_file, over_https):
- """Verify if DNS queries were over https or not.
-
- Args:
- pcap_file: tcpdump file
- over_https: True if excepted all dns go through doh.
- """
- try:
- packets = rdpcap(pcap_file)
- except Scapy_Exception:
- asserts.fail("Not a valid pcap file")
-
- for pkt in packets:
- summary = "%s" % pkt.summary()
- for host in self.ping_hosts:
- host = host.split(".")[-2]
- if UDP in pkt and pkt[UDP].sport == 53 and host in summary:
- if over_https:
- asserts.fail("Found query to port 53: %s" % summary)
- else:
- self.dut.log.info("Found query to port 53: %s" % summary)
- if TCP in pkt and pkt[TCP].sport == 853:
- asserts.fail("Found query to port 853: %s" % summary)
-
- def _test_public_doh_mode(self, ad, net, dns_mode, hostname=None):
- """Test step for DoH.
-
- Args:
- ad: android device object.
- net: wifi network to connect to, LTE network if None.
- dns_mode: private DNS mode.
- hostname: private DNS hostname to set to.
- """
-
- # set private dns mode
- if dns_mode:
- cutils.set_private_dns(self.dut, dns_mode, hostname)
- # connect to wifi
- wutils.start_wifi_connection_scan_and_ensure_network_found(
- self.dut, net[wutils.WifiEnums.SSID_KEY])
- wutils.wifi_connect(self.dut, net)
- self._verify_tls_completed()
-
- # start tcpdump on the device
- self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)
-
- # ping hosts should pass
- for host in self.ping_hosts:
- self.log.info("Pinging %s" % host)
- status = wutils.validate_connection(self.dut, host)
- asserts.assert_true(status, "Failed to ping host %s" % host)
- self.log.info("Ping successful")
-
- # stop tcpdump
- pcap_file = stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)
-
- # verify DNS queries
- overhttps = dns_mode != cconst.PRIVATE_DNS_MODE_OFF
- self._verify_doh_queries(pcap_file, overhttps)
-
- # reset wifi
- wutils.reset_wifi(self.dut)
-
- def _verify_tls_completed(self, retry_count=5):
- """Verify tls finish verification process.
-
- Expect all private dns server status, should be
- "success", or "fail".
-
- Args:
- retry_count: int for retry times.
- Raises:
- TimeoutError: if TLS verification stuck in processing.
- """
- for attempt in range(retry_count):
- out = self.dut.adb.shell("dumpsys dnsresolver")
- if "status{in_process}" in out:
- if attempt + 1 < retry_count:
- self.dut.log.info("DoT still validating, retrying...")
- time.sleep(DEFAULT_DNS_TIMEOUT)
- else:
- return
- raise TimeoutError("Fail to completed TLS verification.")
-
- def _setup_doh(self, ad, net, enable=True):
- """Enable/Disable DoH option.
-
- Args:
- ad: android devies.
- net: network as wifi.
- enable: if True, sets the 'doh' experiment flag.
- """
- if enable:
- ad.adb.shell("setprop persist.device_config.netd_native.doh 1")
- else:
- ad.adb.shell("setprop persist.device_config.netd_native.doh 0")
- wutils.wifi_connect(ad, net)
- wutils.reset_wifi(ad)
- out = ad.adb.shell("dumpsys dnsresolver |grep doh")
- ad.log.debug(out)
-
- def test_mix_server_ipv4_only_wifi_network_with_dns_strict_mode(self):
- """Test doh flag with below situation.
-
- - Android device in strict mode
- - DNS server supporting both Dns, DoT and DoH protocols
- - IPv4-only network
- """
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_STRICT,
- hostname=cconst.DNS_GOOGLE_HOSTNAME)
-
- def test_mix_server_ipv4_ipv6_wifi_network_with_dns_strict_mode(self):
- """Test doh flag with below situation.
-
- - Android device in strict mode
- - DNS server supporting both Dns, DoT and DoH protocols
- - IPv4-IPv6 network
- """
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_STRICT,
- hostname=cconst.DNS_GOOGLE_HOSTNAME)
-
- def test_pure_server_ipv4_only_wifi_network_with_dns_strict_mode(self):
- """Test doh flag with below situation.
-
- - Android device in strict mode
- - DNS server only supporting DoH protocols
- - IPv4-only network
- """
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_STRICT,
- hostname=cconst.DOH_CLOUDFLARE_HOSTNAME)
-
- def test_pure_server_ipv4_ipv6_wifi_network_with_dns_strict_mode(self):
- """Test doh flag with below situation.
-
- - Android device in strict mode
- - DNS server only supporting DoH protocols
- - IPv4-IPv6 network
- """
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_STRICT,
- hostname=cconst.DOH_CLOUDFLARE_HOSTNAME)
-
- def test_mix_server_ipv4_only_wifi_network_with_dns_opportunistic_mode(self):
- """Test doh flag with below situation.
-
- - Android device in opportunistic mode
- - DNS server supporting both Dns, DoT and DoH protocols
- - IPv4-only network
- """
- self.default_dns = cconst.DNS_GOOGLE_ADDR_V4
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
-
- def test_mix_server_ipv4_ipv6_wifi_network_with_dns_opportunistic_mode(self):
- """Test doh flag with below situation.
-
- - Android device in opportunistic mode
- - DNS server supporting both Dns, DoT and DoH protocols
- - IPv4-IPv6 network
- """
- self.default_dns = cconst.DNS_GOOGLE_ADDR_V4
- self.default_dns_v6 = cconst.DNS_GOOGLE_ADDR_V6
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_OPPORTUNISTIC)
-
- def test_mix_server_ipv4_only_wifi_network_with_dns_off_mode(self):
- """Test doh with below situation.
-
- - Android device in dns off mode
- - DNS server supporting both Dns, DoT and DoH protocols
- - IPv4-only network
- """
- self.default_dns = cconst.DNS_GOOGLE_ADDR_V4
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_OFF)
-
- def test_mix_server_ipv4_ipv6_wifi_network_with_dns_off_mode(self):
- """Test doh with below situation.
-
- - Android device in dns off mode
- - DNS server supporting both Dns, DoT and DoH protocols
- - IPv4-IPv6 network
- """
- self.default_dns = cconst.DNS_GOOGLE_ADDR_V4
- self.default_dns_v6 = cconst.DNS_GOOGLE_ADDR_V6
- self._test_public_doh_mode(self.dut, self.get_wifi_network(),
- cconst.PRIVATE_DNS_MODE_OFF)
diff --git a/acts_tests/tests/google/net/LegacyVpnTest.py b/acts_tests/tests/google/net/LegacyVpnTest.py
index 4003a2f..6e9710b 100644
--- a/acts_tests/tests/google/net/LegacyVpnTest.py
+++ b/acts_tests/tests/google/net/LegacyVpnTest.py
@@ -43,8 +43,7 @@
req_params = [
x for x in req_params if not x.startswith("__")
]
- opt_params = ["wifi_network", "vpn_cert_country",
- "vpn_cert_org", "configure_OpenWrt"]
+ opt_params = ["wifi_network", "vpn_cert_country", "vpn_cert_org"]
self.unpack_userparams(req_param_names=req_params,
opt_param_names=opt_params)
@@ -52,12 +51,8 @@
wutils.wifi_toggle_state(self.dut, True)
if OPENWRT in self.user_params:
self.openwrt = self.access_points[0]
- if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
- self.dut.log.info("Skip configure Wifi interface due to config setup.")
- else:
- self.configure_openwrt_ap_and_start(wpa_network=True)
- self.wifi_network = self.openwrt.get_wifi_network()
-
+ self.configure_openwrt_ap_and_start(wpa_network=True)
+ self.wifi_network = self.openwrt.get_wifi_network()
# Wait for OpenWrt statement update
time.sleep(10)
self.openwrt.network_setting.setup_vpn_pptp_server(
diff --git a/acts_tests/tests/google/net/MutlicastDNSTest.py b/acts_tests/tests/google/net/MutlicastDNSTest.py
deleted file mode 100644
index 475ad45..0000000
--- a/acts_tests/tests/google/net/MutlicastDNSTest.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Copyright 2021 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from acts import asserts
-from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
-from acts.test_decorators import test_tracker_info
-from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
-from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
-
-
-class MulticastDNSTest(WifiBaseTest):
- """Verify Multicast DNS can work on Android devices."""
-
- def setup_class(self):
- """Setup Openwrt and unpack params for mDNS test."""
- self.dut = self.android_devices[0]
- req_params = []
- opt_params = ["configure_OpenWrt", "wifi_network"]
- self.unpack_userparams(req_params, opt_params)
- if OPENWRT in self.user_params:
- self.openwrt = self.access_points[0]
- if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip":
- self.dut.log.info("Skip configure Wifi interface due to config setup.")
- else:
- self.configure_openwrt_ap_and_start(wpa_network=True)
- self.wifi_network = self.openwrt.get_wifi_network()
-
- def on_fail(self, test_name, begin_time):
- """Take bugreport if test failed."""
- self.dut.take_bug_report(test_name, begin_time)
-
- def teardown_test(self):
- """Reset wifi settings after test case."""
- wutils.reset_wifi(self.dut)
-
- def verify_ping(self, hostname, expect_ping_pass=True):
- """Verify if result of the ping as excepted.
-
- Args:
- hostname: ping address.
- expect_ping_pass: excepted ping result is True or False.
- Returns:
- Boolean if ping result work as expected.
- """
- out = self.dut.adb.shell("ping -c 1 %s" % hostname)
- result = ("100%" not in out) == expect_ping_pass
- if not result:
- self.dut.log.info(out)
- return result
-
- def test_mdns_query_ipv4_only(self):
- """Verify mdns query work in ipv4 only network."""
- self.openwrt.network_setting.disable_ipv6()
- self.openwrt.network_setting.setup_mdns()
- wutils.wifi_connect(self.dut, self.wifi_network)
- asserts.assert_true(self.verify_ping("openwrt.local"),
- "Fail to ping openwrt.local.")
-
- def test_mdns_query_ipv4_ipv6(self):
- """Verify mdns query work in ipv4 & ipv6 network."""
- self.openwrt.network_setting.enable_ipv6()
- self.openwrt.network_setting.setup_mdns()
- wutils.wifi_connect(self.dut, self.wifi_network)
- asserts.assert_true(self.verify_ping("openwrt.local"),
- "Fail to ping openwrt.local.")
-
diff --git a/acts_tests/tests/google/tel/live/TelLiveStressTest.py b/acts_tests/tests/google/tel/live/TelLiveStressTest.py
index 80d3e7e..9351210 100644
--- a/acts_tests/tests/google/tel/live/TelLiveStressTest.py
+++ b/acts_tests/tests/google/tel/live/TelLiveStressTest.py
@@ -28,7 +28,6 @@
from acts.libs.proc import job
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.tel.loggers.telephony_metric_logger import TelephonyMetricLogger
-from acts_contrib.test_utils.tel.loggers.telephony_stress_metric_logger import TelephonyStressMetricLogger
from acts_contrib.test_utils.tel.TelephonyBaseTest import TelephonyBaseTest
from acts_contrib.test_utils.tel.tel_defines import CAPABILITY_VOLTE
from acts_contrib.test_utils.tel.tel_defines import CAPABILITY_WFC
@@ -110,19 +109,6 @@
EXCEPTION_TOLERANCE = 5
BINDER_LOGS = ["/sys/kernel/debug/binder"]
DEFAULT_FILE_DOWNLOADS = ["1MB", "5MB", "10MB", "20MB", "50MB"]
-RESULTS_LIST = {-2: "UNAVAILABLE_NETWORK_TYPE",
- -1: "CALL_SETUP_FAILURE",
- 0: "SUCCESS",
- 1: "INITIATE_FAILED",
- 2: "NO_RING_EVENT_OR_ANSWER_FAILED",
- 3: "NO_CALL_ID_FOUND",
- 4: "CALL_STATE_NOT_ACTIVE_DURING_ESTABLISHMENT",
- 5: "AUDIO_STATE_NOT_INCALL_DURING_ESTABLISHMENT",
- 6: "AUDIO_STATE_NOT_INCALL_AFTER_CONNECTED",
- 7: "CALL_DROP_OR_WRONG_STATE_DURING_ESTABLISHMENT",
- 8: "CALL_DROP_OR_WRONG_STATE_AFTER_CONNECTED",
- 9: "CALL_HANGUP_FAIL",
- 10: "CALL_ID_CLEANUP_FAIL"}
class TelLiveStressTest(TelephonyBaseTest):
@@ -805,7 +791,6 @@
self.log.error("Too many exception errors, quit test")
return False
self.log.info("%s", dict(self.result_info))
- self.tel_logger.set_result(self.result_collection)
if any([
self.result_info["Call Setup Failure"],
self.result_info["Call Maintenance Failure"],