Merge "Bluetooth OTA test" into oc-dev
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py
index 81bde09..a8ea506 100755
--- a/acts/framework/acts/controllers/access_point.py
+++ b/acts/framework/acts/controllers/access_point.py
@@ -16,8 +16,8 @@
import collections
import ipaddress
-import logging
+from acts.controllers.ap_lib import bridge_interface
from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import dhcp_server
from acts.controllers.ap_lib import hostapd
@@ -90,6 +90,9 @@
_AP_5GHZ_SUBNET_STR = '192.168.9.0/24'
_AP_2GHZ_SUBNET = dhcp_config.Subnet(ipaddress.ip_network(_AP_2GHZ_SUBNET_STR))
_AP_5GHZ_SUBNET = dhcp_config.Subnet(ipaddress.ip_network(_AP_5GHZ_SUBNET_STR))
+LAN_INTERFACE = 'eth1'
+# The last digit of the ip for the bridge interface
+BRIDGE_IP_LAST = '100'
class AccessPoint(object):
@@ -116,6 +119,7 @@
# A map from network interface name to _ApInstance objects representing
# the hostapd instance running against the interface.
self._aps = dict()
+ self.bridge = bridge_interface.BridgeInterface(self.ssh)
def start_ap(self, hostapd_config, additional_parameters=None):
"""Starts as an ap using a set of configurations.
@@ -306,3 +310,29 @@
if self._aps:
self.stop_all_aps()
self.ssh.close()
+
+ def generate_bridge_configs(self, channel, iface_lan=LAN_INTERFACE):
+ """Generate a list of configs for a bridge between LAN and WLAN.
+
+ Args:
+ channel: the channel WLAN interface is brought up on
+ iface_lan: the LAN interface to bridge
+ Returns:
+ configs: tuple containing iface_wlan, iface_lan and bridge_ip
+ """
+
+ if channel < 15:
+ iface_wlan = _AP_2GHZ_INTERFACE
+ subnet_str = _AP_2GHZ_SUBNET_STR
+ else:
+ iface_wlan = _AP_5GHZ_INTERFACE
+ subnet_str = _AP_5GHZ_SUBNET_STR
+
+ iface_lan = iface_lan
+
+ a, b, c, d = subnet_str.strip('/24').split('.')
+ bridge_ip = "%s.%s.%s.%s" % (a, b, c, BRIDGE_IP_LAST)
+
+ configs = (iface_wlan, iface_lan, bridge_ip)
+
+ return configs
diff --git a/acts/framework/acts/controllers/ap_lib/bridge_interface.py b/acts/framework/acts/controllers/ap_lib/bridge_interface.py
new file mode 100644
index 0000000..af3d072
--- /dev/null
+++ b/acts/framework/acts/controllers/ap_lib/bridge_interface.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - Google, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from acts.libs.proc import job
+
+# TODO(@qijiang): will change to brctl when it's built in image
+_BRCTL = '/home/root/bridge-utils/sbin/brctl'
+BRIDGE_NAME = 'br0'
+CREATE_BRIDGE = '%s addbr %s' % (_BRCTL, BRIDGE_NAME)
+DELETE_BRIDGE = '%s delbr %s' % (_BRCTL, BRIDGE_NAME)
+BRING_DOWN_BRIDGE = 'ifconfig %s down' % BRIDGE_NAME
+
+
+class BridgeInterfaceConfigs(object):
+ """Configs needed for creating bridge interface between LAN and WLAN.
+
+ """
+
+ def __init__(self, iface_wlan, iface_lan, bridge_ip):
+ """Set bridge interface configs based on the channel info.
+
+ Args:
+ iface_wlan: the wlan interface as part of the bridge
+ iface_lan: the ethernet LAN interface as part of the bridge
+ bridge_ip: the ip address assigned to the bridge interface
+ """
+ self.iface_wlan = iface_wlan
+ self.iface_lan = iface_lan
+ self.bridge_ip = bridge_ip
+
+
+class BridgeInterface(object):
+ """Class object for bridge interface betwen WLAN and LAN
+
+ """
+
+ def __init__(self, ssh_session):
+ """Initialize the BridgeInterface class.
+
+ Bridge interface will be added between ethernet LAN port and WLAN port.
+ Args:
+ ssh_session: ssh session to the AP
+ """
+ self.ssh = ssh_session
+ self.log = logging.getLogger()
+
+ def startup(self, brconfigs):
+ """Start up the bridge interface.
+
+ Args:
+ brconfigs: the bridge interface config, type BridgeInterfaceConfigs
+ """
+
+ self.log.info('Create bridge interface between LAN and WLAN')
+ # Create the bridge
+ try:
+ self.ssh.run(CREATE_BRIDGE)
+ except job.Error:
+ self.log.warning(
+ 'Bridge interface {} already exists, no action needed'.format(
+ BRIDGE_NAME))
+
+ # Enable 4addr mode on for the wlan interface
+ ENABLE_4ADDR = 'iw dev %s set 4addr on' % (brconfigs.iface_wlan)
+ try:
+ self.ssh.run(ENABLE_4ADDR)
+ except job.Error:
+ self.log.warning(
+ '4addr is already enabled on {}'.format(brconfigs.iface_wlan))
+
+ # Add both LAN and WLAN interfaces to the bridge interface
+ for interface in [brconfigs.iface_lan, brconfigs.iface_wlan]:
+ ADD_INTERFACE = '%s addif %s %s' % (_BRCTL, BRIDGE_NAME, interface)
+ try:
+ self.ssh.run(ADD_INTERFACE)
+ except job.Error:
+ self.log.warning('{} has alrady been added to {}'.format(
+ interface, BRIDGE_NAME))
+ time.sleep(5)
+
+ # Set IP address on the bridge interface to bring it up
+ SET_BRIDGE_IP = 'ifconfig %s %s' % (BRIDGE_NAME, brconfigs.bridge_ip)
+ self.ssh.run(SET_BRIDGE_IP)
+ time.sleep(2)
+
+ # Bridge interface is up
+ self.log.info('Bridge interface is up and running')
+
+ def teardown(self, brconfigs):
+ """Tear down the bridge interface.
+
+ Args:
+ brconfigs: the bridge interface config, type BridgeInterfaceConfigs
+ """
+ self.log.info('Bringing down the bridge interface')
+ # Delete the bridge interface
+ self.ssh.run(BRING_DOWN_BRIDGE)
+ time.sleep(1)
+ self.ssh.run(DELETE_BRIDGE)
+
+ # Bring down wlan interface and disable 4addr mode
+ BRING_DOWN_WLAN = 'ifconfig %s down' % brconfigs.iface_wlan
+ self.ssh.run(BRING_DOWN_WLAN)
+ time.sleep(2)
+ DISABLE_4ADDR = 'iw dev %s set 4addr off' % (brconfigs.iface_wlan)
+ self.ssh.run(DISABLE_4ADDR)
+ time.sleep(1)
+ self.log.info('Bridge interface is down')
diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_security.py b/acts/framework/acts/controllers/ap_lib/hostapd_security.py
index 4e1ae3a..9733e99 100644
--- a/acts/framework/acts/controllers/ap_lib/hostapd_security.py
+++ b/acts/framework/acts/controllers/ap_lib/hostapd_security.py
@@ -64,14 +64,15 @@
else:
security_mode = None
self.security_mode = security_mode
- if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len(
- password) > hostapd_constants.MAX_WPA_PSK_LENGTH:
- raise ValueError(
- 'Password must be a minumum of %s characters and a maximum of %s'
- % (hostapd_constants.MIN_WPA_PSK_LENGTH,
- hostapd_constants.MAX_WPA_PSK_LENGTH))
- else:
- self.password = password
+ if password:
+ if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len(
+ password) > hostapd_constants.MAX_WPA_PSK_LENGTH:
+ raise ValueError(
+ 'Password must be a minumum of %s characters and a maximum of %s'
+ % (hostapd_constants.MIN_WPA_PSK_LENGTH,
+ hostapd_constants.MAX_WPA_PSK_LENGTH))
+ else:
+ self.password = password
def generate_dict(self):
"""Returns: an ordered dictionary of settings"""
diff --git a/acts/framework/acts/controllers/packet_sender.py b/acts/framework/acts/controllers/packet_sender.py
index bdeac41..6b3898a 100644
--- a/acts/framework/acts/controllers/packet_sender.py
+++ b/acts/framework/acts/controllers/packet_sender.py
@@ -42,6 +42,17 @@
DHCP_OFFER_SRC_PORT = 67
DHCP_OFFER_DST_PORT = 68
DHCP_TRANS_ID = 0x01020304
+DNS_LEN = 3
+PING6_DATA = 'BEST PING6 EVER'
+PING4_TYPE = 8
+MDNS_TTL = 255
+MDNS_QTYPE = 'PTR'
+MDNS_UDP_PORT = 5353
+MDNS_V4_IP_DST = '224.0.0.251'
+MDNS_V4_MAC_DST = '01:00:5E:00:00:FB'
+MDNS_RECURSIVE = 1
+MDNS_V6_IP_DST = 'FF02::FB'
+MDNS_V6_MAC_DST = '33:33:00:00:00:FB'
def create(configs):
@@ -479,15 +490,22 @@
else:
self.src_ipv6 = config_params['src_ipv6']
- def generate(self, lifetime, ip_dst=None, eth_dst=None):
+ def generate(self,
+ lifetime,
+ enableDNS=False,
+ dns_lifetime=0,
+ ip_dst=None,
+ eth_dst=None):
"""Generates a Router Advertisement (RA) packet (ICMP over IPv6).
Args:
lifetime: RA lifetime
- ip_dst: IPv6 destination address (layer 3)
+ enableDNS: Add RDNSS option to RA (Optional)
+ dns_lifetime: Set DNS server lifetime (Optional)
+ ip_dst: IPv6 destination address (Optional)
eth_dst: Ethernet (layer 2) destination address (Optional)
"""
- # Compute addresses
+ # Overwrite standard fields if desired
ip6_dst = (ip_dst if ip_dst is not None else RA_IP)
hw_dst = (eth_dst if eth_dst is not None else RA_MAC)
@@ -497,10 +515,266 @@
src_ll_addr = scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.src_mac)
prefix = scapy.ICMPv6NDOptPrefixInfo(
prefixlen=RA_PREFIX_LEN, prefix=RA_PREFIX)
- ip6 = base / router_solicitation / src_ll_addr / prefix
+ if enableDNS:
+ rndss = scapy.ICMPv6NDOptRDNSS(
+ lifetime=dns_lifetime, dns=[self.src_ipv6], len=DNS_LEN)
+ ip6 = base / router_solicitation / src_ll_addr / prefix / rndss
+ else:
+ ip6 = base / router_solicitation / src_ll_addr / prefix
# Create Ethernet layer
ethernet = scapy.Ether(src=self.src_mac, dst=hw_dst)
self.packet = ethernet / ip6
return self.packet
+
+
+class Ping6Generator(object):
+ """Creates a custom Ping v6 packet (i.e., ICMP over IPv6)
+
+ Attributes:
+ packet: desired built custom packet
+ src_mac: MAC address (Layer 2) of the source node
+ dst_mac: MAC address (Layer 2) of the destination node
+ src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc)
+ src_ipv6: IPv6 address (Layer 3) of the source node
+ dst_ipv6: IPv6 address (Layer 3) of the destination node
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: contains all the necessary packet parameters.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4: 'get_local', ...
+ The key can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ self.dst_mac = config_params['dst_mac']
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ self.dst_ipv6 = config_params['dst_ipv6']
+ self.src_ipv6_type = config_params['src_ipv6_type']
+ if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type)
+ else:
+ self.src_ipv6 = config_params['src_ipv6']
+
+ def generate(self, ip_dst=None, eth_dst=None):
+ """Generates a Ping6 packet (i.e., Echo Request)
+
+ Args:
+ ip_dst: IPv6 destination address (Optional)
+ eth_dst: Ethernet (layer 2) destination address (Optional)
+ """
+ # Overwrite standard fields if desired
+ ip6_dst = (ip_dst if ip_dst is not None else self.dst_ipv6)
+ hw_dst = (eth_dst if eth_dst is not None else self.dst_mac)
+
+ # Create IPv6 layer
+ base = scapy.IPv6(dst=ip6_dst, src=self.src_ipv6)
+ echo_request = scapy.ICMPv6EchoRequest(data=PING6_DATA)
+
+ ip6 = base / echo_request
+
+ # Create Ethernet layer
+ ethernet = scapy.Ether(src=self.src_mac, dst=hw_dst)
+
+ self.packet = ethernet / ip6
+ return self.packet
+
+
+class Ping4Generator(object):
+ """Creates a custom Ping v4 packet (i.e., ICMP over IPv4)
+
+ Attributes:
+ packet: desired built custom packet
+ src_mac: MAC address (Layer 2) of the source node
+ dst_mac: MAC address (Layer 2) of the destination node
+ src_ipv4: IPv4 address (Layer 3) of the source node
+ dst_ipv4: IPv4 address (Layer 3) of the destination node
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: contains all the necessary packet parameters.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4: 'get_local', ...
+ The key can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ self.dst_mac = config_params['dst_mac']
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ self.dst_ipv4 = config_params['dst_ipv4']
+ if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv4 = scapy.get_if_addr(interf)
+ else:
+ self.src_ipv4 = config_params['src_ipv4']
+
+ def generate(self, ip_dst=None, eth_dst=None):
+ """Generates a Ping4 packet (i.e., Echo Request)
+
+ Args:
+ ip_dst: IP destination address (Optional)
+ eth_dst: Ethernet (layer 2) destination address (Optional)
+ """
+
+ # Overwrite standard fields if desired
+ sta_ip = (ip_dst if ip_dst is not None else self.dst_ipv4)
+ sta_hw = (eth_dst if eth_dst is not None else self.dst_mac)
+
+ # Create IPv6 layer
+ base = scapy.IP(src=self.src_ipv4, dst=sta_ip)
+ echo_request = scapy.ICMP(type=PING4_TYPE)
+
+ ip4 = base / echo_request
+
+ # Create Ethernet layer
+ ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw)
+
+ self.packet = ethernet / ip4
+ return self.packet
+
+
+class Mdns6Generator(object):
+ """Creates a custom mDNS IPv6 packet
+
+ Attributes:
+ packet: desired built custom packet
+ src_mac: MAC address (Layer 2) of the source node
+ src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc)
+ src_ipv6: IPv6 address (Layer 3) of the source node
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: contains all the necessary packet parameters.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4: 'get_local', ...
+ The key can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ self.src_ipv6_type = config_params['src_ipv6_type']
+ if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type)
+ else:
+ self.src_ipv6 = config_params['src_ipv6']
+
+ def generate(self, ip_dst=None, eth_dst=None):
+ """Generates a mDNS v6 packet for multicast DNS config
+
+ Args:
+ ip_dst: IPv6 destination address (Optional)
+ eth_dst: Ethernet (layer 2) destination address (Optional)
+ """
+
+ # Overwrite standard fields if desired
+ sta_ip = (ip_dst if ip_dst is not None else MDNS_V6_IP_DST)
+ sta_hw = (eth_dst if eth_dst is not None else MDNS_V6_MAC_DST)
+
+ # Create mDNS layer
+ qdServer = scapy.DNSQR(qname=self.src_ipv6, qtype=MDNS_QTYPE)
+ mDNS = scapy.DNS(rd=MDNS_RECURSIVE, qd=qdServer)
+
+ # Create UDP
+ udp = scapy.UDP(sport=MDNS_UDP_PORT, dport=MDNS_UDP_PORT)
+
+ # Create IP layer
+ ip6 = scapy.IPv6(src=self.src_ipv6, dst=sta_ip)
+
+ # Create Ethernet layer
+ ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw)
+
+ self.packet = ethernet / ip6 / udp / mDNS
+ return self.packet
+
+
+class Mdns4Generator(object):
+ """Creates a custom mDNS v4 packet
+
+ Attributes:
+ packet: desired built custom packet
+ src_mac: MAC address (Layer 2) of the source node
+ src_ipv4: IPv4 address (Layer 3) of the source node
+ """
+
+ def __init__(self, **config_params):
+ """Initialize the class with the required network and packet params.
+
+ Args:
+ config_params: contains all the necessary packet parameters.
+ Some fields can be generated automatically. For example:
+ {'subnet_mask': '255.255.255.0',
+ 'dst_ipv4': '192.168.1.3',
+ 'src_ipv4: 'get_local', ...
+ The key can also be 'get_local' which means the code will read
+ and use the local interface parameters
+ """
+ interf = config_params['interf']
+ self.packet = None
+ if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_mac = scapy.get_if_hwaddr(interf)
+ else:
+ self.src_mac = config_params['src_mac']
+
+ if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE:
+ self.src_ipv4 = scapy.get_if_addr(interf)
+ else:
+ self.src_ipv4 = config_params['src_ipv4']
+
+ def generate(self, ip_dst=None, eth_dst=None):
+ """Generates a mDNS v4 packet for multicast DNS config
+
+ Args:
+ ip_dst: IP destination address (Optional)
+ eth_dst: Ethernet (layer 2) destination address (Optional)
+ """
+
+ # Overwrite standard fields if desired
+ sta_ip = (ip_dst if ip_dst is not None else MDNS_V4_IP_DST)
+ sta_hw = (eth_dst if eth_dst is not None else MDNS_V4_MAC_DST)
+
+ # Create mDNS layer
+ qdServer = scapy.DNSQR(qname=self.src_ipv4, qtype=MDNS_QTYPE)
+ mDNS = scapy.DNS(rd=MDNS_RECURSIVE, qd=qdServer)
+
+ # Create UDP
+ udp = scapy.UDP(sport=MDNS_UDP_PORT, dport=MDNS_UDP_PORT)
+
+ # Create IP layer
+ ip4 = scapy.IP(src=self.src_ipv4, dst=sta_ip, ttl=255)
+
+ # Create Ethernet layer
+ ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw)
+
+ self.packet = ethernet / ip4 / udp / mDNS
+ return self.packet
diff --git a/acts/framework/acts/libs/ota/__init__.py b/acts/framework/acts/libs/ota/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/libs/ota/__init__.py
diff --git a/acts/framework/acts/libs/ota/ota_runners/__init__.py b/acts/framework/acts/libs/ota/ota_runners/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_runners/__init__.py
diff --git a/acts/framework/acts/libs/ota/ota_runners/ota_runner.py b/acts/framework/acts/libs/ota/ota_runners/ota_runner.py
new file mode 100644
index 0000000..dd58943
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_runners/ota_runner.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+SL4A_SERVICE_SETUP_TIME = 5
+
+
+class OtaError(Exception):
+ """Raised when an error in the OTA Update process occurs."""
+
+
+class OtaRunner(object):
+ """The base class for all OTA Update Runners."""
+
+ def __init__(self, ota_tool, android_device):
+ self.ota_tool = ota_tool
+ self.android_device = android_device
+ self.serial = self.android_device.serial
+
+ def _update(self):
+ logging.info('Stopping services.')
+ self.android_device.stop_services()
+ logging.info('Beginning tool.')
+ self.ota_tool.update(self)
+ logging.info('Tool finished. Waiting for boot completion.')
+ self.android_device.wait_for_boot_completion()
+ logging.info('Boot completed. Rooting adb.')
+ self.android_device.root_adb()
+ logging.info('Root complete. Installing new SL4A.')
+ output = self.android_device.adb.install('-r %s' % self.get_sl4a_apk)
+ logging.info('SL4A install output: %s' % output)
+ time.sleep(SL4A_SERVICE_SETUP_TIME)
+ logging.info('Starting services.')
+ self.android_device.start_services()
+ logging.info('Services started. Running ota tool cleanup.')
+ self.ota_tool.cleanup(self)
+ logging.info('Cleanup complete.')
+
+ def can_update(self):
+ """Whether or not an update package is available for the device."""
+ return NotImplementedError()
+
+ def get_ota_package(self):
+ raise NotImplementedError()
+
+ def get_sl4a_apk(self):
+ raise NotImplementedError()
+
+
+class SingleUseOtaRunner(OtaRunner):
+ """A single use OtaRunner.
+
+ SingleUseOtaRunners can only be ran once. If a user attempts to run it more
+ than once, an error will be thrown. Users can avoid the error by checking
+ can_update() before calling update().
+ """
+
+ def __init__(self, ota_tool, android_device, ota_package, sl4a_apk):
+ super(SingleUseOtaRunner, self).__init__(ota_tool, android_device)
+ self._ota_package = ota_package
+ self._sl4a_apk = sl4a_apk
+ self._called = False
+
+ def can_update(self):
+ return not self._called
+
+ def update(self):
+ """Starts the update process."""
+ if not self.can_update():
+ raise OtaError('A SingleUseOtaTool instance cannot update a phone '
+ 'multiple times.')
+ self._called = True
+ self._update()
+
+ def get_ota_package(self):
+ return self._ota_package
+
+ def get_sl4a_apk(self):
+ return self._sl4a_apk
+
+
+class MultiUseOtaRunner(OtaRunner):
+ """A multiple use OtaRunner.
+
+ MultiUseOtaRunner can only be ran for as many times as there have been
+ packages provided to them. If a user attempts to run it more than the number
+ of provided packages, an error will be thrown. Users can avoid the error by
+ checking can_update() before calling update().
+ """
+
+ def __init__(self, ota_tool, android_device, ota_packages, sl4a_apks):
+ super(MultiUseOtaRunner, self).__init__(ota_tool, android_device)
+ self._ota_packages = ota_packages
+ self._sl4a_apks = sl4a_apks
+ self.current_update_number = 0
+
+ def can_update(self):
+ return not self.current_update_number == len(self._ota_packages)
+
+ def update(self):
+ """Starts the update process."""
+ if not self.can_update():
+ raise OtaError('This MultiUseOtaRunner has already updated all '
+ 'given packages onto the phone.')
+ self._update()
+ self.current_update_number += 1
+
+ def get_ota_package(self):
+ return self._ota_packages[self.current_update_number]
+
+ def get_sl4a_apk(self):
+ return self._sl4a_apks[self.current_update_number]
diff --git a/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py b/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py
new file mode 100644
index 0000000..fa6ab19
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from acts.config_parser import ActsConfigError
+from acts.libs.ota.ota_runners import ota_runner
+from acts.libs.ota.ota_tools import ota_tool_factory
+from acts.libs.ota.ota_tools import adb_sideload_ota_tool
+
+_bound_devices = {}
+
+DEFAULT_OTA_TOOL = adb_sideload_ota_tool.AdbSideloadOtaTool.__name__
+DEFAULT_OTA_COMMAND = 'adb'
+
+
+def create_all_from_configs(config, android_devices):
+ """Creates a new OtaTool for each given AndroidDevice.
+
+ After an OtaTool is assigned to a device, another OtaTool cannot be created
+ for that device. This will prevent OTA Update tests that accidentally flash
+ the same build onto a device more than once.
+
+ Args:
+ config: the ACTS config user_params.
+ android_devices: The devices to run an OTA Update on.
+
+ Returns:
+ A list of OtaRunners responsible for updating the given devices. The
+ indexes match the indexes of the corresponding AndroidDevice in
+ android_devices.
+ """
+ return [create_from_configs(config, ad) for ad in android_devices]
+
+
+def create_from_configs(config, android_device):
+ """Creates a new OtaTool for the given AndroidDevice.
+
+ After an OtaTool is assigned to a device, another OtaTool cannot be created
+ for that device. This will prevent OTA Update tests that accidentally flash
+ the same build onto a device more than once.
+
+ Args:
+ config: the ACTS config user_params.
+ android_device: The device to run the OTA Update on.
+
+ Returns:
+ An OtaRunner responsible for updating the given device.
+ """
+ # Default to adb sideload
+ try:
+ ota_tool_class_name = get_ota_value_from_config(
+ config, 'ota_tool', android_device)
+ except ActsConfigError:
+ ota_tool_class_name = DEFAULT_OTA_TOOL
+
+ if ota_tool_class_name not in config:
+ if ota_tool_class_name is not DEFAULT_OTA_TOOL:
+ raise ActsConfigError(
+ 'If the ota_tool is overloaded, the path to the tool must be '
+ 'added to the ACTS config file under {"OtaToolName": '
+ '"path/to/tool"} (in this case, {"%s": "path/to/tool"}.' %
+ ota_tool_class_name)
+ else:
+ command = DEFAULT_OTA_COMMAND
+ else:
+ command = config[ota_tool_class_name]
+ if type(command) is list:
+ # If file came as a list in the config.
+ if len(command) == 1:
+ command = command[0]
+ else:
+ raise ActsConfigError(
+ 'Config value for "%s" must be either a string or a list '
+ 'of exactly one element' % ota_tool_class_name)
+
+ ota_package = get_ota_value_from_config(config, 'ota_package',
+ android_device)
+ ota_sl4a = get_ota_value_from_config(config, 'ota_sl4a', android_device)
+ if type(ota_sl4a) != type(ota_package):
+ raise ActsConfigError(
+ 'The ota_package and ota_sl4a must either both be strings, or '
+ 'both be lists. Device with serial "%s" has mismatched types.' %
+ android_device.serial)
+ return create(ota_package, ota_sl4a, android_device, ota_tool_class_name,
+ command)
+
+
+def create(ota_package,
+ ota_sl4a,
+ android_device,
+ ota_tool_class_name=DEFAULT_OTA_TOOL,
+ command=DEFAULT_OTA_COMMAND,
+ use_cached_runners=True):
+ """
+ Args:
+ ota_package: A string or list of strings corresponding to the
+ update.zip package location(s) for running an OTA update.
+ ota_sl4a: A string or list of strings corresponding to the
+ sl4a.apk package location(s) for running an OTA update.
+ ota_tool_class_name: The class name for the desired ota_tool
+ command: The command line tool name for the updater
+ android_device: The AndroidDevice to run the OTA Update on.
+ use_cached_runners: Whether or not to use runners cached by previous
+ create calls.
+
+ Returns:
+ An OtaRunner with the given properties from the arguments.
+ """
+ ota_tool = ota_tool_factory.create(ota_tool_class_name, command)
+ return create_from_package(ota_package, ota_sl4a, android_device, ota_tool,
+ use_cached_runners)
+
+
+def create_from_package(ota_package,
+ ota_sl4a,
+ android_device,
+ ota_tool,
+ use_cached_runners=True):
+ """
+ Args:
+ ota_package: A string or list of strings corresponding to the
+ update.zip package location(s) for running an OTA update.
+ ota_sl4a: A string or list of strings corresponding to the
+ sl4a.apk package location(s) for running an OTA update.
+ ota_tool: The OtaTool to be paired with the returned OtaRunner
+ android_device: The AndroidDevice to run the OTA Update on.
+ use_cached_runners: Whether or not to use runners cached by previous
+ create calls.
+
+ Returns:
+ An OtaRunner with the given properties from the arguments.
+ """
+ if android_device in _bound_devices and use_cached_runners:
+ logging.warning('Android device %s has already been assigned an '
+ 'OtaRunner. Returning previously created runner.')
+ return _bound_devices[android_device]
+
+ if type(ota_package) != type(ota_sl4a):
+ raise TypeError(
+ 'The ota_package and ota_sl4a must either both be strings, or '
+ 'both be lists. Device with serial "%s" has requested mismatched '
+ 'types.' % android_device.serial)
+
+ if type(ota_package) is str:
+ runner = ota_runner.SingleUseOtaRunner(ota_tool, android_device,
+ ota_package, ota_sl4a)
+ elif type(ota_package) is list:
+ runner = ota_runner.MultiUseOtaRunner(ota_tool, android_device,
+ ota_package, ota_sl4a)
+ else:
+ raise TypeError('The "ota_package" value in the acts config must be '
+ 'either a list or a string.')
+
+ _bound_devices[android_device] = runner
+ return runner
+
+
+def get_ota_value_from_config(config, key, android_device):
+ """Returns a key for the given AndroidDevice.
+
+ Args:
+ config: The ACTS config
+ key: The base key desired (ota_tool, ota_sl4a, or ota_package)
+ android_device: An AndroidDevice
+
+ Returns: The value at the specified key.
+ Throws: ActsConfigError if the value cannot be determined from the config.
+ """
+ suffix = ''
+ if 'ota_map' in config:
+ if android_device.serial in config['ota_map']:
+ suffix = '_%s' % config['ota_map'][android_device.serial]
+
+ ota_package_key = '%s%s' % (key, suffix)
+ if ota_package_key not in config:
+ if suffix is not '':
+ raise ActsConfigError(
+ 'Asked for an OTA Update without specifying a required value. '
+ '"ota_map" has entry {"%s": "%s"}, but there is no '
+ 'corresponding entry {"%s":"/path/to/file"} found within the '
+ 'ACTS config.' % (android_device.serial, suffix[1:],
+ ota_package_key))
+ else:
+ raise ActsConfigError(
+ 'Asked for an OTA Update without specifying a required value. '
+ '"ota_map" does not exist or have a key for serial "%s", and '
+ 'the default value entry "%s" cannot be found within the ACTS '
+ 'config.' % (android_device.serial, ota_package_key))
+
+ return config[ota_package_key]
diff --git a/acts/framework/acts/libs/ota/ota_tools/__init__.py b/acts/framework/acts/libs/ota/ota_tools/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/__init__.py
diff --git a/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py
new file mode 100644
index 0000000..f94a762
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from acts.libs.ota.ota_tools.ota_tool import OtaTool
+
+# OTA Packages can be upwards of 1 GB. This may take some time to transfer over
+# USB 2.0.
+PUSH_TIMEOUT = 10 * 60
+
+
+class AdbSideloadOtaTool(OtaTool):
+ """Updates an AndroidDevice using adb sideload."""
+
+ def __init__(self, ignored_command):
+ # "command" is ignored. The ACTS adb version is used to prevent
+ # differing adb versions from constantly killing adbd.
+ super(AdbSideloadOtaTool, self).__init__(ignored_command)
+
+ def update(self, ota_runner):
+ logging.info('Rooting adb')
+ ota_runner.android_device.root_adb()
+ logging.info('Rebooting to sideload')
+ ota_runner.android_device.adb.reboot('sideload')
+ ota_runner.android_device.adb.wait_for_sideload()
+ logging.info('Sideloading ota package')
+ package_path = ota_runner.get_ota_package()
+ logging.info('Running adb sideload with package "%s"' % package_path)
+ sideload_result = ota_runner.android_device.adb.sideload(
+ package_path, timeout=PUSH_TIMEOUT)
+ logging.info('Sideload output: %s' % sideload_result)
+ logging.info('Sideload complete. Waiting for device to come back up.')
+ ota_runner.android_device.adb.wait_for_recovery()
+ ota_runner.android_device.adb.reboot()
+ logging.info('Device is up. Update complete.')
diff --git a/acts/framework/acts/libs/ota/ota_tools/ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/ota_tool.py
new file mode 100644
index 0000000..e51fe6b
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/ota_tool.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class OtaTool(object):
+ """A Wrapper for an OTA Update command or tool.
+
+ Each OtaTool acts as a facade to the underlying command or tool used to
+ update the device.
+ """
+
+ def __init__(self, command):
+ """Creates an OTA Update tool with the given properties.
+
+ Args:
+ command: A string that is used as the command line tool
+ """
+ self.command = command
+
+ def update(self, ota_runner):
+ """Begins the OTA Update. Returns after the update has installed.
+
+ Args:
+ ota_runner: The OTA Runner that handles the device information.
+ """
+ raise NotImplementedError()
+
+ def cleanup(self, ota_runner):
+ """A cleanup method for the OTA Tool to run after the update completes.
+
+ Args:
+ ota_runner: The OTA Runner that handles the device information.
+ """
+ pass
diff --git a/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py b/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py
new file mode 100644
index 0000000..ac81646
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.libs.ota.ota_tools.adb_sideload_ota_tool import AdbSideloadOtaTool
+from acts.libs.ota.ota_tools.update_device_ota_tool import UpdateDeviceOtaTool
+
+_CONSTRUCTORS = {
+ AdbSideloadOtaTool.__name__: lambda command: AdbSideloadOtaTool(command),
+ UpdateDeviceOtaTool.__name__: lambda command: UpdateDeviceOtaTool(command),
+}
+_constructed_tools = {}
+
+
+def create(ota_tool_class, command):
+ """Returns an OtaTool with the given class name.
+
+ If the tool has already been created, the existing instance will be
+ returned.
+
+ Args:
+ ota_tool_class: the class/type of the tool you wish to use.
+ command: the command line tool being used.
+
+ Returns:
+ An OtaTool.
+ """
+ if ota_tool_class in _constructed_tools:
+ return _constructed_tools[ota_tool_class]
+
+ if ota_tool_class not in _CONSTRUCTORS:
+ raise KeyError('Given Ota Tool class name does not match a known '
+ 'name. Found "%s". Expected any of %s. If this tool '
+ 'does exist, add it to the _CONSTRUCTORS dict in this '
+ 'module.' % (ota_tool_class, _CONSTRUCTORS.keys()))
+
+ new_update_tool = _CONSTRUCTORS[ota_tool_class](command)
+ _constructed_tools[ota_tool_class] = new_update_tool
+
+ return new_update_tool
diff --git a/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py
new file mode 100644
index 0000000..0ab9091
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import shutil
+import tempfile
+
+from acts.libs.ota.ota_tools import ota_tool
+from acts.libs.proc import job
+from acts import utils
+
+# OTA Packages can be upwards of 1 GB. This may take some time to transfer over
+# USB 2.0. A/B devices must also complete the update in the background.
+UPDATE_TIMEOUT = 20 * 60
+UPDATE_LOCATION = '/data/ota_package/update.zip'
+
+
+class UpdateDeviceOtaTool(ota_tool.OtaTool):
+ """Runs an OTA Update with system/update_engine/scripts/update_device.py."""
+
+ def __init__(self, command):
+ super(UpdateDeviceOtaTool, self).__init__(command)
+
+ self.unzip_path = tempfile.mkdtemp()
+ utils.unzip_maintain_permissions(self.command, self.unzip_path)
+
+ self.command = os.path.join(self.unzip_path, 'update_device.py')
+
+ def update(self, ota_runner):
+ logging.info('Forcing adb to be in root mode.')
+ ota_runner.android_device.root_adb()
+ update_command = '%s -s %s %s' % (self.command, ota_runner.serial,
+ ota_runner.get_ota_package())
+ logging.info('Running %s' % update_command)
+ result = job.run(update_command, timeout=UPDATE_TIMEOUT)
+ logging.info('Output: %s' % result.stdout)
+
+ logging.info('Rebooting device for update to go live.')
+ ota_runner.android_device.adb.reboot()
+ logging.info('Reboot sent.')
+
+ def __del__(self):
+ """Delete the unzipped update_device folder before ACTS exits."""
+ shutil.rmtree(self.unzip_path)
diff --git a/acts/framework/acts/libs/ota/ota_updater.py b/acts/framework/acts/libs/ota/ota_updater.py
new file mode 100644
index 0000000..ed300aa
--- /dev/null
+++ b/acts/framework/acts/libs/ota/ota_updater.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.libs.ota.ota_runners import ota_runner_factory
+
+# Maps AndroidDevices to OtaRunners
+ota_runners = {}
+
+
+def initialize(user_params, android_devices):
+ """Initialize OtaRunners for each device.
+
+ Args:
+ user_params: The user_params from the ACTS config.
+ android_devices: The android_devices in the test.
+ """
+ for ad in android_devices:
+ ota_runners[ad] = ota_runner_factory.create_from_configs(
+ user_params, ad)
+
+
+def _check_initialization(android_device):
+ """Check if a given device was initialized."""
+ if android_device not in ota_runners:
+ raise KeyError('Android Device with serial "%s" has not been '
+ 'initialized for OTA Updates. Did you forget to call'
+ 'ota_updater.initialize()?' % android_device.serial)
+
+
+def update(android_device, ignore_update_errors=False):
+ """Update a given AndroidDevice.
+
+ Args:
+ android_device: The device to update
+ ignore_update_errors: Whether or not to ignore update errors such as
+ no more updates available for a given device. Default is false.
+ Throws:
+ OtaError if ignore_update_errors is false and the OtaRunner has run out
+ of packages to update the phone with.
+ """
+ _check_initialization(android_device)
+ try:
+ ota_runners[android_device].update()
+ except:
+ if ignore_update_errors:
+ return
+ raise
+
+
+def can_update(android_device):
+ """Whether or not a device can be updated."""
+ _check_initialization(android_device)
+ return ota_runners[android_device].can_update()
diff --git a/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py b/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py
index f22e0b8..85fdba3 100644
--- a/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py
@@ -42,6 +42,7 @@
def __init__(self, controllers):
BtMetricsBaseTest.__init__(self, controllers)
+ self.ad = self.android_devices[0]
def on_fail(self, test_name, begin_time):
self._collect_bluetooth_manager_dumpsys_logs(self.android_devices)
@@ -57,93 +58,8 @@
if not bypass_setup_wizard(ad):
self.log.debug(
"Failed to bypass setup wizard, continuing test.")
- if not "bt_config" in self.user_params.keys():
- self.log.error("Missing mandatory user config \"bt_config\"!")
- return False
- bt_config_map_file = self.user_params["bt_config"]
- return self._setup_bt_config(bt_config_map_file)
-
- def _setup_bt_config(self, bt_config_map_file):
- bt_config_map = {}
- if not os.path.isfile(bt_config_map_file):
- bt_config_map_file = os.path.join(
- self.user_params[Config.key_config_path], bt_config_map_file)
- if not os.path.isfile(bt_config_map_file):
- self.log.error("Unable to load bt_config file {}.".format(
- bt_config_map_file))
- return False
- try:
- f = open(bt_config_map_file, 'r')
- bt_config_map = json.load(f)
- f.close()
- except FileNotFoundError:
- self.log.error("File not found: {}.".format(bt_config_map_file))
- return False
- # Connected devices return all upper case mac addresses.
- # Make the peripheral_info address attribute upper case
- # in order to ensure the BT mac addresses match.
- for serial in bt_config_map.keys():
- mac_address = bt_config_map[serial]["peripheral_info"][
- "address"].upper()
- bt_config_map[serial]["peripheral_info"]["address"] = mac_address
- for ad in self.android_devices:
- serial = ad.serial
-
- # Verify serial number in bt_config_map
- self.log.info("Verify serial number of Android device in config.")
- if serial not in bt_config_map.keys():
- self.log.error(
- "Missing android device serial {} in bt_config.".format(
- serial))
- return False
-
- # Push the bt_config.conf file to Android device
- if (not self._push_config_to_android_device(ad, bt_config_map,
- serial)):
- return False
-
- # Add music to the Android device
- if not self._add_music_to_android_device(ad):
- return False
-
- # Verify Bluetooth is enabled
- self.log.info("Verifying Bluetooth is enabled on Android Device.")
- if not bluetooth_enabled_check(ad):
- self.log.error("Failed to toggle on Bluetooth on device {}".
- format(serial))
- return False
-
- # Verify Bluetooth device is connected
- self.log.info(
- "Waiting up to 10 seconds for device to reconnect...")
- if not self._verify_bluetooth_device_is_connected(
- ad, bt_config_map, serial):
- self.device_fails_to_connect_list.append(ad)
- if len(self.device_fails_to_connect_list) == len(self.android_devices):
- self.log.error("All devices failed to reconnect.")
- return False
- return True
-
- def _push_config_to_android_device(self, ad, bt_config_map, serial):
- """
- Push Bluetooth config file to android device so that it will have the
- paired link key to the remote device
- :param ad: Android device
- :param bt_config_map: Map to each device's config
- :param serial: Serial number of device
- :return: True on success, False on failure
- """
- self.log.info("Pushing bt_config.conf file to Android device.")
- config_path = bt_config_map[serial]["config_path"]
- if not os.path.isfile(config_path):
- config_path = os.path.join(
- self.user_params[Config.key_config_path], config_path)
- if not os.path.isfile(config_path):
- self.log.error("Unable to load bt_config file {}.".format(
- config_path))
- return False
- ad.adb.push("{} {}".format(config_path, BT_CONF_PATH))
- return True
+ # Add music to the Android device
+ return self._add_music_to_android_device(ad)
def _add_music_to_android_device(self, ad):
"""
@@ -181,45 +97,6 @@
ad.reboot()
return True
- def _verify_bluetooth_device_is_connected(self, ad, bt_config_map, serial):
- """
- Verify that remote Bluetooth device is connected
- :param ad: Android device
- :param bt_config_map: Config map
- :param serial: Serial number of Android device
- :return: True on success, False on failure
- """
- connected_devices = ad.droid.bluetoothGetConnectedDevices()
- start_time = time.time()
- wait_time = 10
- result = False
- while time.time() < start_time + wait_time:
- connected_devices = ad.droid.bluetoothGetConnectedDevices()
- if len(connected_devices) > 0:
- if bt_config_map[serial]["peripheral_info"]["address"] in {
- d['address']
- for d in connected_devices
- }:
- result = True
- break
- else:
- try:
- ad.droid.bluetoothConnectBonded(bt_config_map[serial][
- "peripheral_info"]["address"])
- except Exception as err:
- self.log.error("Failed to connect bonded. Err: {}".format(
- err))
- if not result:
- self.log.info("Connected Devices: {}".format(connected_devices))
- self.log.info("Bonded Devices: {}".format(
- ad.droid.bluetoothGetBondedDevices()))
- self.log.error(
- "Failed to connect to peripheral name: {}, address: {}".format(
- bt_config_map[serial]["peripheral_info"]["name"],
- bt_config_map[serial]["peripheral_info"]["address"]))
- self.device_fails_to_connect_list.append("{}:{}".format(
- serial, bt_config_map[serial]["peripheral_info"]["name"]))
-
def _collect_bluetooth_manager_dumpsys_logs(self, ads):
"""
Collect "adb shell dumpsys bluetooth_manager" logs
@@ -238,23 +115,13 @@
def start_playing_music_on_all_devices(self):
"""
- Start playing music all devices
+ Start playing music
:return: None
"""
- for ad in self.android_devices:
- ad.droid.mediaPlayOpen("file:///sdcard/Music/{}".format(
- self.music_file_to_play))
- ad.droid.mediaPlaySetLooping(True)
- self.log.info("Music is now playing on device {}".format(
- ad.serial))
-
- def stop_playing_music_on_all_devices(self):
- """
- Stop playing music on all devices
- :return: None
- """
- for ad in self.android_devices:
- ad.droid.mediaPlayStopAll()
+ self.ad.droid.mediaPlayOpen("file:///sdcard/Music/{}".format(
+ self.music_file_to_play))
+ self.ad.droid.mediaPlaySetLooping(True)
+ self.ad.log.info("Music is now playing.")
def monitor_music_play_util_deadline(self, end_time, sleep_interval=1):
"""
@@ -273,32 +140,17 @@
device_not_connected_list: List of ADs with no remote device
connected
"""
- bluetooth_off_list = []
device_not_connected_list = []
while time.time() < end_time:
- for ad in self.android_devices:
- serial = ad.serial
- if (not ad.droid.bluetoothCheckState() and
- serial not in bluetooth_off_list):
- self.log.error(
- "Device {}'s Bluetooth state is off.".format(serial))
- bluetooth_off_list.append(serial)
- if (ad.droid.bluetoothGetConnectedDevices() == 0 and
- serial not in device_not_connected_list):
- self.log.error(
- "Device {} not connected to any Bluetooth devices.".
- format(serial))
- device_not_connected_list.append(serial)
- if len(bluetooth_off_list) == len(self.android_devices):
- self.log.error(
- "Bluetooth off on all Android devices. Ending Test")
- return False, bluetooth_off_list, device_not_connected_list
- if len(device_not_connected_list) == len(self.android_devices):
- self.log.error(
- "Every Android device has no device connected.")
- return False, bluetooth_off_list, device_not_connected_list
+ if not self.ad.droid.bluetoothCheckState():
+ self.ad.log.error("Device {}'s Bluetooth state is off.".format(
+ serial))
+ return False
+ if self.ad.droid.bluetoothGetConnectedDevices() == 0:
+ self.ad.log.error(
+ "Bluetooth device not connected. Failing test.")
time.sleep(sleep_interval)
- return True, bluetooth_off_list, device_not_connected_list
+ return True
def play_music_for_duration(self, duration, sleep_interval=1):
"""
@@ -316,8 +168,7 @@
start_time = time.time()
end_time = start_time + duration
self.start_playing_music_on_all_devices()
- status, bluetooth_off_list, device_not_connected_list = \
- self.monitor_music_play_util_deadline(end_time, sleep_interval)
- if status:
- self.stop_playing_music_on_all_devices()
- return status, bluetooth_off_list, device_not_connected_list
+ status = self.monitor_music_play_util_deadline(end_time,
+ sleep_interval)
+ self.ad.droid.mediaPlayStopAll()
+ return status
diff --git a/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py b/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py
index 2d16f44..66acc84 100644
--- a/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py
@@ -27,6 +27,8 @@
def __init__(self, controllers):
BluetoothBaseTest.__init__(self, controllers)
self.bluetooth_proto_path = None
+ self.dongle = self.relay_devices[0]
+ self.ad = self.android_devices[0]
def setup_class(self):
"""
@@ -67,8 +69,40 @@
# Clear all metrics
for ad in self.android_devices:
get_bluetooth_metrics(ad, ad.bluetooth_proto_module)
+ self.dongle.setup()
+ tries = 5
+ # Since we are not concerned with pairing in this test, try 5 times.
+ while tries > 0:
+ if self._pair_devices():
+ return True
+ else:
+ tries -= 1
+ return False
+
+ def teardown_test(self):
+ super(BtMetricsBaseTest, self).teardown_test()
+ self.dongle.clean_up()
return True
+ def _pair_devices(self):
+ self.ad.droid.bluetoothStartPairingHelper(False)
+ self.dongle.enter_pairing_mode()
+
+ self.ad.droid.bluetoothBond(self.dongle.mac_address)
+
+ end_time = time.time() + 20
+ self.ad.log.info("Verifying devices are bonded")
+ while time.time() < end_time:
+ bonded_devices = self.ad.droid.bluetoothGetBondedDevices()
+
+ for d in bonded_devices:
+ if d['address'] == self.dongle.mac_address:
+ self.ad.log.info("Successfully bonded to device.")
+ self.log.info("Bonded devices:\n{}".format(bonded_devices))
+ return True
+ self.ad.log.info("Failed to bond devices.")
+ return False
+
def collect_bluetooth_manager_metrics_logs(self, ads):
"""
Collect Bluetooth metrics logs, save an ascii log to disk and return
diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py
index 5732351..c778466 100644
--- a/acts/framework/acts/test_utils/tel/tel_test_utils.py
+++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py
@@ -4575,7 +4575,10 @@
"""
ad.log.debug("Ensuring no tcpdump is running in background")
- ad.adb.shell("killall -9 tcpdump")
+ try:
+ ad.adb.shell("killall -9 tcpdump")
+ except AdbError:
+ self.log.warn("Killing existing tcpdump processes failed")
begin_time = epoch_to_log_line_timestamp(get_current_epoch_time())
begin_time = normalize_log_line_timestamp(begin_time)
file_name = "/sdcard/tcpdump{}{}{}.pcap".format(ad.serial, test_name,
diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
index abc2563..031ef08 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
@@ -20,6 +20,7 @@
from acts import asserts
from acts import utils
from acts.controllers import monsoon
+from acts.libs.proc import job
from acts.test_utils.wifi import wifi_test_utils as wutils
from bokeh.layouts import layout
from bokeh.models import CustomJS, ColumnDataSource
@@ -353,10 +354,13 @@
log = logging.getLogger()
bss_settings = []
ssid = network[wutils.WifiEnums.SSID_KEY]
- password = network["password"]
+ if "password" in network.keys():
+ password = network["password"]
+ security = hostapd_security.Security(
+ security_mode="wpa", password=password)
+ else:
+ security = hostapd_security.Security(security_mode=None, password=None)
channel = network["channel"]
- security = hostapd_security.Security(
- security_mode="wpa", password=password)
config = hostapd_ap_preset.create_ap_preset(
channel=channel,
ssid=ssid,
@@ -497,6 +501,48 @@
return None
+@utils.timeout(60)
+def wait_for_dhcp(intf):
+ """Wait the DHCP address assigned to desired interface.
+
+ Getting DHCP address takes time and the wait time isn't constant. Utilizing
+ utils.timeout to keep trying until success
+
+ Args:
+ intf: desired interface name
+ Returns:
+ ip: ip address of the desired interface name
+ Raise:
+ TimeoutError: After timeout, if no DHCP assigned, raise
+ """
+ log = logging.getLogger()
+ reset_host_interface(intf)
+ ip = '0.0.0.0'
+ while ip == '0.0.0.0':
+ ip = scapy.get_if_addr(intf)
+ log.info('DHCP address assigned to {}'.format(intf))
+ return ip
+
+
+def reset_host_interface(intf):
+ """Reset the host interface.
+
+ Args:
+ intf: the desired interface to reset
+ """
+ log = logging.getLogger()
+ intf_down_cmd = 'ifconfig %s down' % intf
+ intf_up_cmd = 'ifconfig %s up' % intf
+ try:
+ job.run(intf_down_cmd)
+ time.sleep(3)
+ job.run(intf_up_cmd)
+ time.sleep(3)
+ log.info('{} has been reset'.format(intf))
+ except job.Error:
+ raise Exception('No such interface')
+
+
def create_pkt_config(test_class):
"""Creates the config for generating multicast packets
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index 263fabe..0e0cbfc 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -1268,7 +1268,7 @@
utils.exe_cmd(cmd)
-def validate_connection(ad, ping_addr):
+def validate_connection(ad, ping_addr=DEFAULT_PING_ADDR):
"""Validate internet connection by pinging the address provided.
Args:
diff --git a/acts/framework/acts/utils.py b/acts/framework/acts/utils.py
index c5d401b..5faabec 100755
--- a/acts/framework/acts/utils.py
+++ b/acts/framework/acts/utils.py
@@ -28,6 +28,7 @@
import subprocess
import time
import traceback
+import zipfile
# File name length is limited to 255 chars on some OS, so we need to make sure
# the file names we output fits within the limit.
@@ -807,3 +808,28 @@
return False
finally:
ad.adb.shell("rm /data/ping.txt", timeout=10, ignore_status=True)
+
+
+def unzip_maintain_permissions(zip_path, extract_location):
+ """Unzip a .zip file while maintaining permissions.
+
+ Args:
+ zip_path: The path to the zipped file.
+ extract_location: the directory to extract to.
+ """
+ with zipfile.ZipFile(zip_path, 'r') as zip_file:
+ for info in zip_file.infolist():
+ _extract_file(zip_file, info, extract_location)
+
+
+def _extract_file(zip_file, zip_info, extract_location):
+ """Extracts a single entry from a ZipFile while maintaining permissions.
+
+ Args:
+ zip_file: A zipfile.ZipFile.
+ zip_info: A ZipInfo object from zip_file.
+ extract_location: The directory to extract to.
+ """
+ out_path = zip_file.extract(zip_info.filename, path=extract_location)
+ perm = zip_info.external_attr >> 16
+ os.chmod(out_path, perm)
diff --git a/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py b/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py
index af1f1f8..2725b2c 100644
--- a/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py
+++ b/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py
@@ -58,16 +58,14 @@
"""
play_duration_seconds = 60
start_time = time.time()
- status, bluetooth_off_list, device_not_connected_list = \
- self.play_music_for_duration(play_duration_seconds)
- if not status:
- return status
- self.stop_playing_music_on_all_devices()
+ if not self.play_music_for_duration(play_duration_seconds):
+ return False
+ self.ad.droid.mediaPlayStopAll()
time.sleep(20)
bt_duration = time.time() - start_time
bluetooth_logs, bluetooth_logs_ascii = \
self.collect_bluetooth_manager_metrics_logs(
- [self.android_devices[0]])
+ [self.ad])
bluetooth_log = bluetooth_logs[0]
bluetooth_log_ascii = bluetooth_logs_ascii[0]
self.log.info(bluetooth_log_ascii)
@@ -113,10 +111,8 @@
a2dp_duration = 0
for i in range(num_play):
start_time = time.time()
- status, bluetooth_off_list, device_not_connected_list = \
- self.play_music_for_duration(play_duration_seconds)
- if not status:
- return status
+ if not self.play_music_for_duration(play_duration_seconds):
+ return False
a2dp_duration += (time.time() - start_time)
time.sleep(20)
bt_duration += (time.time() - start_time)
@@ -161,10 +157,8 @@
play_duration_seconds = 30
for i in range(num_play):
start_time = time.time()
- status, bluetooth_off_list, device_not_connected_list = \
- self.play_music_for_duration(play_duration_seconds)
- if not status:
- return status
+ if not self.play_music_for_duration(play_duration_seconds):
+ return False
time.sleep(20)
bt_duration = time.time() - start_time
bluetooth_logs, bluetooth_logs_ascii = \
diff --git a/acts/tests/google/bt/audio_lab/BtFunhausTest.py b/acts/tests/google/bt/audio_lab/BtFunhausTest.py
index 941d2b0..2e697af 100644
--- a/acts/tests/google/bt/audio_lab/BtFunhausTest.py
+++ b/acts/tests/google/bt/audio_lab/BtFunhausTest.py
@@ -59,46 +59,12 @@
sleep_interval = 120
#twelve_hours_in_seconds = 43200
- one_hour_in_seconds = 3600
- end_time = time.time() + one_hour_in_seconds
- status, bluetooth_off_list, device_not_connected_list = \
- self.monitor_music_play_util_deadline(end_time, sleep_interval)
- if not status:
- return status
+ #one_hour_in_seconds = 3600
+ one_min_in_sec = 60
+ end_time = time.time() + one_min_in_sec
+ if not self.monitor_music_play_util_deadline(end_time, sleep_interval):
+ return False
self._collect_bluetooth_manager_dumpsys_logs(self.android_devices)
- self.stop_playing_music_on_all_devices()
+ self.ad.droid.mediaPlayStopAll()
self.collect_bluetooth_manager_metrics_logs(self.android_devices)
- if len(device_not_connected_list) > 0 or len(bluetooth_off_list) > 0:
- self.log.info("Devices reported as not connected: {}".format(
- device_not_connected_list))
- self.log.info("Devices reported with Bluetooth state off: {}".
- format(bluetooth_off_list))
- return False
- return True
-
- @test_tracker_info(uuid='285be86d-f00f-4924-a206-e0a590b87b67')
- def test_setup_fail_if_devices_not_connected(self):
- """Test for devices connected or not during setup.
-
- This test is designed to fail if the number of devices having
- connection issues at time of setup is greater than 0. This lets
- the test runner know of the stability of the testbed.
-
- Steps:
- 1. Check lenght of self.device_fails_to_connect_list
-
- Expected Result:
- No device should be in a disconnected state.
-
- Returns:
- Pass if True
- Fail if False
-
- TAGS: None
- Priority: 1
- """
- if len(self.device_fails_to_connect_list) > 0:
- self.log.error("Devices failed to reconnect:\n{}".format(
- self.device_fails_to_connect_list))
- return False
return True
diff --git a/acts/tests/google/net/CoreNetworkingTest.py b/acts/tests/google/net/CoreNetworkingTest.py
index b56b3c5..f06be4a 100644
--- a/acts/tests/google/net/CoreNetworkingTest.py
+++ b/acts/tests/google/net/CoreNetworkingTest.py
@@ -58,12 +58,14 @@
self.dut.adb.shell("cmd netpolicy set restrict-background true")
# Launch app, check internet connectivity and close app
+ self.log.info("Launch app and test internet connectivity")
res = self.dut.droid.launchForResult(dum_class)
- self.log.info("Internet connectivity status after app launch: %s "
- % res['extras']['result'])
# Disable data saver mode
self.log.info("Disable data saver mode")
self.dut.adb.shell("cmd netpolicy set restrict-background false")
+ # Return test result
+ self.log.info("Internet connectivity status after app launch: %s "
+ % res['extras']['result'])
return res['extras']['result']
diff --git a/acts/tests/google/power/PowerbaselineTest.py b/acts/tests/google/power/PowerbaselineTest.py
index e13be25..8ff7437 100644
--- a/acts/tests/google/power/PowerbaselineTest.py
+++ b/acts/tests/google/power/PowerbaselineTest.py
@@ -2,14 +2,14 @@
#
# Copyright 2017 - The Android Open Source Project
#
-# Licensed under the Apache License, Version 2.0 (the "License");
+# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
+# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -30,33 +30,27 @@
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
- self.tests = ("test_rockbottom_screenoff_wifidisabled",
- "test_rockbottom_screenoff_wifidisconnected",
- "test_rockbottom_screenon_wifidisabled",
- "test_rockbottom_screenon_wifidisconnected")
+ self.tests = ('test_rockbottom_screenoff_wifidisabled',
+ 'test_rockbottom_screenoff_wifidisconnected',
+ 'test_rockbottom_screenon_wifidisabled',
+ 'test_rockbottom_screenon_wifidisconnected')
def setup_class(self):
self.dut = self.android_devices[0]
- req_params = ["baselinetest_params"]
+ req_params = ['baselinetest_params']
self.unpack_userparams(req_params)
self.unpack_testparams(self.baselinetest_params)
- self.mon_data_path = os.path.join(self.log_path, "Monsoon")
+ self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
self.mon = self.monsoons[0]
- self.mon.set_voltage(4.2)
self.mon.set_max_current(8.0)
+ self.mon.set_voltage(4.2)
self.mon.attach_device(self.dut)
- self.mon_info = {
- "dut": self.mon,
- "freq": self.mon_freq,
- "duration": self.mon_duration,
- "offset": self.mon_offset,
- "data_path": self.mon_data_path
- }
+ self.mon_info = wputils.create_monsoon_info(self)
def teardown_class(self):
- self.mon.usb("on")
+ self.mon.usb('on')
def unpack_testparams(self, bulk_params):
"""Unpack all the test specific parameters.
@@ -76,11 +70,11 @@
"""
# Initialize the dut to rock-bottom state
wputils.dut_rockbottom(self.dut)
- if wifi_status == "ON":
+ if wifi_status == 'ON':
wutils.wifi_toggle_state(self.dut, True)
- if screen_status == "OFF":
+ if screen_status == 'OFF':
self.dut.droid.goToSleepNow()
- self.dut.log.info("Screen is OFF")
+ self.dut.log.info('Screen is OFF')
# Collecting current measurement data and plot
file_path, avg_current = wputils.monsoon_data_collect_save(
self.dut, self.mon_info, self.current_test_name, self.bug_report)
@@ -88,22 +82,22 @@
wputils.pass_fail_check(self, avg_current)
# Test cases
- @test_tracker_info(uuid="e7ab71f4-1e14-40d2-baec-cde19a3ac859")
+ @test_tracker_info(uuid='e7ab71f4-1e14-40d2-baec-cde19a3ac859')
def test_rockbottom_screenoff_wifidisabled(self):
- self.rockbottom_test_func("OFF", "OFF")
+ self.rockbottom_test_func('OFF', 'OFF')
- @test_tracker_info(uuid="167c847d-448f-4c7c-900f-82c552d7d9bb")
+ @test_tracker_info(uuid='167c847d-448f-4c7c-900f-82c552d7d9bb')
def test_rockbottom_screenoff_wifidisconnected(self):
- self.rockbottom_test_func("OFF", "ON")
+ self.rockbottom_test_func('OFF', 'ON')
- @test_tracker_info(uuid="2cd25820-8548-4e60-b0e3-63727b3c952c")
+ @test_tracker_info(uuid='2cd25820-8548-4e60-b0e3-63727b3c952c')
def test_rockbottom_screenon_wifidisabled(self):
- self.rockbottom_test_func("ON", "OFF")
+ self.rockbottom_test_func('ON', 'OFF')
- @test_tracker_info(uuid="d7d90a1b-231a-47c7-8181-23814c8ff9b6")
+ @test_tracker_info(uuid='d7d90a1b-231a-47c7-8181-23814c8ff9b6')
def test_rockbottom_screenon_wifidisconnected(self):
- self.rockbottom_test_func("ON", "ON")
+ self.rockbottom_test_func('ON', 'ON')
diff --git a/acts/tests/google/power/PowerdtimTest.py b/acts/tests/google/power/PowerdtimTest.py
index 18e3ce9..2403874 100644
--- a/acts/tests/google/power/PowerdtimTest.py
+++ b/acts/tests/google/power/PowerdtimTest.py
@@ -2,14 +2,14 @@
#
# Copyright 2017 - The Android Open Source Project
#
-# Licensed under the Apache License, Version 2.0 (the "License");
+# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
+# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -28,40 +28,32 @@
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
- self.tests = ("test_2g_screenoff_dtimx1", "test_2g_screenoff_dtimx2",
- "test_2g_screenoff_dtimx4", "test_2g_screenoff_dtimx9",
- "test_2g_screenon_dtimx1", "test_2g_screenon_dtimx2",
- "test_2g_screenon_dtimx4", "test_2g_screenon_dtimx9",
- "test_5g_screenoff_dtimx1", "test_5g_screenoff_dtimx2",
- "test_5g_screenoff_dtimx4", "test_5g_screenoff_dtimx9",
- "test_5g_screenon_dtimx1", "test_5g_screenon_dtimx2",
- "test_5g_screenon_dtimx4", "test_5g_screenon_dtimx9")
+ self.tests = ('test_2g_screenoff_dtimx1', 'test_2g_screenoff_dtimx2',
+ 'test_2g_screenoff_dtimx4', 'test_2g_screenoff_dtimx9',
+ 'test_2g_screenon_dtimx1', 'test_2g_screenon_dtimx4',
+ 'test_5g_screenoff_dtimx1', 'test_5g_screenoff_dtimx2',
+ 'test_5g_screenoff_dtimx4', 'test_5g_screenoff_dtimx9',
+ 'test_5g_screenon_dtimx1', 'test_5g_screenon_dtimx4')
def setup_class(self):
self.log = logging.getLogger()
self.dut = self.android_devices[0]
self.access_point = self.access_points[0]
- req_params = ["main_network", "aux_network", "dtimtest_params"]
+ req_params = ['main_network', 'aux_network', 'dtimtest_params']
self.unpack_userparams(req_params)
self.unpack_testparams(self.dtimtest_params)
- self.mon_data_path = os.path.join(self.log_path, "Monsoon")
+ self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
self.mon = self.monsoons[0]
- self.mon.set_voltage(4.2)
self.mon.set_max_current(8.0)
+ self.mon.set_voltage(4.2)
self.mon.attach_device(self.dut)
- self.mon_info = {
- "dut": self.mon,
- "freq": self.mon_freq,
- "duration": self.mon_duration,
- "offset": self.mon_offset,
- "data_path": self.mon_data_path
- }
+ self.mon_info = wputils.create_monsoon_info(self)
self.num_atten = self.attenuators[0].instrument.num_atten
def teardown_class(self):
- self.mon.usb("on")
+ self.mon.usb('on')
def unpack_testparams(self, bulk_params):
"""Unpack all the test specific parameters.
@@ -88,15 +80,15 @@
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
[
- self.attenuators[i].set_atten(self.atten_level['main_AP'][i])
+ self.attenuators[i].set_atten(self.atten_level['zero_atten'][i])
for i in range(self.num_atten)
]
- self.log.info("Set attenuation level to connect the main AP")
+ self.log.info('Set attenuation level to connect the main AP')
wputils.ap_setup(self.access_point, network)
wutils.wifi_connect(self.dut, network)
- if screen_status == "OFF":
+ if screen_status == 'OFF':
self.dut.droid.goToSleepNow()
- self.dut.log.info("Screen is OFF")
+ self.dut.log.info('Screen is OFF')
time.sleep(5)
# Collect power data and plot
file_path, avg_current = wputils.monsoon_data_collect_save(
@@ -108,82 +100,78 @@
wputils.pass_fail_check(self, avg_current)
# Test cases
- @test_tracker_info(uuid="2a70a78b-93a8-46a6-a829-e1624b8239d2")
+ @test_tracker_info(uuid='2a70a78b-93a8-46a6-a829-e1624b8239d2')
def test_2g_screenoff_dtimx1(self):
network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(1, "OFF", network)
+ self.dtim_test_func(1, 'OFF', network)
- @test_tracker_info(uuid="b6c4114d-984a-4269-9e77-2bec0e4b6e6f")
+ @test_tracker_info(uuid='b6c4114d-984a-4269-9e77-2bec0e4b6e6f')
def test_2g_screenoff_dtimx2(self):
network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(2, "OFF", network)
+ self.dtim_test_func(2, 'OFF', network)
- @test_tracker_info(uuid="2ae5bc29-3d5f-4fbb-9ff6-f5bd499a9d6e")
+ @test_tracker_info(uuid='2ae5bc29-3d5f-4fbb-9ff6-f5bd499a9d6e')
def test_2g_screenoff_dtimx4(self):
network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(4, "OFF", network)
+ self.dtim_test_func(4, 'OFF', network)
- @test_tracker_info(uuid="b37fa75f-6166-4247-b15c-adcda8c7038e")
+ @test_tracker_info(uuid='b37fa75f-6166-4247-b15c-adcda8c7038e')
def test_2g_screenoff_dtimx9(self):
network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(9, "OFF", network, dtim_max=10)
+ self.dtim_test_func(9, 'OFF', network, dtim_max=10)
- @test_tracker_info(uuid="384d3b0f-4335-4b00-8363-308ec27a150c")
+ @test_tracker_info(uuid='384d3b0f-4335-4b00-8363-308ec27a150c')
def test_2g_screenon_dtimx1(self):
- network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(1, "ON", network)
+ """With screen on, modulated dtim isn't wokring, always DTIMx1.
+ So not running through all DTIM cases
- @test_tracker_info(uuid="dee62525-7c7a-4a3c-97c2-db6b272fb8b2")
- def test_2g_screenon_dtimx2(self):
+ """
network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(2, "ON", network)
+ self.dtim_test_func(1, 'ON', network)
- @test_tracker_info(uuid="79d0f065-2c46-4400-b02c-5ad60e79afea")
+ @test_tracker_info(uuid='79d0f065-2c46-4400-b02c-5ad60e79afea')
def test_2g_screenon_dtimx4(self):
- network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(4, "ON", network)
+ """Run only extra DTIMx4 for screen on to compare with DTIMx1.
+ They should be the same if everything is correct.
- @test_tracker_info(uuid="50bda9c9-b443-4f0e-b4a6-cdc4483084b7")
- def test_2g_screenon_dtimx9(self):
+ """
network = self.main_network[hc.BAND_2G]
- self.dtim_test_func(9, "ON", network, dtim_max=10)
+ self.dtim_test_func(4, 'ON', network)
- @test_tracker_info(uuid="5e2f73cb-7e4e-4a25-8fd5-c85adfdf466e")
+ @test_tracker_info(uuid='5e2f73cb-7e4e-4a25-8fd5-c85adfdf466e')
def test_5g_screenoff_dtimx1(self):
network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(1, "OFF", network)
+ self.dtim_test_func(1, 'OFF', network)
- @test_tracker_info(uuid="017f57c3-e133-461d-80be-d025d1491d8a")
+ @test_tracker_info(uuid='017f57c3-e133-461d-80be-d025d1491d8a')
def test_5g_screenoff_dtimx2(self):
network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(2, "OFF", network)
+ self.dtim_test_func(2, 'OFF', network)
- @test_tracker_info(uuid="b84a1cb3-9573-4bfd-9875-0f33cb171cc5")
+ @test_tracker_info(uuid='b84a1cb3-9573-4bfd-9875-0f33cb171cc5')
def test_5g_screenoff_dtimx4(self):
network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(4, "OFF", network)
+ self.dtim_test_func(4, 'OFF', network)
- @test_tracker_info(uuid="75644df4-2cc8-4bbd-8985-0656a4f9d056")
+ @test_tracker_info(uuid='75644df4-2cc8-4bbd-8985-0656a4f9d056')
def test_5g_screenoff_dtimx9(self):
network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(9, "OFF", network, dtim_max=10)
+ self.dtim_test_func(9, 'OFF', network, dtim_max=10)
- @test_tracker_info(uuid="327af44d-d9e7-49e0-9bda-accad6241dc7")
+ @test_tracker_info(uuid='327af44d-d9e7-49e0-9bda-accad6241dc7')
def test_5g_screenon_dtimx1(self):
- network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(1, "ON", network)
+ """With screen on, modulated dtim isn't wokring, always DTIMx1.
+ So not running through all DTIM cases
- @test_tracker_info(uuid="96c7a28d-9d7d-404f-bd9f-3661b5a4b4c9")
- def test_5g_screenon_dtimx2(self):
+ """
network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(2, "ON", network)
+ self.dtim_test_func(1, 'ON', network)
- @test_tracker_info(uuid="8b32585f-2517-426b-a2c9-8087093cf991")
+ @test_tracker_info(uuid='8b32585f-2517-426b-a2c9-8087093cf991')
def test_5g_screenon_dtimx4(self):
- network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(4, "ON", network)
+ """Run only extra DTIMx4 for screen on to compare with DTIMx1.
+ They should be the same if everything is correct.
- @test_tracker_info(uuid="17a35bfe-f0a4-41cf-822a-f727a8b8090f")
- def test_5g_screenon_dtimx9(self):
+ """
network = self.main_network[hc.BAND_5G]
- self.dtim_test_func(9, "ON", network, dtim_max=10)
+ self.dtim_test_func(4, 'ON', network)
diff --git a/acts/tests/google/power/PowermulticastTest.py b/acts/tests/google/power/PowermulticastTest.py
index f9278d3..a2cfd09 100644
--- a/acts/tests/google/power/PowermulticastTest.py
+++ b/acts/tests/google/power/PowermulticastTest.py
@@ -19,6 +19,7 @@
import time
from acts import base_test
+from acts.controllers.ap_lib import bridge_interface as bi
from acts.controllers.ap_lib import hostapd_constants as hc
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi import wifi_test_utils as wutils
@@ -27,25 +28,34 @@
RA_SHORT_LIFETIME = 3
RA_LONG_LIFETIME = 1000
+DNS_LONG_LIFETIME = 300
+DNS_SHORT_LIFETIME = 3
class PowermulticastTest(base_test.BaseTestClass):
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
- self.tests = ('test_screenoff_direct_arp',
- 'test_screenoff_misdirect_arp',
- 'test_screenoff_direct_ns',
- 'test_screenoff_misdirect_ns', 'test_screenoff_ra_short',
- 'test_screenoff_ra_long',
- 'test_screenoff_directed_dhcp_offer'
- 'test_screenoff_misdirected_dhcp_offer'
- 'test_screenon_direct_arp',
- 'test_screenon_misdirect_arp', 'test_screenon_direct_ns',
- 'test_screenon_misdirect_ns', 'test_screenon_ra_short',
- 'test_screenon_ra_long',
- 'test_screenon_directed_dhcp_offer'
- 'test_screenon_misdirected_dhcp_offer')
+ self.tests = (
+ 'test_screenoff_directed_arp', 'test_screenoff_misdirected_arp',
+ 'test_screenoff_directed_ns', 'test_screenoff_misdirected_ns',
+ 'test_screenoff_ra_short', 'test_screenoff_ra_long',
+ 'test_screenoff_directed_dhcp_offer',
+ 'test_screenoff_misdirected_dhcp_offer',
+ 'test_screenoff_ra_rnds_short', 'test_screenoff_ra_rnds_long',
+ 'test_screenoff_directed_ping6',
+ 'test_screenoff_misdirected_ping6',
+ 'test_screenoff_directed_ping4',
+ 'test_screenoff_misdirected_ping4', 'test_screenoff_mdns6',
+ 'test_screenoff_mdns4', 'test_screenon_directed_arp',
+ 'test_screenon_misdirected_arp', 'test_screenon_directed_ns',
+ 'test_screenon_misdirected_ns', 'test_screenon_ra_short',
+ 'test_screenon_ra_long', 'test_screenon_directed_dhcp_offer',
+ 'test_screenon_misdirected_dhcp_offer',
+ 'test_screenon_ra_rnds_short', 'test_screenon_ra_rnds_long',
+ 'test_screenon_directed_ping6', 'test_screenon_misdirected_ping6',
+ 'test_screenon_directed_ping4', 'test_screenon_misdirected_ping4',
+ 'test_screenon_mdns6', 'test_screenon_mdns4')
def setup_class(self):
@@ -62,7 +72,6 @@
self.mon.set_voltage(4.2)
self.mon.attach_device(self.dut)
self.mon_info = wputils.create_monsoon_info(self)
- self.pkt_gen_config = wputils.create_pkt_config(self)
self.pkt_sender = self.packet_senders[0]
def unpack_testparams(self, bulk_params):
@@ -79,33 +88,56 @@
"""
self.mon.usb('on')
+ self.access_point.close()
- def sendPacketAndMeasure(self, screen_status, network, packet):
- """Packet injection template function
+ def set_connection(self, screen_status, network):
+ """Setup connection between AP and client.
+
+ Setup connection between AP and phone, change DTIMx1 and get information
+ such as IP addresses to prepare packet construction.
Args:
screen_status: screen on or off
- network: 2G or 5G
- pkt_gen: desired packet to transmit
+ network: network selection, 2g/5g
"""
+ # Change DTIMx1 on the phone to receive all Multicast packets
+ wputils.change_dtim(
+ self.dut, gEnableModulatedDTIM=1, gMaxLIModulatedDTIM=10)
+ self.dut.log.info('DTIM value of the phone is now DTIMx1')
+
# Initialize the dut to rock-bottom state
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
# Set attenuation and connect to AP
for attn in range(self.num_atten):
- self.attenuators[attn].set_atten(self.atten_level['main_AP'][attn])
-
- self.log.info('Set attenuation level to connect the main AP')
+ self.attenuators[attn].set_atten(
+ self.atten_level['zero_atten'][attn])
+ self.log.info('Set attenuation level to all zero')
+ channel = network['channel']
+ iface_eth = self.pkt_sender.interface
+ brconfigs = self.access_point.generate_bridge_configs(channel)
+ self.brconfigs = bi.BridgeInterfaceConfigs(brconfigs[0], brconfigs[1],
+ brconfigs[2])
+ self.access_point.bridge.startup(self.brconfigs)
wputils.ap_setup(self.access_point, network)
wutils.wifi_connect(self.dut, network)
+ # Wait for DHCP with timeout of 60 seconds
+ wputils.wait_for_dhcp(iface_eth)
+
# Set the desired screen status
if screen_status == 'OFF':
self.dut.droid.goToSleepNow()
self.dut.log.info('Screen is OFF')
time.sleep(5)
+ def sendPacketAndMeasure(self, packet):
+ """Packet injection template function
+
+ Args:
+ packet: packet to be sent/inject
+ """
# Start sending packets
self.pkt_sender.start_sending(packet, self.interval)
@@ -114,6 +146,9 @@
self.dut, self.mon_info, self.current_test_name, self.bug_report)
wputils.monsoon_data_plot(self.mon_info, file_path)
+ # Bring down the bridge interface
+ self.access_point.bridge.teardown(self.brconfigs)
+
# Close AP
self.access_point.close()
@@ -121,115 +156,295 @@
wputils.pass_fail_check(self, avg_current)
# Test cases - screen OFF
- @test_tracker_info(uuid="b5378aaf-7949-48ac-95fb-ee94c85d49c3")
+ @test_tracker_info(uuid='b5378aaf-7949-48ac-95fb-ee94c85d49c3')
def test_screenoff_directed_arp(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate()
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="3b5d348d-70bf-483d-8736-13da569473aa")
+ @test_tracker_info(uuid='3b5d348d-70bf-483d-8736-13da569473aa')
def test_screenoff_misdirected_arp(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(self.ipv4_dst_fake)
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="8e534d3b-5a25-429a-a1bb-8119d7d28b5a")
+ @test_tracker_info(uuid='8e534d3b-5a25-429a-a1bb-8119d7d28b5a')
def test_screenoff_directed_ns(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate()
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="536d716d-f30b-4d20-9976-e2cbc36c3415")
+ @test_tracker_info(uuid='536d716d-f30b-4d20-9976-e2cbc36c3415')
def test_screenoff_misdirected_ns(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(self.ipv6_dst_fake)
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="5eed3174-8e94-428e-8527-19a9b5a90322")
+ @test_tracker_info(uuid='5eed3174-8e94-428e-8527-19a9b5a90322')
def test_screenoff_ra_short(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(RA_SHORT_LIFETIME)
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="67867bae-f1c5-44a4-9bd0-2b832ac8059c")
+ @test_tracker_info(uuid='67867bae-f1c5-44a4-9bd0-2b832ac8059c')
def test_screenoff_ra_long(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(RA_LONG_LIFETIME)
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="db19bc94-3513-45c4-b3a5-d6219649d0bb")
+ @test_tracker_info(uuid='db19bc94-3513-45c4-b3a5-d6219649d0bb')
def test_screenoff_directed_dhcp_offer(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate()
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="a8059869-40ee-4cf3-a957-4b7aed03fcf9")
+ @test_tracker_info(uuid='a8059869-40ee-4cf3-a957-4b7aed03fcf9')
def test_screenoff_misdirected_dhcp_offer(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(self.mac_dst_fake, self.ipv4_dst_fake)
- self.sendPacketAndMeasure('OFF', network, packet)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='6e663f0a-3eb5-46f6-a79e-311baebd5d2a')
+ def test_screenoff_ra_rnds_short(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(
+ RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='84d2f1ff-bd4f-46c6-9b06-826d9b14909c')
+ def test_screenoff_ra_rnds_long(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(
+ RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='4a17e74f-3e7f-4e90-ac9e-884a7c13cede')
+ def test_screenoff_directed_ping6(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='ab249e0d-58ba-4b55-8a81-e1e4fb04780a')
+ def test_screenoff_misdirected_ping6(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(self.ipv6_dst_fake)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='e37112e6-5c35-4c89-8d15-f5a44e69be0b')
+ def test_screenoff_directed_ping4(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='afd4a011-63a9-46c3-8a75-13f515ba8475')
+ def test_screenoff_misdirected_ping4(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(self.ipv4_dst_fake)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='03f0e845-fd66-4120-a79d-5eb64d49b6cd')
+ def test_screenoff_mdns6(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Mdns6Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='dcbb0aec-512d-48bd-b743-024697ce511b')
+ def test_screenoff_mdns4(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('OFF', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Mdns4Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
# Test cases: screen ON
- @test_tracker_info(uuid="b9550149-bf36-4f86-9b4b-6e900756a90e")
+ @test_tracker_info(uuid='b9550149-bf36-4f86-9b4b-6e900756a90e')
def test_screenon_directed_arp(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate()
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="406dffae-104e-46cb-9ec2-910aac7aca39")
- def test_screenon_misdirected_arp(self):
+ @test_tracker_info(uuid='406dffae-104e-46cb-9ec2-910aac7aca39')
+ def test_screenon_misdirecteded_arp(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(self.ipv4_dst_fake)
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="be4cb543-c710-4041-a770-819e82a6d164")
+ @test_tracker_info(uuid='be4cb543-c710-4041-a770-819e82a6d164')
def test_screenon_directed_ns(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate()
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="de21d24f-e03e-47a1-8bbb-11953200e870")
- def test_screenon_misdirected_ns(self):
+ @test_tracker_info(uuid='de21d24f-e03e-47a1-8bbb-11953200e870')
+ def test_screenon_misdirecteded_ns(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(self.ipv6_dst_fake)
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="b424a170-5095-4b47-82eb-50f7b7fdf35d")
+ @test_tracker_info(uuid='b424a170-5095-4b47-82eb-50f7b7fdf35d')
def test_screenon_ra_short(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(RA_SHORT_LIFETIME)
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="ab627e59-2ee8-4c0d-970b-eeb1d1cecdc1")
+ @test_tracker_info(uuid='ab627e59-2ee8-4c0d-970b-eeb1d1cecdc1')
def test_screenon_ra_long(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(RA_LONG_LIFETIME)
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="ee6514ab-1814-44b9-ba01-63f77ba77c34")
+ @test_tracker_info(uuid='ee6514ab-1814-44b9-ba01-63f77ba77c34')
def test_screenon_directed_dhcp_offer(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate()
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
- @test_tracker_info(uuid="eaebfe98-32da-4ebc-bca7-3b7026d99a4f")
- def test_screenon_misdirected_dhcp_offer(self):
+ @test_tracker_info(uuid='eaebfe98-32da-4ebc-bca7-3b7026d99a4f')
+ def test_screenon_misdirecteded_dhcp_offer(self):
network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config)
packet = pkt_gen.generate(self.mac_dst_fake, self.ipv4_dst_fake)
- self.sendPacketAndMeasure('ON', network, packet)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='f0e2193f-bf6a-441b-b9c1-bb7b65787cd5')
+ def test_screenon_ra_rnds_short(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(
+ RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='62b99cd7-75bf-45be-b93f-bb037a13b3e2')
+ def test_screenon_ra_rnds_long(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(
+ RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='4088af4c-a64b-4fc1-848c-688936cc6c12')
+ def test_screenon_directed_ping6(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='3179e327-e6ac-4dae-bb8a-f3940f21094d')
+ def test_screenon_misdirected_ping6(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(self.ipv6_dst_fake)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='90c70e8a-74fd-4878-89c6-5e15c3ede318')
+ def test_screenon_directed_ping4(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='dcfabbc7-a7e1-4a92-a38d-8ebe7aa2e063')
+ def test_screenon_misdirected_ping4(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate(self.ipv4_dst_fake)
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='117814db-f94d-4239-a7ab-033482b1da52')
+ def test_screenon_mdns6(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Mdns6Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
+
+ @test_tracker_info(uuid='ce6ad7e2-21f3-4e68-9c0d-d0e14e0a7c53')
+ def test_screenon_mdns4(self):
+ network = self.main_network[hc.BAND_5G]
+ self.set_connection('ON', network)
+ self.pkt_gen_config = wputils.create_pkt_config(self)
+ pkt_gen = pkt_utils.Mdns4Generator(**self.pkt_gen_config)
+ packet = pkt_gen.generate()
+ self.sendPacketAndMeasure(packet)
diff --git a/acts/tests/google/power/PowerroamingTest.py b/acts/tests/google/power/PowerroamingTest.py
index ca7eb7e..8bab26c 100644
--- a/acts/tests/google/power/PowerroamingTest.py
+++ b/acts/tests/google/power/PowerroamingTest.py
@@ -2,14 +2,14 @@
#
# Copyright 2017 - The Android Open Source Project
#
-# Licensed under the Apache License, Version 2.0 (the "License");
+# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
+# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -29,10 +29,10 @@
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
- self.tests = ("test_screenoff_roaming", "test_screenoff_fastroaming",
- "test_screenon_toggle_between_AP",
- "test_screenoff_toggle_between_AP",
- "test_screenoff_wifi_wedge")
+ self.tests = ('test_screenoff_roaming', 'test_screenoff_fastroaming',
+ 'test_screenon_toggle_between_AP',
+ 'test_screenoff_toggle_between_AP',
+ 'test_screenoff_wifi_wedge')
def setup_class(self):
@@ -40,27 +40,21 @@
self.dut = self.android_devices[0]
self.access_point_main = self.access_points[0]
self.access_point_aux = self.access_points[1]
- req_params = ("main_network", "aux_network", "roamingtest_params")
+ req_params = ('main_network', 'aux_network', 'roamingtest_params')
self.unpack_userparams(req_params)
self.unpack_testparams(self.roamingtest_params)
- self.mon_data_path = os.path.join(self.log_path, "Monsoon")
+ self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
self.mon = self.monsoons[0]
- self.mon.set_voltage(4.2)
self.mon.set_max_current(8.0)
+ self.mon.set_voltage(4.2)
self.mon_duration_all = self.mon_duration
self.mon.attach_device(self.dut)
- self.mon_info = {
- "dut": self.mon,
- "freq": self.mon_freq,
- "duration": self.mon_duration,
- "offset": self.mon_offset,
- "data_path": self.mon_data_path
- }
+ self.mon_info = wputils.create_monsoon_info(self)
self.num_atten = self.attenuators[0].instrument.num_atten
def teardown_class(self):
- self.mon.usb("on")
+ self.mon.usb('on')
def unpack_testparams(self, bulk_params):
"""Unpack all the test specific parameters.
@@ -79,7 +73,7 @@
ap.close()
# Test cases
- @test_tracker_info(uuid="392622d3-0c5c-4767-afa2-abfb2058b0b8")
+ @test_tracker_info(uuid='392622d3-0c5c-4767-afa2-abfb2058b0b8')
def test_screenoff_roaming(self):
"""Test roaming power consumption with screen off.
Change the attenuation level to trigger roaming between two APs
@@ -94,9 +88,9 @@
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
# Set attenuator and add two networks to the phone
- self.log.info("Set attenuation to connect device to both APs")
+ self.log.info('Set attenuation to connect device to both APs')
[
- self.attenuators[i].set_atten(self.atten_level["initial_state"][i])
+ self.attenuators[i].set_atten(self.atten_level['zero_atten'][i])
for i in range(self.num_atten)
]
wutils.wifi_connect(self.dut, network_aux)
@@ -105,7 +99,7 @@
self.dut.droid.goToSleepNow()
time.sleep(5)
# Set attenuator to trigger roaming
- self.dut.log.info("Trigger roaming now")
+ self.dut.log.info('Trigger roaming now')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -119,7 +113,7 @@
# Path fail check
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="2fec5208-043a-410a-8fd2-6784d70a3587")
+ @test_tracker_info(uuid='2fec5208-043a-410a-8fd2-6784d70a3587')
def test_screenoff_fastroaming(self):
# Initialize the dut to rock-bottom state
@@ -132,7 +126,7 @@
network_aux[wc.SSID] = network_main[wc.SSID]
wputils.ap_setup(self.access_point_aux, network_aux)
# Set attenuator and add two networks to the phone
- self.log.info("Set attenuation to connect device to the aux AP")
+ self.log.info('Set attenuation to connect device to the aux AP')
[
self.attenuators[i].set_atten(self.atten_level[wc.AP_MAIN][i])
for i in range(self.num_atten)
@@ -142,7 +136,7 @@
# Setup the main AP
wputils.ap_setup(self.access_point_main, network_main)
# Set attenuator to connect the phone to main AP
- self.log.info("Set attenuation to connect device to the main AP")
+ self.log.info('Set attenuation to connect device to the main AP')
[
self.attenuators[i].set_atten(self.atten_level[wc.AP_MAIN][i])
for i in range(self.num_atten)
@@ -151,7 +145,7 @@
time.sleep(5)
self.dut.droid.goToSleepNow()
# Trigger fastroaming
- self.dut.log.info("Trigger fastroaming now")
+ self.dut.log.info('Trigger fastroaming now')
[
self.attenuators[i].set_atten(self.atten_level[wc.AP_MAIN][i])
for i in range(self.num_atten)
@@ -164,7 +158,7 @@
# Path fail check
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="a0459b7c-74ce-4adb-8e55-c5365bc625eb")
+ @test_tracker_info(uuid='a0459b7c-74ce-4adb-8e55-c5365bc625eb')
def test_screenoff_toggle_between_AP(self):
# Setup both APs
@@ -175,10 +169,10 @@
# Initialize the dut to rock-bottom state
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
- self.mon_info["duration"] = self.toggle_interval
+ self.mon_info['duration'] = self.toggle_interval
self.dut.droid.goToSleepNow()
time.sleep(5)
- self.log.info("Set attenuation to connect device to both APs")
+ self.log.info('Set attenuation to connect device to both APs')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -200,7 +194,7 @@
# Path fail check
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="e5ff95c0-b17e-425c-a903-821ba555a9b9")
+ @test_tracker_info(uuid='e5ff95c0-b17e-425c-a903-821ba555a9b9')
def test_screenon_toggle_between_AP(self):
# Setup both APs
@@ -211,8 +205,8 @@
# Initialize the dut to rock-bottom state
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
- self.mon_info["duration"] = self.toggle_interval
- self.log.info("Set attenuation to connect device to both APs")
+ self.mon_info['duration'] = self.toggle_interval
+ self.log.info('Set attenuation to connect device to both APs')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -234,7 +228,7 @@
# Path fail check
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="a16ae337-326f-4d09-990f-42232c3c0dc4")
+ @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4')
def test_screenoff_wifi_wedge(self):
# Setup both APs
@@ -246,16 +240,16 @@
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
# Set attenuator to connect phone to both networks
- self.log.info("Set attenuation to connect device to both APs")
+ self.log.info('Set attenuation to connect device to both APs')
[
- self.attenuators[i].set_atten(self.atten_level["initial_state"][i])
+ self.attenuators[i].set_atten(self.atten_level['zero_atten'][i])
for i in range(self.num_atten)
]
wutils.wifi_connect(self.dut, network_main)
wutils.wifi_connect(self.dut, network_aux)
- self.log.info("Forget network {}".format(network_aux[wc.SSID]))
+ self.log.info('Forget network {}'.format(network_aux[wc.SSID]))
wutils.wifi_forget_network(self.dut, network_aux[wc.SSID])
- self.log.info("Set attenuation to trigger wedge condition")
+ self.log.info('Set attenuation to trigger wedge condition')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
diff --git a/acts/tests/google/power/PowerscanTest.py b/acts/tests/google/power/PowerscanTest.py
index 51d403c..4afb322 100644
--- a/acts/tests/google/power/PowerscanTest.py
+++ b/acts/tests/google/power/PowerscanTest.py
@@ -2,14 +2,14 @@
#
# Copyright 2017 - The Android Open Source Project
#
-# Licensed under the Apache License, Version 2.0 (the "License");
+# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
+# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -23,44 +23,38 @@
from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi import wifi_power_test_utils as wputils
-UNLOCK_SCREEN = "input keyevent 82"
+UNLOCK_SCREEN = 'input keyevent 82'
class PowerscanTest(base_test.BaseTestClass):
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
- self.tests = ("test_single_shot_scan_2g_highRSSI",
- "test_single_shot_scan_2g_lowRSSI",
- "test_single_shot_scan_5g_highRSSI",
- "test_single_shot_scan_5g_lowRSSI",
- "test_background_scan"
- "test_wifi_scan_2g", "test_wifi_scan_5g",
- "test_scan_wifidisconnected_turnonscreen",
- "test_scan_wificonnected_turnonscreen",
- "test_scan_screenoff_below_rssi_threshold",
- "test_scan_screenoff_lost_wificonnection")
+ self.tests = ('test_single_shot_scan_2g_highRSSI',
+ 'test_single_shot_scan_2g_lowRSSI',
+ 'test_single_shot_scan_5g_highRSSI',
+ 'test_single_shot_scan_5g_lowRSSI',
+ 'test_background_scan'
+ 'test_wifi_scan_2g', 'test_wifi_scan_5g',
+ 'test_scan_wifidisconnected_turnonscreen',
+ 'test_scan_wificonnected_turnonscreen',
+ 'test_scan_screenoff_below_rssi_threshold',
+ 'test_scan_screenoff_lost_wificonnection')
def setup_class(self):
self.log = logging.getLogger()
self.dut = self.android_devices[0]
self.access_point = self.access_points[0]
- req_params = ("main_network", "scantest_params")
+ req_params = ('main_network', 'scantest_params')
self.unpack_userparams(req_params)
self.unpack_testparams(self.scantest_params)
- self.mon_data_path = os.path.join(self.log_path, "Monsoon")
+ self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
self.mon = self.monsoons[0]
- self.mon.set_voltage(4.2)
self.mon.set_max_current(8.0)
+ self.mon.set_voltage(4.2)
self.mon.attach_device(self.dut)
- self.mon_info = {
- "dut": self.mon,
- "freq": self.mon_freq,
- "duration": self.mon_duration,
- "offset": self.mon_offset,
- "data_path": self.mon_data_path
- }
+ self.mon_info = wputils.create_monsoon_info(self)
self.num_atten = self.attenuators[0].instrument.num_atten
def unpack_testparams(self, bulk_params):
@@ -75,32 +69,32 @@
def setup_test(self):
self.SINGLE_SHOT_SCAN = (
- "am instrument -w -r -e min_scan_count \"700\""
- " -e WifiScanTest-testWifiSingleShotScan %d"
- " -e class com.google.android.platform.powertests."
- "WifiScanTest#testWifiSingleShotScan"
- " com.google.android.platform.powertests/"
- "android.test.InstrumentationTestRunner > /dev/null &" %
+ 'am instrument -w -r -e min_scan_count \"700\"'
+ ' -e WifiScanTest-testWifiSingleShotScan %d'
+ ' -e class com.google.android.platform.powertests.'
+ 'WifiScanTest#testWifiSingleShotScan'
+ ' com.google.android.platform.powertests/'
+ 'android.test.InstrumentationTestRunner > /dev/null &' %
(self.mon_duration + self.mon_offset + 10))
self.BACKGROUND_SCAN = (
- "am instrument -w -r -e min_scan_count \"1\" -e "
- "WifiScanTest-testWifiBackgroundScan %d -e class "
- "com.google.android.platform.powertests.WifiScan"
- "Test#testWifiBackgroundScan com.google.android."
- "platform.powertests/android.test.Instrumentation"
- "TestRunner > /dev/null &" %
+ 'am instrument -w -r -e min_scan_count \"1\" -e '
+ 'WifiScanTest-testWifiBackgroundScan %d -e class '
+ 'com.google.android.platform.powertests.WifiScan'
+ 'Test#testWifiBackgroundScan com.google.android.'
+ 'platform.powertests/android.test.Instrumentation'
+ 'TestRunner > /dev/null &' %
(self.mon_duration + self.mon_offset + 10))
self.WIFI_SCAN = (
- "am instrument -w -r -e min_scan_count \"1\" -e "
- "WifiScanTest-testWifiScan %d -e class "
- "com.google.android.platform.powertests.WifiScanTest#"
- "testWifiScan com.google.android.platform.powertests/"
- "android.test.InstrumentationTestRunner > /dev/null &" %
+ 'am instrument -w -r -e min_scan_count \"1\" -e '
+ 'WifiScanTest-testWifiScan %d -e class '
+ 'com.google.android.platform.powertests.WifiScanTest#'
+ 'testWifiScan com.google.android.platform.powertests/'
+ 'android.test.InstrumentationTestRunner > /dev/null &' %
(self.mon_duration + self.mon_offset + 10))
def teardown_class(self):
- self.mon.usb("on")
+ self.mon.usb('on')
def powrapk_scan_test_func(self, scan_command):
"""Test function for power.apk triggered scans.
@@ -108,13 +102,13 @@
scan_command: the adb shell command to trigger scans
"""
- self.mon_info["offset"] == 0
+ self.mon_info['offset'] == 0
# Initialize the dut to rock-bottom state
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
- self.log.info("Wait for {} seconds".format(self.settle_wait_time))
+ self.log.info('Wait for {} seconds'.format(self.settle_wait_time))
time.sleep(self.settle_wait_time)
- self.log.info("Running power apk command to trigger scans")
+ self.log.info('Running power apk command to trigger scans')
self.dut.adb.shell_nb(scan_command)
self.dut.droid.goToSleepNow()
# Collect power data and plot
@@ -127,12 +121,12 @@
wputils.pass_fail_check(self, avg_current)
# Test cases
- @test_tracker_info(uuid="e5539b01-e208-43c6-bebf-6f1e73d8d8cb")
+ @test_tracker_info(uuid='e5539b01-e208-43c6-bebf-6f1e73d8d8cb')
def test_single_shot_scan_2g_highRSSI(self):
network = self.main_network[hc.BAND_2G]
wputils.ap_setup(self.access_point, network)
- self.log.info("Set attenuation to get high RSSI at 2g")
+ self.log.info('Set attenuation to get high RSSI at 2g')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -140,12 +134,12 @@
]
self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
- @test_tracker_info(uuid="14c5a762-95bc-40ea-9fd4-27126df7d86c")
+ @test_tracker_info(uuid='14c5a762-95bc-40ea-9fd4-27126df7d86c')
def test_single_shot_scan_2g_lowRSSI(self):
network = self.main_network[hc.BAND_2G]
wputils.ap_setup(self.access_point, network)
- self.log.info("Set attenuation to get low RSSI at 2g")
+ self.log.info('Set attenuation to get low RSSI at 2g')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -153,12 +147,12 @@
]
self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
- @test_tracker_info(uuid="a6506600-c567-43b5-9c25-86b505099b97")
+ @test_tracker_info(uuid='a6506600-c567-43b5-9c25-86b505099b97')
def test_single_shot_scan_2g_noAP(self):
network = self.main_network[hc.BAND_2G]
wputils.ap_setup(self.access_point, network)
- self.log.info("Set attenuation so all AP is out of reach ")
+ self.log.info('Set attenuation so all AP is out of reach ')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -166,12 +160,12 @@
]
self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
- @test_tracker_info(uuid="1a458248-1159-4c8e-a39f-92fc9e69c4dd")
+ @test_tracker_info(uuid='1a458248-1159-4c8e-a39f-92fc9e69c4dd')
def test_single_shot_scan_5g_highRSSI(self):
network = self.main_network[hc.BAND_5G]
wputils.ap_setup(self.access_point, network)
- self.log.info("Set attenuation to get high RSSI at 5g")
+ self.log.info('Set attenuation to get high RSSI at 5g')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -179,12 +173,12 @@
]
self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
- @test_tracker_info(uuid="bd4da426-a621-4131-9f89-6e5a77f321d2")
+ @test_tracker_info(uuid='bd4da426-a621-4131-9f89-6e5a77f321d2')
def test_single_shot_scan_5g_lowRSSI(self):
network = self.main_network[hc.BAND_5G]
wputils.ap_setup(self.access_point, network)
- self.log.info("Set attenuation to get low RSSI at 5g")
+ self.log.info('Set attenuation to get low RSSI at 5g')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -192,12 +186,12 @@
]
self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
- @test_tracker_info(uuid="288b3add-8925-4803-81c0-53debf157ffc")
+ @test_tracker_info(uuid='288b3add-8925-4803-81c0-53debf157ffc')
def test_single_shot_scan_5g_noAP(self):
network = self.main_network[hc.BAND_5G]
wputils.ap_setup(self.access_point, network)
- self.log.info("Set attenuation so all AP is out of reach ")
+ self.log.info('Set attenuation so all AP is out of reach ')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -205,14 +199,14 @@
]
self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN)
- @test_tracker_info(uuid="f401c66c-e515-4f51-8ef2-2a03470d8ff2")
+ @test_tracker_info(uuid='f401c66c-e515-4f51-8ef2-2a03470d8ff2')
def test_background_scan(self):
network = self.main_network[hc.BAND_2G]
wputils.ap_setup(self.access_point, network)
self.powrapk_scan_test_func(self.BACKGROUND_SCAN)
- @test_tracker_info(uuid="fe38c1c7-937c-42c0-9381-98356639df8f")
+ @test_tracker_info(uuid='fe38c1c7-937c-42c0-9381-98356639df8f')
def test_wifi_scan_2g(self):
network = self.main_network[hc.BAND_2G]
@@ -224,7 +218,7 @@
]
self.powrapk_scan_test_func(self.WIFI_SCAN)
- @test_tracker_info(uuid="8eedefd1-3a08-4ac2-ba55-5eb438def3d4")
+ @test_tracker_info(uuid='8eedefd1-3a08-4ac2-ba55-5eb438def3d4')
def test_wifi_scan_5g(self):
network = self.main_network[hc.BAND_2G]
@@ -236,24 +230,24 @@
]
self.powrapk_scan_test_func(self.WIFI_SCAN)
- @test_tracker_info(uuid="ff5ea952-ee31-4968-a190-82935ce7a8cb")
+ @test_tracker_info(uuid='ff5ea952-ee31-4968-a190-82935ce7a8cb')
def test_scan_wifidisconnected_turnonscreen(self):
# Initialize the dut to rock-bottom state
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
self.dut.droid.goToSleepNow()
- self.log.info("Screen is OFF")
+ self.log.info('Screen is OFF')
time.sleep(5)
self.dut.droid.wakeUpNow()
- self.log.info("Now turn on screen to trigger scans")
+ self.log.info('Now turn on screen to trigger scans')
self.dut.adb.shell(UNLOCK_SCREEN)
file_path, avg_current = wputils.monsoon_data_collect_save(
self.dut, self.mon_info, self.current_test_name, self.bug_report)
wputils.monsoon_data_plot(self.mon_info, file_path)
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="9a836e5b-8128-4dd2-8e96-e79177810bdd")
+ @test_tracker_info(uuid='9a836e5b-8128-4dd2-8e96-e79177810bdd')
def test_scan_wificonnected_turnonscreen(self):
network = self.main_network[hc.BAND_2G]
@@ -270,10 +264,10 @@
wutils.wifi_connect(self.dut, network)
time.sleep(10)
self.dut.droid.goToSleepNow()
- self.log.info("Screen is OFF")
+ self.log.info('Screen is OFF')
time.sleep(5)
self.dut.droid.wakeUpNow()
- self.log.info("Now turn on screen to trigger scans")
+ self.log.info('Now turn on screen to trigger scans')
self.dut.adb.shell(UNLOCK_SCREEN)
file_path, avg_current = wputils.monsoon_data_collect_save(
self.dut, self.mon_info, self.current_test_name, self.bug_report)
@@ -283,7 +277,7 @@
# Path fail check
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="51e3c4f1-742b-45af-afd5-ae3552a03272")
+ @test_tracker_info(uuid='51e3c4f1-742b-45af-afd5-ae3552a03272')
def test_scan_screenoff_below_rssi_threshold(self):
network = self.main_network[hc.BAND_2G]
@@ -292,16 +286,16 @@
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
# Set attenuator and add main network to the phone
- self.log.info("Set attenuation so device connection has medium RSSI")
+ self.log.info('Set attenuation so device connection has medium RSSI')
[
- self.attenuators[i].set_atten(self.atten_level["initial_state"][i])
+ self.attenuators[i].set_atten(self.atten_level['zero_atten'][i])
for i in range(self.num_atten)
]
wutils.wifi_connect(self.dut, network)
self.dut.droid.goToSleepNow()
time.sleep(20)
# Set attenuator to make RSSI below threshold
- self.log.info("Set attenuation to drop RSSI below threhold")
+ self.log.info('Set attenuation to drop RSSI below threhold')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
@@ -315,7 +309,7 @@
# Path fail check
wputils.pass_fail_check(self, avg_current)
- @test_tracker_info(uuid="a16ae337-326f-4d09-990f-42232c3c0dc4")
+ @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4')
def test_scan_screenoff_lost_wificonnection(self):
network = self.main_network[hc.BAND_5G]
@@ -324,16 +318,16 @@
wputils.dut_rockbottom(self.dut)
wutils.wifi_toggle_state(self.dut, True)
# Set attenuator and add main network to the phone
- self.log.info("Set attenuation so device connection has medium RSSI")
+ self.log.info('Set attenuation so device connection has medium RSSI')
[
- self.attenuators[i].set_atten(self.atten_level["initial_state"][i])
+ self.attenuators[i].set_atten(self.atten_level['zero_atten'][i])
for i in range(self.num_atten)
]
wutils.wifi_connect(self.dut, network)
self.dut.droid.goToSleepNow()
time.sleep(5)
# Set attenuator to make RSSI below threshold
- self.log.info("Set attenuation so device loses connection")
+ self.log.info('Set attenuation so device loses connection')
[
self.attenuators[i].set_atten(
self.atten_level[self.current_test_name][i])
diff --git a/acts/tests/google/power/PowertrafficTest.py b/acts/tests/google/power/PowertrafficTest.py
new file mode 100644
index 0000000..0feb720
--- /dev/null
+++ b/acts/tests/google/power/PowertrafficTest.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import time
+from acts import base_test
+from acts.controllers.ap_lib import bridge_interface as bi
+from acts.controllers.ap_lib import hostapd_constants as hc
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi import wifi_power_test_utils as wputils
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+import acts.controllers.iperf_server as ipf
+
+
+class PowertrafficTest(base_test.BaseTestClass):
+ def __init__(self, controllers):
+
+ WifiBaseTest.__init__(self, controllers)
+ self.tests = ('test_screenoff_iperf_2g_highrssi',
+ 'test_screenoff_iperf_2g_mediumrssi',
+ 'test_screenoff_iperf_2g_lowrssi',
+ 'test_screenoff_iperf_5g_highrssi',
+ 'test_screenoff_iperf_5g_mediumrssi',
+ 'test_screenoff_iperf_5g_lowrssi')
+
+ def setup_class(self):
+
+ self.log = logging.getLogger()
+ self.dut = self.android_devices[0]
+ req_params = ['main_network', 'traffictest_params']
+ self.unpack_userparams(req_params)
+ self.unpack_testparams(self.traffictest_params)
+ self.num_atten = self.attenuators[0].instrument.num_atten
+ self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+ self.mon_duration = self.iperf_duration - 10
+ self.mon = self.monsoons[0]
+ self.mon.set_max_current(8.0)
+ self.mon.set_voltage(4.2)
+ self.mon.attach_device(self.dut)
+ self.mon_info = wputils.create_monsoon_info(self)
+ self.iperf_server = self.iperf_servers[0]
+ self.access_point = self.access_points[0]
+ self.pkt_sender = self.packet_senders[0]
+
+ def teardown_test(self):
+ self.iperf_server.stop()
+ self.access_point.close()
+
+ def unpack_testparams(self, bulk_params):
+ """Unpack all the test specific parameters.
+
+ Args:
+ bulk_params: dict with all test specific params in the config file
+ """
+ for key in bulk_params.keys():
+ setattr(self, key, bulk_params[key])
+
+ def iperf_power_test_func(self, screen_status, band):
+ """Test function for iperf power measurement at different RSSI level.
+
+ Args:
+ screen_status: screen ON or OFF
+ band: desired band for AP to operate on
+ """
+ wputils.dut_rockbottom(self.dut)
+ wutils.wifi_toggle_state(self.dut, True)
+
+ # Set up the AP
+ network = self.main_network[band]
+ channel = network['channel']
+ configs = self.access_point.generate_bridge_configs(channel)
+ brconfigs = bi.BridgeInterfaceConfigs(configs[0], configs[1],
+ configs[2])
+ self.access_point.bridge.startup(brconfigs)
+ wputils.ap_setup(self.access_point, network)
+
+ # Wait for DHCP on the ethernet port and get IP as Iperf server address
+ # Time out in 60 seconds if not getting DHCP address
+ iface_eth = self.pkt_sender.interface
+ self.iperf_server_address = wputils.wait_for_dhcp(iface_eth)
+
+ # Set attenuator to desired level
+ self.log.info('Set attenuation to desired RSSI level')
+ for i in range(self.num_atten):
+ attenuation = self.atten_level[self.current_test_name][i]
+ self.attenuators[i].set_atten(attenuation)
+
+ # Connect the phone to the AP
+ wutils.wifi_connect(self.dut, network)
+ time.sleep(5)
+ if screen_status == 'OFF':
+ self.dut.droid.goToSleepNow()
+ RSSI = wputils.get_wifi_rssi(self.dut)
+
+ # Run IPERF session
+ iperf_args = '-i 1 -t %d > /dev/null' % self.iperf_duration
+ self.iperf_server.start()
+ wputils.run_iperf_client_nonblocking(
+ self.dut, self.iperf_server_address, iperf_args)
+
+ # Collect power data and plot
+ file_path, avg_current = wputils.monsoon_data_collect_save(
+ self.dut, self.mon_info, self.current_test_name, self.bug_report)
+ iperf_result = ipf.IPerfResult(self.iperf_server.log_files[-1])
+
+ # Monsoon Power data plot with IPerf throughput information
+ tag = '_RSSI_{0:d}dBm_Throughput_{1:.2f}Mbps'.format(
+ RSSI, (iperf_result.avg_receive_rate * 8))
+ wputils.monsoon_data_plot(self.mon_info, file_path, tag)
+
+ # Bring down bridge interface
+ self.access_point.bridge.teardown(brconfigs)
+
+ # Bring down the AP object
+ self.access_point.close()
+
+ # Pass and fail check
+ wputils.pass_fail_check(self, avg_current)
+
+ # Test cases
+ @test_tracker_info(uuid='43d9b146-3547-4a27-9d79-c9341c32ccda')
+ def test_screenoff_iperf_2g_highrssi(self):
+
+ self.iperf_power_test_func('OFF', hc.BAND_2G)
+
+ @test_tracker_info(uuid='f00a868b-c8b1-4b36-8136-b39b5c2396a7')
+ def test_screenoff_iperf_2g_mediumrssi(self):
+
+ self.iperf_power_test_func('OFF', hc.BAND_2G)
+
+ @test_tracker_info(uuid='cd0c37ac-23fe-4dd1-9130-ccb2dfa71020')
+ def test_screenoff_iperf_2g_lowrssi(self):
+
+ self.iperf_power_test_func('OFF', hc.BAND_2G)
+
+ @test_tracker_info(uuid='f9173d39-b46d-4d80-a5a5-7966f5eed9de')
+ def test_screenoff_iperf_5g_highrssi(self):
+
+ self.iperf_power_test_func('OFF', hc.BAND_5G)
+
+ @test_tracker_info(uuid='cf77e1dc-30bc-4df9-88be-408f1fddc24f')
+ def test_screenoff_iperf_5g_mediumrssi(self):
+
+ self.iperf_power_test_func('OFF', hc.BAND_5G)
+
+ @test_tracker_info(uuid='48f91745-22dc-47c9-ace6-c2719df651d6')
+ def test_screenoff_iperf_5g_lowrssi(self):
+
+ self.iperf_power_test_func('OFF', hc.BAND_5G)
diff --git a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
index 6f5cdb9..abdf2d4 100644
--- a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
+++ b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py
@@ -79,7 +79,6 @@
Ent.EAP: int(EAP.SIM),
WifiEnums.SSID_KEY: self.ent_roaming_ssid,
}
- self.attenuators = wutils.group_attenuators(self.attenuators)
self.attn_a = self.attenuators[0]
self.attn_b = self.attenuators[1]
# Set screen lock password so ConfigStore is unlocked.
diff --git a/acts/tests/google/wifi/WifiEnterpriseTest.py b/acts/tests/google/wifi/WifiEnterpriseTest.py
index b1a5391..785d91f 100755
--- a/acts/tests/google/wifi/WifiEnterpriseTest.py
+++ b/acts/tests/google/wifi/WifiEnterpriseTest.py
@@ -22,6 +22,8 @@
from acts import base_test
from acts import signals
from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump
+from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump
from acts.test_utils.wifi import wifi_test_utils as wutils
WifiEnums = wutils.WifiEnums
@@ -127,6 +129,8 @@
del self.config_passpoint_ttls[WifiEnums.SSID_KEY]
# Set screen lock password so ConfigStore is unlocked.
self.dut.droid.setDevicePassword(self.device_password)
+ self.tcpdump_pid = None
+ self.tcpdump_file = None
def teardown_class(self):
wutils.reset_wifi(self.dut)
@@ -139,8 +143,16 @@
self.dut.droid.wakeUpNow()
wutils.reset_wifi(self.dut)
self.dut.ed.clear_all_events()
+ (self.tcpdump_pid, self.tcpdump_file) = start_adb_tcpdump(
+ self.dut, self.test_name, mask='all')
def teardown_test(self):
+ if self.tcpdump_pid:
+ stop_adb_tcpdump(self.dut,
+ self.tcpdump_pid,
+ self.tcpdump_file,
+ pull_tcpdump=True)
+ self.tcpdump_pid = None
self.dut.droid.wakeLockRelease()
self.dut.droid.goToSleepNow()
self.dut.droid.wifiStopTrackingStateChange()
diff --git a/acts/tests/google/wifi/WifiPnoTest.py b/acts/tests/google/wifi/WifiPnoTest.py
index c0588a8..b8f85c0 100644
--- a/acts/tests/google/wifi/WifiPnoTest.py
+++ b/acts/tests/google/wifi/WifiPnoTest.py
@@ -193,6 +193,9 @@
2. Run the simple pno test.
"""
self.add_and_enable_dummy_networks(16)
- self.test_simple_pno_connection_2g_to_5g()
+ self.add_network_and_enable(self.pno_network_a)
+ self.add_network_and_enable(self.pno_network_b)
+ self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a)
+ self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b)
""" Tests End """
diff --git a/acts/tests/google/wifi/WifiTetheringTest.py b/acts/tests/google/wifi/WifiTetheringTest.py
index c33c964..ef79f3d 100644
--- a/acts/tests/google/wifi/WifiTetheringTest.py
+++ b/acts/tests/google/wifi/WifiTetheringTest.py
@@ -34,6 +34,7 @@
from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
from acts.test_utils.wifi import wifi_test_utils as wutils
+WAIT_TIME = 2
class WifiTetheringTest(base_test.BaseTestClass):
""" Tests for Wifi Tethering """
@@ -43,7 +44,7 @@
self.convert_byte_to_mb = 1024.0 * 1024.0
self.new_ssid = "wifi_tethering_test2"
- self.data_usage_error = 0.3
+ self.data_usage_error = 1
self.hotspot_device = self.android_devices[0]
self.tethered_devices = self.android_devices[1:]
@@ -63,6 +64,24 @@
for ad in self.tethered_devices:
wutils.wifi_test_device_init(ad)
+ # Set chrome browser start with no-first-run verification
+ # Give permission to read from and write to storage
+ commands = ["pm grant com.android.chrome "
+ "android.permission.READ_EXTERNAL_STORAGE",
+ "pm grant com.android.chrome "
+ "android.permission.WRITE_EXTERNAL_STORAGE",
+ "rm /data/local/chrome-command-line",
+ "am set-debug-app --persistent com.android.chrome",
+ 'echo "chrome --no-default-browser-check --no-first-run '
+ '--disable-fre" > /data/local/tmp/chrome-command-line']
+ for cmd in commands:
+ for dut in self.tethered_devices:
+ try:
+ dut.adb.shell(cmd)
+ except adb.AdbError:
+ self.log.warn("adb command %s failed on %s"
+ % (cmd, dut.serial))
+
def teardown_class(self):
""" Reset devices """
wutils.wifi_toggle_state(self.hotspot_device, True)
@@ -70,6 +89,8 @@
def on_fail(self, test_name, begin_time):
""" Collect bug report on failure """
self.hotspot_device.take_bug_report(test_name, begin_time)
+ for ad in self.tethered_devices:
+ ad.take_bug_report(test_name, begin_time)
""" Helper functions """
@@ -112,6 +133,7 @@
"""
carrier_supports_ipv6 = ["vzw", "tmo"]
operator = get_operator_name(self.log, dut)
+ self.log.info("Carrier is %s" % operator)
return operator in carrier_supports_ipv6
def _find_ipv6_default_route(self, dut):
@@ -123,6 +145,7 @@
"""
default_route_substr = "::/0 -> "
link_properties = dut.droid.connectivityGetActiveLinkProperties()
+ self.log.info("LINK PROPERTIES:\n%s\n" % link_properties)
return link_properties and default_route_substr in link_properties
def _verify_ipv6_tethering(self, dut):
@@ -162,13 +185,15 @@
for _ in range(50):
dut_id = random.randint(0, len(self.tethered_devices)-1)
dut = self.tethered_devices[dut_id]
+ # wait for 1 sec between connect & disconnect stress test
+ time.sleep(1)
if device_connected[dut_id]:
wutils.wifi_forget_network(dut, self.network["SSID"])
else:
wutils.wifi_connect(dut, self.network)
device_connected[dut_id] = not device_connected[dut_id]
- def _verify_ping(self, dut, ip):
+ def _verify_ping(self, dut, ip, isIPv6=False):
""" Verify ping works from the dut to IP/hostname
Args:
@@ -180,7 +205,7 @@
False - if not
"""
self.log.info("Pinging %s from dut %s" % (ip, dut.serial))
- if self._is_ipaddress_ipv6(ip):
+ if isIPv6 or self._is_ipaddress_ipv6(ip):
return dut.droid.pingHost(ip, 5, "ping6")
return dut.droid.pingHost(ip)
@@ -251,19 +276,20 @@
Steps:
1. Start wifi tethering on hotspot device
- 2. Verify IPv6 address on hotspot device
+ 2. Verify IPv6 address on hotspot device (VZW & TMO only)
3. Connect tethered device to hotspot device
- 4. Verify IPv6 address on the client's link properties
- 5. Verify ping on client using ping6 which should pass
+ 4. Verify IPv6 address on the client's link properties (VZW only)
+ 5. Verify ping on client using ping6 which should pass (VZW only)
6. Disable mobile data on provider and verify that link properties
- does not have IPv6 address and default route
+ does not have IPv6 address and default route (VZW only)
"""
# Start wifi tethering on the hotspot device
wutils.toggle_wifi_off_and_on(self.hotspot_device)
self._start_wifi_tethering()
# Verify link properties on hotspot device
- self.log.info("Check IPv6 properties on the hotspot device")
+ self.log.info("Check IPv6 properties on the hotspot device. "
+ "Verizon & T-mobile should have IPv6 in link properties")
self._verify_ipv6_tethering(self.hotspot_device)
# Connect the client to the SSID
@@ -271,15 +297,16 @@
# Need to wait atleast 2 seconds for IPv6 address to
# show up in the link properties
- time.sleep(2)
+ time.sleep(WAIT_TIME)
# Verify link properties on tethered device
- self.log.info("Check IPv6 properties on the tethered device")
+ self.log.info("Check IPv6 properties on the tethered device. "
+ "Device should have IPv6 if carrier is Verizon")
self._verify_ipv6_tethering(self.tethered_devices[0])
# Verify ping6 on tethered device
ping_result = self._verify_ping(self.tethered_devices[0],
- "www.google.com")
+ wutils.DEFAULT_PING_ADDR, True)
if self._supports_ipv6_tethering(self.hotspot_device):
asserts.assert_true(ping_result, "Ping6 failed on the client")
else:
@@ -294,18 +321,19 @@
tel_defines.DATA_STATE_CONNECTED,
"Could not disable cell data")
- time.sleep(2) # wait until the IPv6 is removed from link properties
+ time.sleep(WAIT_TIME) # wait until the IPv6 is removed from link properties
result = self._find_ipv6_default_route(self.tethered_devices[0])
self.hotspot_device.droid.telephonyToggleDataConnection(True)
- if not result:
+ if result:
asserts.fail("Found IPv6 default route in link properties:Data off")
+ self.log.info("Did not find IPv6 address in link properties")
# Disable wifi tethering
wutils.stop_wifi_tethering(self.hotspot_device)
@test_tracker_info(uuid="110b61d1-8af2-4589-8413-11beac7a3025")
- def test_wifi_tethering_2ghz_traffic_between_2tethered_devices(self):
+ def wifi_tethering_2ghz_traffic_between_2tethered_devices(self):
""" Steps:
1. Start wifi hotspot with 2G band
@@ -319,7 +347,7 @@
wutils.stop_wifi_tethering(self.hotspot_device)
@test_tracker_info(uuid="953f6e2e-27bd-4b73-85a6-d2eaa4e755d5")
- def test_wifi_tethering_5ghz_traffic_between_2tethered_devices(self):
+ def wifi_tethering_5ghz_traffic_between_2tethered_devices(self):
""" Steps:
1. Start wifi hotspot with 5ghz band
@@ -413,9 +441,11 @@
end_time = int(time.time() * 1000)
bytes_before_download = dut.droid.connectivityGetRxBytesForDevice(
subscriber_id, 0, end_time)
- self.log.info("Bytes before download %s" % bytes_before_download)
+ self.log.info("Data usage before download: %s MB" %
+ (bytes_before_download/self.convert_byte_to_mb))
# download file
+ self.log.info("Download file of size %sMB" % self.file_size)
http_file_download_by_chrome(self.tethered_devices[0],
self.download_file)
@@ -423,13 +453,15 @@
end_time = int(time.time() * 1000)
bytes_after_download = dut.droid.connectivityGetRxBytesForDevice(
subscriber_id, 0, end_time)
- self.log.info("Bytes after download %s" % bytes_after_download)
+ self.log.info("Data usage after download: %s MB" %
+ (bytes_after_download/self.convert_byte_to_mb))
bytes_diff = bytes_after_download - bytes_before_download
wutils.stop_wifi_tethering(self.hotspot_device)
# verify data usage update is correct
bytes_used = bytes_diff/self.convert_byte_to_mb
+ self.log.info("Data usage on the device increased by %s" % bytes_used)
return bytes_used > self.file_size \
and bytes_used < self.file_size + self.data_usage_error
@@ -437,9 +469,9 @@
def test_wifi_tethering_data_usage_limit(self):
""" Steps:
- 1. Set the data usage limit to current data usage + 2MB
+ 1. Set the data usage limit to current data usage + 10MB
2. Start wifi tethering and connect a dut to the SSID
- 3. Download 5MB data on tethered device
+ 3. Download 20MB data on tethered device
a. file download should stop
b. tethered device will lose internet connectivity
c. data usage limit reached message should be displayed
@@ -448,7 +480,7 @@
"""
wutils.toggle_wifi_off_and_on(self.hotspot_device)
dut = self.hotspot_device
- data_usage_2mb = 2 * self.convert_byte_to_mb
+ data_usage_inc = 10 * self.convert_byte_to_mb
subscriber_id = dut.droid.telephonyGetSubscriberId()
self._start_wifi_tethering()
@@ -459,11 +491,11 @@
old_data_usage = dut.droid.connectivityQuerySummaryForDevice(
subscriber_id, 0, end_time)
- # set data usage limit to current usage limit + 2MB
+ # set data usage limit to current usage limit + 10MB
dut.droid.connectivitySetDataUsageLimit(
- subscriber_id, str(int(old_data_usage + data_usage_2mb)))
+ subscriber_id, str(int(old_data_usage + data_usage_inc)))
- # download file - size 5MB
+ # download file - size 20MB
http_file_download_by_chrome(self.tethered_devices[0],
self.download_file,
timeout=120)
@@ -479,8 +511,10 @@
dut.droid.connectivityFactoryResetNetworkPolicies(subscriber_id)
wutils.stop_wifi_tethering(self.hotspot_device)
- old_data_usage = (old_data_usage+data_usage_2mb)/self.convert_byte_to_mb
+ old_data_usage = (old_data_usage+data_usage_inc)/self.convert_byte_to_mb
new_data_usage = new_data_usage/self.convert_byte_to_mb
+ self.log.info("Expected data usage: %s MB" % old_data_usage)
+ self.log.info("Actual data usage: %s MB" % new_data_usage)
return (new_data_usage-old_data_usage) < self.data_usage_error
diff --git a/acts/tests/sample/OtaSampleTest.py b/acts/tests/sample/OtaSampleTest.py
new file mode 100644
index 0000000..aeb735e
--- /dev/null
+++ b/acts/tests/sample/OtaSampleTest.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import base_test
+from acts.libs.ota import ota_updater
+
+
+class OtaSampleTest(base_test.BaseTestClass):
+ """Demonstrates an example OTA Update test."""
+
+ def setup_class(self):
+ ota_updater.initialize(self.user_params, self.android_devices)
+ self.dut = self.android_devices[0]
+
+ def test_my_test(self):
+ self.pre_ota()
+ ota_updater.update(self.dut)
+ self.post_ota()
+
+ def pre_ota(self):
+ pass
+
+ def post_ota(self):
+ pass