#!/usr/bin/env python3
#
#   Copyright 2016 Google, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import logging
import os
import re
import shutil
import time

from retry import retry

from collections import namedtuple
from enum import IntEnum
from queue import Empty

from acts import asserts
from acts import context
from acts import signals
from acts import utils
from acts.controllers import attenuator
from acts.controllers.ap_lib import hostapd_security
from acts.controllers.ap_lib import hostapd_ap_preset
from acts.controllers.ap_lib.hostapd_constants import BAND_2G
from acts.controllers.ap_lib.hostapd_constants import BAND_5G
from acts_contrib.test_utils.net import connectivity_const as cconsts
from acts_contrib.test_utils.tel import tel_defines
from acts_contrib.test_utils.wifi import wifi_constants
from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils

# Default timeout, in seconds, used for reboot, toggle WiFi and Airplane mode,
# for the system to settle down after the operation.
DEFAULT_TIMEOUT = 10
# Number of seconds to wait for events that are supposed to happen quickly.
# Like onSuccess for start background scan and confirmation on wifi state
# change.
SHORT_TIMEOUT = 30
# NOTE(review): presumably seconds as well, by analogy with the timeouts
# above -- confirm against the callers.
ROAMING_TIMEOUT = 30
WIFI_CONNECTION_TIMEOUT_DEFAULT = 30
# Speed of light in m/s.
SPEED_OF_LIGHT = 299792458

# URL used as the default connectivity-check target.
DEFAULT_PING_ADDR = "https://www.google.com/robots.txt"

# On-device directory and file name of the cnss_diag configuration.
CNSS_DIAG_CONFIG_PATH = "/data/vendor/wifi/cnss_diag/"
CNSS_DIAG_CONFIG_FILE = "cnss_diag.conf"

# Four-element value lists keyed by roaming scenario.
# NOTE(review): presumably one attenuation value per attenuator port, with
# the first two ports belonging to AP1 and the last two to AP2 -- confirm
# against the code that consumes this table.
ROAMING_ATTN = {
    "AP1_on_AP2_off": [0, 0, 95, 95],
    "AP1_off_AP2_on": [95, 95, 0, 0],
    "default": [0, 0, 0, 0]
}


class WifiEnums():
    """Namespace of constants shared by the Wi-Fi test utilities.

    Groups together:
      * string keys used in Wi-Fi / SoftAp configuration dicts,
      * numeric SoftAp band and WPS values,
      * nested enums for EAP, RTT and scan results,
      * WifiScanner band / report / scan-type values,
      * frequency lists and frequency <-> channel lookup tables.

    The class is never instantiated by the code in this file; its members
    are read as class attributes.
    """

    # Keys of Wi-Fi network configuration dicts.
    SSID_KEY = "SSID"  # Used for Wifi & SoftAp
    SSID_PATTERN_KEY = "ssidPattern"
    NETID_KEY = "network_id"
    BSSID_KEY = "BSSID"  # Used for Wifi & SoftAp
    BSSID_PATTERN_KEY = "bssidPattern"
    PWD_KEY = "password"  # Used for Wifi & SoftAp
    # Lower-case name kept as-is because callers reference it by this name.
    frequency_key = "frequency"
    HIDDEN_KEY = "hiddenSSID"  # Used for Wifi & SoftAp
    IS_APP_INTERACTION_REQUIRED = "isAppInteractionRequired"
    IS_USER_INTERACTION_REQUIRED = "isUserInteractionRequired"
    IS_SUGGESTION_METERED = "isMetered"
    PRIORITY = "priority"
    SECURITY = "security"  # Used for Wifi & SoftAp

    # Used for SoftAp
    AP_BAND_KEY = "apBand"
    AP_CHANNEL_KEY = "apChannel"
    AP_BANDS_KEY = "apBands"
    AP_CHANNEL_FREQUENCYS_KEY = "apChannelFrequencies"
    AP_MAC_RANDOMIZATION_SETTING_KEY = "MacRandomizationSetting"
    AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY = "BridgedModeOpportunisticShutdownEnabled"
    AP_IEEE80211AX_ENABLED_KEY = "Ieee80211axEnabled"
    AP_MAXCLIENTS_KEY = "MaxNumberOfClients"
    AP_SHUTDOWNTIMEOUT_KEY = "ShutdownTimeoutMillis"
    AP_SHUTDOWNTIMEOUTENABLE_KEY = "AutoShutdownEnabled"
    AP_CLIENTCONTROL_KEY = "ClientControlByUserEnabled"
    AP_ALLOWEDLIST_KEY = "AllowedClientList"
    AP_BLOCKEDLIST_KEY = "BlockedClientList"

    # SoftAp band values. The combined values are the sums of the
    # single-band values (2G=1, 5G=2, 6G=4).
    WIFI_CONFIG_SOFTAP_BAND_2G = 1
    WIFI_CONFIG_SOFTAP_BAND_5G = 2
    WIFI_CONFIG_SOFTAP_BAND_2G_5G = 3
    WIFI_CONFIG_SOFTAP_BAND_6G = 4
    WIFI_CONFIG_SOFTAP_BAND_2G_6G = 5
    WIFI_CONFIG_SOFTAP_BAND_5G_6G = 6
    WIFI_CONFIG_SOFTAP_BAND_ANY = 7

    # DO NOT USE IT for new test case! Replaced by WIFI_CONFIG_SOFTAP_BAND_
    WIFI_CONFIG_APBAND_2G = WIFI_CONFIG_SOFTAP_BAND_2G
    WIFI_CONFIG_APBAND_5G = WIFI_CONFIG_SOFTAP_BAND_5G
    WIFI_CONFIG_APBAND_AUTO = WIFI_CONFIG_SOFTAP_BAND_2G_5G

    # Older numbering of the AP band values.
    WIFI_CONFIG_APBAND_2G_OLD = 0
    WIFI_CONFIG_APBAND_5G_OLD = 1
    WIFI_CONFIG_APBAND_AUTO_OLD = -1

    WIFI_WPS_INFO_PBC = 0
    WIFI_WPS_INFO_DISPLAY = 1
    WIFI_WPS_INFO_KEYPAD = 2
    WIFI_WPS_INFO_LABEL = 3
    WIFI_WPS_INFO_INVALID = 4

    class SoftApSecurityType():
        # String values for the SoftAp "security" configuration field.
        OPEN = "NONE"
        WPA2 = "WPA2_PSK"
        WPA3_SAE_TRANSITION = "WPA3_SAE_TRANSITION"
        WPA3_SAE = "WPA3_SAE"

    class CountryCode():
        # Two-letter country code strings, plus an UNKNOWN placeholder.
        AUSTRALIA = "AU"
        CHINA = "CN"
        GERMANY = "DE"
        JAPAN = "JP"
        UK = "GB"
        US = "US"
        UNKNOWN = "UNKNOWN"

    # Start of Macros for EAP
    # EAP types
    class Eap(IntEnum):
        NONE = -1
        PEAP = 0
        TLS = 1
        TTLS = 2
        PWD = 3
        SIM = 4
        AKA = 5
        AKA_PRIME = 6
        UNAUTH_TLS = 7

    # EAP Phase2 types
    class EapPhase2(IntEnum):
        NONE = 0
        PAP = 1
        MSCHAP = 2
        MSCHAPV2 = 3
        GTC = 4

    class Enterprise:
        # Enterprise Config Macros
        EMPTY_VALUE = "NULL"
        EAP = "eap"
        PHASE2 = "phase2"
        IDENTITY = "identity"
        ANON_IDENTITY = "anonymous_identity"
        PASSWORD = "password"
        SUBJECT_MATCH = "subject_match"
        ALTSUBJECT_MATCH = "altsubject_match"
        DOM_SUFFIX_MATCH = "domain_suffix_match"
        CLIENT_CERT = "client_cert"
        CA_CERT = "ca_cert"
        ENGINE = "engine"
        ENGINE_ID = "engine_id"
        PRIVATE_KEY_ID = "key_id"
        REALM = "realm"
        PLMN = "plmn"
        FQDN = "FQDN"
        FRIENDLY_NAME = "providerFriendlyName"
        ROAMING_IDS = "roamingConsortiumIds"
        OCSP = "ocsp"

    # End of Macros for EAP

    # Macros for wifi p2p.
    WIFI_P2P_SERVICE_TYPE_ALL = 0
    WIFI_P2P_SERVICE_TYPE_BONJOUR = 1
    WIFI_P2P_SERVICE_TYPE_UPNP = 2
    WIFI_P2P_SERVICE_TYPE_VENDOR_SPECIFIC = 255

    class ScanResult:
        # Channel width codes reported in scan results.
        CHANNEL_WIDTH_20MHZ = 0
        CHANNEL_WIDTH_40MHZ = 1
        CHANNEL_WIDTH_80MHZ = 2
        CHANNEL_WIDTH_160MHZ = 3
        CHANNEL_WIDTH_80MHZ_PLUS_MHZ = 4

    # Macros for wifi rtt.
    class RttType(IntEnum):
        TYPE_ONE_SIDED = 1
        TYPE_TWO_SIDED = 2

    class RttPeerType(IntEnum):
        PEER_TYPE_AP = 1
        PEER_TYPE_STA = 2  # Requires NAN.
        PEER_P2P_GO = 3
        PEER_P2P_CLIENT = 4
        PEER_NAN = 5

    class RttPreamble(IntEnum):
        # Bit-flag style values (each a distinct power of two).
        PREAMBLE_LEGACY = 0x01
        PREAMBLE_HT = 0x02
        PREAMBLE_VHT = 0x04

    class RttBW(IntEnum):
        # Bit-flag style values (each a distinct power of two).
        BW_5_SUPPORT = 0x01
        BW_10_SUPPORT = 0x02
        BW_20_SUPPORT = 0x04
        BW_40_SUPPORT = 0x08
        BW_80_SUPPORT = 0x10
        BW_160_SUPPORT = 0x20

    class Rtt(IntEnum):
        STATUS_SUCCESS = 0
        STATUS_FAILURE = 1
        STATUS_FAIL_NO_RSP = 2
        STATUS_FAIL_REJECTED = 3
        STATUS_FAIL_NOT_SCHEDULED_YET = 4
        STATUS_FAIL_TM_TIMEOUT = 5
        STATUS_FAIL_AP_ON_DIFF_CHANNEL = 6
        STATUS_FAIL_NO_CAPABILITY = 7
        STATUS_ABORTED = 8
        STATUS_FAIL_INVALID_TS = 9
        STATUS_FAIL_PROTOCOL = 10
        STATUS_FAIL_SCHEDULE = 11
        STATUS_FAIL_BUSY_TRY_LATER = 12
        STATUS_INVALID_REQ = 13
        STATUS_NO_WIFI = 14
        STATUS_FAIL_FTM_PARAM_OVERRIDE = 15

        REASON_UNSPECIFIED = -1
        REASON_NOT_AVAILABLE = -2
        REASON_INVALID_LISTENER = -3
        REASON_INVALID_REQUEST = -4

    class RttParam:
        # Keys of an RTT request parameter dict.
        device_type = "deviceType"
        request_type = "requestType"
        BSSID = "bssid"
        channel_width = "channelWidth"
        frequency = "frequency"
        center_freq0 = "centerFreq0"
        center_freq1 = "centerFreq1"
        number_burst = "numberBurst"
        interval = "interval"
        num_samples_per_burst = "numSamplesPerBurst"
        num_retries_per_measurement_frame = "numRetriesPerMeasurementFrame"
        num_retries_per_FTMR = "numRetriesPerFTMR"
        lci_request = "LCIRequest"
        lcr_request = "LCRRequest"
        burst_timeout = "burstTimeout"
        preamble = "preamble"
        bandwidth = "bandwidth"
        margin = "margin"

    # Margin of error per RTT bandwidth.
    # NOTE(review): the unit is not stated in this file -- confirm.
    RTT_MARGIN_OF_ERROR = {
        RttBW.BW_80_SUPPORT: 2,
        RttBW.BW_40_SUPPORT: 5,
        RttBW.BW_20_SUPPORT: 5
    }

    # Macros as specified in the WifiScanner code.
    WIFI_BAND_UNSPECIFIED = 0  # not specified
    WIFI_BAND_24_GHZ = 1  # 2.4 GHz band
    WIFI_BAND_5_GHZ = 2  # 5 GHz band without DFS channels
    WIFI_BAND_5_GHZ_DFS_ONLY = 4  # 5 GHz band, DFS channels only
    WIFI_BAND_5_GHZ_WITH_DFS = 6  # 5 GHz band with DFS channels
    WIFI_BAND_BOTH = 3  # both bands without DFS channels
    WIFI_BAND_BOTH_WITH_DFS = 7  # both bands with DFS channels

    REPORT_EVENT_AFTER_BUFFER_FULL = 0
    REPORT_EVENT_AFTER_EACH_SCAN = 1
    REPORT_EVENT_FULL_SCAN_RESULT = 2

    SCAN_TYPE_LOW_LATENCY = 0
    SCAN_TYPE_LOW_POWER = 1
    SCAN_TYPE_HIGH_ACCURACY = 2

    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    DFS_5G_FREQUENCIES = [
        5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620, 5640,
        5660, 5680, 5700, 5720
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES

    # WifiScanner band value -> list of frequencies in that band.
    band_to_frequencies = {
        WIFI_BAND_24_GHZ: ALL_2G_FREQUENCIES,
        WIFI_BAND_5_GHZ: NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_DFS_ONLY: DFS_5G_FREQUENCIES,
        WIFI_BAND_5_GHZ_WITH_DFS: ALL_5G_FREQUENCIES,
        WIFI_BAND_BOTH: ALL_2G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES,
        WIFI_BAND_BOTH_WITH_DFS: ALL_5G_FREQUENCIES + ALL_2G_FREQUENCIES
    }

    # TODO: add all of the band mapping.
    softap_band_frequencies = {
        WIFI_CONFIG_SOFTAP_BAND_2G: ALL_2G_FREQUENCIES,
        WIFI_CONFIG_SOFTAP_BAND_5G: ALL_5G_FREQUENCIES
    }

    # All Wifi frequencies to channels lookup.
    # Every frequency listed in the frequency lists above, and every
    # frequency appearing as a value in the channel -> frequency tables
    # below, has an entry here.
    freq_to_channel = {
        2412: 1,
        2417: 2,
        2422: 3,
        2427: 4,
        2432: 5,
        2437: 6,
        2442: 7,
        2447: 8,
        2452: 9,
        2457: 10,
        2462: 11,
        2467: 12,
        2472: 13,
        2484: 14,
        4915: 183,
        4920: 184,
        4925: 185,
        4935: 187,
        4940: 188,
        4945: 189,
        4960: 192,
        4980: 196,
        5035: 7,
        5040: 8,
        5045: 9,
        5055: 11,
        5060: 12,
        5080: 16,
        5170: 34,
        5180: 36,
        5190: 38,
        5200: 40,
        5210: 42,
        5220: 44,
        5230: 46,
        5240: 48,
        5250: 50,
        5260: 52,
        5280: 56,
        5300: 60,
        5320: 64,
        5500: 100,
        5520: 104,
        5540: 108,
        5560: 112,
        5580: 116,
        5600: 120,
        5620: 124,
        5640: 128,
        5660: 132,
        5680: 136,
        5700: 140,
        5720: 144,
        5745: 149,
        5755: 151,
        5765: 153,
        5775: 155,
        5785: 157,
        5795: 159,
        5805: 161,
        5825: 165,
    }

    # All Wifi channels to frequencies lookup.
    # Split per band because some channel numbers (7, 8, 9, 11, 12) exist in
    # both tables with different frequencies.
    channel_2G_to_freq = {
        1: 2412,
        2: 2417,
        3: 2422,
        4: 2427,
        5: 2432,
        6: 2437,
        7: 2442,
        8: 2447,
        9: 2452,
        10: 2457,
        11: 2462,
        12: 2467,
        13: 2472,
        14: 2484
    }

    channel_5G_to_freq = {
        183: 4915,
        184: 4920,
        185: 4925,
        187: 4935,
        188: 4940,
        189: 4945,
        192: 4960,
        196: 4980,
        7: 5035,
        8: 5040,
        9: 5045,
        11: 5055,
        12: 5060,
        16: 5080,
        34: 5170,
        36: 5180,
        38: 5190,
        40: 5200,
        42: 5210,
        44: 5220,
        46: 5230,
        48: 5240,
        50: 5250,
        52: 5260,
        56: 5280,
        60: 5300,
        64: 5320,
        100: 5500,
        104: 5520,
        108: 5540,
        112: 5560,
        116: 5580,
        120: 5600,
        124: 5620,
        128: 5640,
        132: 5660,
        136: 5680,
        140: 5700,
        144: 5720,
        149: 5745,
        151: 5755,
        153: 5765,
        155: 5775,
        157: 5785,
        159: 5795,
        161: 5805,
        165: 5825
    }


class WifiChannelBase:
    """Base holder of per-region Wi-Fi frequency lists.

    All lists are empty here; subclasses are expected to fill them in.
    Note that ALL_5G_FREQUENCIES is computed once, from this class's own
    (empty) lists, so a subclass that overrides the DFS / non-DFS lists must
    also set ALL_5G_FREQUENCIES itself.
    """
    ALL_2G_FREQUENCIES = []
    DFS_5G_FREQUENCIES = []
    NONE_DFS_5G_FREQUENCIES = []
    ALL_5G_FREQUENCIES = DFS_5G_FREQUENCIES + NONE_DFS_5G_FREQUENCIES
    MIX_CHANNEL_SCAN = []

    def band_to_freq(self, band):
        """Returns the frequency list for a WifiScanner band value.

        Args:
            band: One of the WifiEnums.WIFI_BAND_* values handled below.

        Returns:
            The list of frequencies configured on this object for the band.

        Raises:
            KeyError: If band is not one of the handled values.
        """
        freqs_2g = self.ALL_2G_FREQUENCIES
        freqs_5g_no_dfs = self.NONE_DFS_5G_FREQUENCIES
        freqs_5g_dfs = self.DFS_5G_FREQUENCIES
        freqs_5g_all = self.ALL_5G_FREQUENCIES

        lookup = {}
        lookup[WifiEnums.WIFI_BAND_24_GHZ] = freqs_2g
        lookup[WifiEnums.WIFI_BAND_5_GHZ] = freqs_5g_no_dfs
        lookup[WifiEnums.WIFI_BAND_5_GHZ_DFS_ONLY] = freqs_5g_dfs
        lookup[WifiEnums.WIFI_BAND_5_GHZ_WITH_DFS] = freqs_5g_all
        lookup[WifiEnums.WIFI_BAND_BOTH] = freqs_2g + freqs_5g_no_dfs
        lookup[WifiEnums.WIFI_BAND_BOTH_WITH_DFS] = freqs_5g_all + freqs_2g
        return lookup[band]


class WifiChannelUS(WifiChannelBase):
    """US Wi-Fi frequency lists.

    The 2.4 GHz and non-DFS 5 GHz lists are class attributes. The DFS list
    and the combined 5 GHz list are set per instance in __init__.
    """
    # US Wifi frequencies
    ALL_2G_FREQUENCIES = [
        2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457, 2462
    ]
    NONE_DFS_5G_FREQUENCIES = [
        5180, 5200, 5220, 5240, 5745, 5765, 5785, 5805, 5825
    ]
    MIX_CHANNEL_SCAN = [
        2412, 2437, 2462, 5180, 5200, 5280, 5260, 5300, 5500, 5320, 5520, 5560,
        5700, 5745, 5805
    ]

    def __init__(self, model=None, support_addition_channel=None):
        """Builds the per-instance frequency lists.

        Args:
            model: Device model identifier, compared against
                   support_addition_channel.
            support_addition_channel: Optional container of models that also
                   get 2467 and 2472 MHz in their 2.4 GHz list. None (the
                   default) behaves like an empty container.
        """
        # None replaces the former mutable [] default; an omitted argument
        # still means "no model gets the additional channels".
        if (support_addition_channel is not None
                and model in support_addition_channel):
            self.ALL_2G_FREQUENCIES = [
                2412, 2417, 2422, 2427, 2432, 2437, 2442, 2447, 2452, 2457,
                2462, 2467, 2472
                ]
        self.DFS_5G_FREQUENCIES = [
            5260, 5280, 5300, 5320, 5500, 5520, 5540, 5560, 5580, 5600, 5620,
            5640, 5660, 5680, 5700, 5720
            ]
        # Recomputed here because the inherited class attribute was built
        # from the base class's empty lists.
        self.ALL_5G_FREQUENCIES = self.DFS_5G_FREQUENCIES + self.NONE_DFS_5G_FREQUENCIES


class WifiReferenceNetworks:
    """Sorts reference networks into buckets by band and by whether they
    carry a password.

    Each item of the input is iterated as a mapping from a band key to a
    network dict. A band key equal to "2g" goes to the 2G buckets; any other
    key goes to the 5G buckets. A network dict containing a "password" key is
    treated as secure, otherwise as open.
    """
    def __init__(self, obj):
        self.reference_networks = obj
        self.WIFI_2G, self.WIFI_5G = "2g", "5g"

        self.secure_networks_2g = []
        self.secure_networks_5g = []
        self.open_networks_2g = []
        self.open_networks_5g = []
        self._parse_networks()

    def _parse_networks(self):
        """Fills the four bucket lists from self.reference_networks."""
        for entry in self.reference_networks:
            for band in entry:
                config = entry[band]
                has_password = "password" in config
                if band == self.WIFI_2G:
                    bucket = (self.secure_networks_2g
                              if has_password else self.open_networks_2g)
                else:
                    # Every non-2G key lands in the 5G buckets.
                    bucket = (self.secure_networks_5g
                              if has_password else self.open_networks_5g)
                bucket.append(config)

    def return_2g_secure_networks(self):
        """Returns the 2G networks that have a password."""
        return self.secure_networks_2g

    def return_5g_secure_networks(self):
        """Returns the 5G networks that have a password."""
        return self.secure_networks_5g

    def return_2g_open_networks(self):
        """Returns the 2G networks without a password."""
        return self.open_networks_2g

    def return_5g_open_networks(self):
        """Returns the 5G networks without a password."""
        return self.open_networks_5g

    def return_secure_networks(self):
        """Returns all networks with a password, 2G first, as a new list."""
        return self.secure_networks_2g + self.secure_networks_5g

    def return_open_networks(self):
        """Returns all networks without a password, 2G first, as a new list."""
        return self.open_networks_2g + self.open_networks_5g


def _assert_on_fail_handler(func, assert_on_fail, *args, **kwargs):
    """Runs func and adapts its test-signal based reporting to the caller.

    With assert_on_fail True, test signals raised by func propagate so they
    can terminate the test case directly. With assert_on_fail False, test
    signals are swallowed and the outcome is reported as a boolean instead.
    Exceptions that are not test signals always propagate.

    Args:
        func: The function to wrap. This function reports operation status by
              raising test signals.
        assert_on_fail: A boolean that specifies if the output of the wrapper
                        is test signal based or return value based.
        args: Positional args for func.
        kwargs: Name args for func.

    Returns:
        If assert_on_fail is False, True when func completed without raising
        a test signal and False when it raised one. If assert_on_fail is
        True, nothing (None).
    """
    try:
        func(*args, **kwargs)
    except signals.TestSignal:
        if not assert_on_fail:
            return False
        raise
    else:
        if not assert_on_fail:
            return True


def assert_network_in_list(target, network_list):
    """Asserts that at least one network in network_list matches target.

    Matching is delegated to match_networks.

    Args:
        target: A dict representing a Wi-Fi network.
                E.g. {WifiEnums.SSID_KEY: "SomeNetwork"}
        network_list: A list of dicts, each representing a Wi-Fi network.
    """
    matches = match_networks(target, network_list)
    failure_msg = "Target network %s, does not exist in network list %s" % (
        target, network_list)
    asserts.assert_true(matches, failure_msg)


def match_networks(target_params, networks):
    """Filters a list of WiFi networks down to those matching target_params.

    A network matches when it contains every key of target_params and the
    value stored under each of those keys equals the target value.

    Args:
        target_params: A dict with 1 or more key-value pairs representing a Wi-Fi network.
                       E.g { 'SSID': 'wh_ap1_5g', 'BSSID': '30:b5:c2:33:e4:47' }
        networks: A list of dict objects representing WiFi networks.

    Returns:
        A list of the networks that match the target parameters, in their
        original order.
    """
    asserts.assert_true(target_params,
                        "Expected networks object 'target_params' is empty")

    def _is_match(candidate):
        # Reject on the first missing key or differing value.
        for key, expected in target_params.items():
            if key not in candidate or candidate[key] != expected:
                return False
        return True

    return [candidate for candidate in networks if _is_match(candidate)]


def wait_for_wifi_state(ad, state, assert_on_fail=True):
    """Waits until the device reports the given wifi state.

    Thin wrapper that runs _wait_for_wifi_state through
    _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the device transitions
        to the specified state, False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wait_for_wifi_state, assert_on_fail, ad, state=state)
    return outcome


def _wait_for_wifi_state(ad, state):
    """Waits for the device to report the given wifi state.

    Returns immediately when the device is already in that state. Otherwise
    tracks wifi state changes and waits up to SHORT_TIMEOUT for a matching
    state-changed event. If no such event arrives, the current state is
    checked once more and a test failure signal is raised when it still
    differs.

    Args:
        ad: An AndroidDevice object.
        state: Wifi state to wait for.
    """
    if state == ad.droid.wifiCheckState():
        # Already in the requested state; waiting for a change event here
        # would be a mistake.
        return

    def _reached_target(event):
        return event["data"]["enabled"] == state

    ad.droid.wifiStartTrackingStateChange()
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reached_target, SHORT_TIMEOUT)
    except Empty:
        # No event seen: fall back to a direct state query before failing.
        fail_msg = "Device did not transition to Wi-Fi state to %s on %s." % (
            state, ad.serial)
        asserts.assert_equal(state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_toggle_state(ad, new_state=None, assert_on_fail=True):
    """Sets or flips the wifi state of a device.

    Thin wrapper that runs _wifi_toggle_state through
    _assert_on_fail_handler.

    Args:
        ad: An AndroidDevice object.
        new_state: Wifi state to set to. If None, opposite of the current state.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    outcome = _assert_on_fail_handler(
        _wifi_toggle_state, assert_on_fail, ad, new_state=new_state)
    return outcome


def _wifi_toggle_state(ad, new_state=None):
    """Sets the wifi state of a device and confirms the change.

    After issuing the toggle, waits up to SHORT_TIMEOUT for a matching
    state-changed event. If none arrives, the current state is queried
    directly and a test failure signal is raised when it differs from the
    requested one.

    Args:
        ad: An AndroidDevice object.
        new_state: The state to set Wi-Fi to. If None, opposite of the current
                   state will be set.
    """
    current_state = ad.droid.wifiCheckState()
    if new_state is None:
        new_state = not current_state
    elif new_state == current_state:
        # Already in the requested state; no change event would ever arrive.
        return

    def _reached_target(event):
        return event["data"]["enabled"] == new_state

    ad.droid.wifiStartTrackingStateChange()
    ad.log.info("Setting Wi-Fi state to %s.", new_state)
    ad.ed.clear_all_events()
    # Setting wifi state.
    ad.droid.wifiToggleState(new_state)
    time.sleep(2)
    try:
        ad.ed.wait_for_event(wifi_constants.WIFI_STATE_CHANGED,
                             _reached_target, SHORT_TIMEOUT)
    except Empty:
        # No event seen: fall back to a direct state query before failing.
        fail_msg = "Failed to set Wi-Fi state to %s on %s." % (new_state,
                                                               ad.serial)
        asserts.assert_equal(new_state, ad.droid.wifiCheckState(), fail_msg)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def reset_wifi(ad):
    """Clears all saved Wi-Fi networks on a device.

    Each distinct network id is forgotten once. A missing confirmation event
    for a single network is only logged; the final check that no configured
    network remains decides success.

    Args:
        ad: An AndroidDevice object.

    """
    networks = ad.droid.wifiGetConfiguredNetworks()
    if not networks:
        return
    removed = set()
    for n in networks:
        network_id = n['networkId']
        if network_id in removed:
            # The same id can be listed more than once; forget it only once.
            continue
        ad.droid.wifiForgetNetwork(network_id)
        removed.add(network_id)
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            logging.warning("Could not confirm the removal of network %s.", n)
    # Check again to see if there's any network left, and report what is
    # actually still configured rather than the pre-removal list.
    remaining = ad.droid.wifiGetConfiguredNetworks()
    asserts.assert_true(
        not remaining,
        "Failed to remove these configured Wi-Fi networks: %s" % remaining)



def toggle_airplane_mode_on_and_off(ad):
    """Turn ON and OFF Airplane mode.

    Sleeps DEFAULT_TIMEOUT seconds after each switch to let the system
    settle.

    Args:
        ad: An AndroidDevice object.

    Returns:
        Nothing. Raises a test failure signal if turning Airplane mode on or
        off fails.

    """
    ad.log.debug("Toggling Airplane mode ON.")
    asserts.assert_true(utils.force_airplane_mode(ad, True),
                        "Can not turn on airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)
    ad.log.debug("Toggling Airplane mode OFF.")
    asserts.assert_true(utils.force_airplane_mode(ad, False),
                        "Can not turn off airplane mode on: %s" % ad.serial)
    time.sleep(DEFAULT_TIMEOUT)


def toggle_wifi_off_and_on(ad):
    """Turn OFF and ON WiFi.

    Sleeps DEFAULT_TIMEOUT seconds after each switch.

    Args:
        ad: An AndroidDevice object.

    Returns:
        Nothing. A test failure signal is raised if turning WiFi off or on
        fails.

    """
    steps = (
        ("Toggling wifi OFF.", False),
        ("Toggling wifi ON.", True),
    )
    for log_line, target_state in steps:
        ad.log.debug(log_line)
        wifi_toggle_state(ad, target_state)
        time.sleep(DEFAULT_TIMEOUT)


def wifi_forget_network(ad, net_ssid):
    """Forgets the first configured network whose SSID contains net_ssid.

    The SSID comparison is a containment check (net_ssid in the stored SSID),
    and only the first matching network is removed. A test failure signal is
    raised when the removal is not confirmed within SHORT_TIMEOUT.

    Args:
        ad: android_device object for forget network.
        net_ssid: ssid of network to be forget

    """
    configured = ad.droid.wifiGetConfiguredNetworks()
    if not configured:
        return
    for network in configured:
        if net_ssid not in network[WifiEnums.SSID_KEY]:
            continue
        ad.droid.wifiForgetNetwork(network['networkId'])
        try:
            ad.ed.pop_event(wifi_constants.WIFI_FORGET_NW_SUCCESS,
                            SHORT_TIMEOUT)
        except Empty:
            asserts.fail("Failed to remove network %s." % network)
        # Stop after the first match.
        break


def wifi_test_device_init(ad):
    """Initializes an android device for wifi testing.

    Steps, in the order they are executed:

    0. Make sure SL4A connection is established on the android device.
    1. Disable location service's WiFi scan (verified).
    2. Turn WiFi on.
    3. Clear all saved networks.
    4. Enable WiFi verbose logging (verified).
    5. Ask wpa_supplicant for EXCESSIVE log level (not verified).
    6. Sync device time with computer time.
    7. Turn off cellular data.
    8. Set country code to US.
    9. Turn off ambient display.

    Args:
        ad: An AndroidDevice object.
    """
    utils.require_sl4a((ad, ))

    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    asserts.assert_true(not ad.droid.wifiScannerIsAlwaysAvailable(),
                        "Failed to turn off location service's scan.")

    wifi_toggle_state(ad, True)
    reset_wifi(ad)

    ad.droid.wifiEnableVerboseLogging(1)
    asserts.assert_equal(ad.droid.wifiGetVerboseLoggingLevel(), 1,
                         "Failed to enable WiFi verbose logging.")

    # We don't verify the following settings since they are not critical.
    # Set wpa_supplicant log level to EXCESSIVE.
    supplicant_cmd = ("wpa_cli -i wlan0 -p -g@android:wpa_wlan0 IFNAME="
                      "wlan0 log_level EXCESSIVE")
    supplicant_status = ad.adb.shell(supplicant_cmd, ignore_status=True)
    ad.log.info("wpa_supplicant log change status: %s", supplicant_status)

    utils.sync_device_time(ad)
    ad.droid.telephonyToggleDataConnection(False)
    set_wifi_country_code(ad, WifiEnums.CountryCode.US)
    utils.set_ambient_display(ad, False)


def set_wifi_country_code(ad, country_code):
    """Sets the wifi country code on the device.

    First tries the "cmd wifi force-country-code" shell command. If that
    raises, falls back to the SL4A wifiSetCountryCode call with the same
    country code.

    Args:
        ad: An AndroidDevice object.
        country_code: 2 letter ISO country code

    Raises:
        Whatever the SL4A fallback call raises if it is also unable to set
        the country code.
    """
    try:
        ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
    except Exception as e:
        # Best-effort fallback; keep the failure visible in the device log
        # and honor the requested code instead of hard-coding US.
        ad.log.warning(
            "Failed to force country code %s via shell (%s); "
            "falling back to SL4A.", country_code, e)
        ad.droid.wifiSetCountryCode(country_code)


def start_wifi_connection_scan(ad):
    """Triggers a wifi connection scan and blocks until results are ready.

    Pending events are cleared first. A test failure signal is raised when
    no results-available event is received within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    """
    wait_seconds = 60
    results_event_name = "WifiManagerScanResultsAvailable"
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    try:
        ad.ed.pop_event(results_event_name, wait_seconds)
    except Empty:
        asserts.fail("Wi-Fi results did not become available within 60s.")


def start_wifi_connection_scan_and_return_status(ad):
    """
    Triggers a wifi connection scan and waits for either a results-available
    or a scan-failure event.

    A test failure signal is raised when neither kind of event is received
    within 60 seconds.

    Args:
        ad: An AndroidDevice object.
    Returns:
        True: if scan succeeded & results are available
        False: if scan failed
    """
    ad.ed.clear_all_events()
    ad.droid.wifiStartScan()
    outcome_pattern = "WifiManagerScan(ResultsAvailable|Failure)"
    try:
        events = ad.ed.pop_events(outcome_pattern, 60)
    except Empty:
        asserts.fail(
            "Wi-Fi scan results/failure did not become available within 60s.")
    # If there are multiple matches, we check for atleast one success.
    for event in events:
        event_name = event["name"]
        if event_name == "WifiManagerScanResultsAvailable":
            return True
        if event_name == "WifiManagerScanFailure":
            ad.log.debug("Scan failure received")
    return False


def start_wifi_connection_scan_and_check_for_network(ad,
                                                     network_ssid,
                                                     max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and reports whether
    |network_ssid| shows up in the scan results. Stops at the first scan
    whose results contain the network. The elapsed time is logged at debug
    level either way.

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    Returns:
        True: if network_ssid is found in scan results.
        False: if network_ssid is not found in scan results.
    """
    began_at = time.time()
    wanted = {WifiEnums.SSID_KEY: network_ssid}
    for _ in range(max_tries):
        if not start_wifi_connection_scan_and_return_status(ad):
            # Scan failed; spend another attempt.
            continue
        if match_networks(wanted, ad.droid.wifiGetScanResults()):
            ad.log.debug("Found network in %s seconds." %
                         (time.time() - began_at))
            return True
    ad.log.debug("Did not find network in %s seconds." %
                 (time.time() - began_at))
    return False


def start_wifi_connection_scan_and_ensure_network_found(
        ad, network_ssid, max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and requires that
    |network_ssid| appears in the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is present", network_ssid)
    assert_msg = "".join([
        "Failed to find ", network_ssid, " in scan results after ",
        str(max_tries), " tries"
    ])
    found = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_true(found, assert_msg)


def start_wifi_connection_scan_and_ensure_network_not_found(
        ad, network_ssid, max_tries=3):
    """
    Runs up to |max_tries| connectivity scans and requires that
    |network_ssid| does not appear in any of the scan results.
    This method asserts on failure!

    Args:
        ad: An AndroidDevice object.
        network_ssid: SSID of the network we are looking for.
        max_tries: Number of scans to try.
    """
    ad.log.info("Starting scans to ensure %s is not present", network_ssid)
    assert_msg = "".join([
        "Found ", network_ssid, " in scan results after ",
        str(max_tries), " tries"
    ])
    found = start_wifi_connection_scan_and_check_for_network(
        ad, network_ssid, max_tries)
    asserts.assert_false(found, assert_msg)


def start_wifi_background_scan(ad, scan_setting):
    """Starts a wifi background scan and waits for its success event.

    The success event is awaited for up to SHORT_TIMEOUT; no exception from
    that wait is handled here.

    Args:
        ad: android_device object to initiate connection on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        If scan was started successfully, event data of success event is returned.
    """
    scan_index = ad.droid.wifiScannerStartBackgroundScan(scan_setting)
    success_event_name = "WifiScannerScan{}onSuccess".format(scan_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']


def start_wifi_tethering(ad, ssid, password, band=None, hidden=None,
                         security=None):
    """Configures the soft AP and starts wifi tethering on an android_device.

    Only optional settings with a truthy value are written into the soft AP
    configuration; falsy values (None, False, 0, "") are left out.

    Args:
        ad: android_device to start wifi tethering on.
        ssid: The SSID the soft AP should broadcast.
        password: The password the soft AP should use.
        band: The band the soft AP should be set on. It should be either
            WifiEnums.WIFI_CONFIG_APBAND_2G or WifiEnums.WIFI_CONFIG_APBAND_5G.
        hidden: boolean to indicate if the AP needs to be hidden or not.
        security: security type of softap.

    Returns:
        No return value. Error checks in this function will raise test failure signals
    """
    optional_settings = (
        (WifiEnums.PWD_KEY, password),
        (WifiEnums.AP_BAND_KEY, band),
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.SECURITY, security),
    )
    config = {WifiEnums.SSID_KEY: ssid}
    config.update(
        {key: value for key, value in optional_settings if value})

    config_saved = ad.droid.wifiSetWifiApConfiguration(config)
    asserts.assert_true(config_saved, "Failed to update WifiAp Configuration")

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        # No explicit timeout is passed to this first wait.
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
        ad.log.debug("Tethering started successfully.")
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering starting")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def save_wifi_soft_ap_config(ad,
                             wifi_config,
                             band=None,
                             hidden=None,
                             security=None,
                             password=None,
                             channel=None,
                             max_clients=None,
                             shutdown_timeout_enable=None,
                             shutdown_timeout_millis=None,
                             client_control_enable=None,
                             allowedList=None,
                             blockedList=None,
                             bands=None,
                             channel_frequencys=None,
                             mac_randomization_setting=None,
                             bridged_opportunistic_shutdown_enabled=None,
                             ieee80211ax_enabled=None):
    """ Save a soft ap configuration and verify that it was applied.

    The optional arguments are merged into wifi_config (which is modified in
    place), the result is pushed to the device, and the configuration read
    back from the device is compared against it setting by setting. The band
    and bands settings are written but are not part of the read-back check.

    Args:
        ad: android_device to set soft ap configuration.
        wifi_config: a soft ap configuration object, at least include SSID.
        band: specifies the band for the soft ap.
        hidden: specifies the soft ap need to broadcast its SSID or not.
        security: specifies the security type for the soft ap. Only applied
                when password is given as well.
        password: specifies the password for the soft ap. Only applied when
                security is given as well.
        channel: specifies the channel for the soft ap. Only applied together
                with band.
        max_clients: specifies the maximum connected client number.
        shutdown_timeout_enable: specifies the auto shut down enable or not.
        shutdown_timeout_millis: specifies the shut down timeout value.
        client_control_enable: specifies the client control enable or not.
        allowedList: specifies allowed clients list.
        blockedList: specifies blocked clients list.
        bands: specifies the band list for the soft ap.
        channel_frequencys: specifies the channel frequency list for soft ap.
        mac_randomization_setting: specifies the mac randomization setting.
        bridged_opportunistic_shutdown_enabled: specifies the opportunistic
                shutdown enable or not.
        ieee80211ax_enabled: specifies the ieee80211ax enable or not.

    Returns:
        No return value. Error checks in this function will raise test failure
        signals when the configuration cannot be set or does not read back
        as written.
    """
    if security and password:
        wifi_config[WifiEnums.SECURITY] = security
        wifi_config[WifiEnums.PWD_KEY] = password

    # Settings that are copied verbatim whenever the caller supplied them.
    optional_settings = (
        (WifiEnums.HIDDEN_KEY, hidden),
        (WifiEnums.AP_MAXCLIENTS_KEY, max_clients),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY, shutdown_timeout_enable),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY, shutdown_timeout_millis),
        (WifiEnums.AP_CLIENTCONTROL_KEY, client_control_enable),
        (WifiEnums.AP_ALLOWEDLIST_KEY, allowedList),
        (WifiEnums.AP_BLOCKEDLIST_KEY, blockedList),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         mac_randomization_setting),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         bridged_opportunistic_shutdown_enabled),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY, ieee80211ax_enabled),
    )
    for key, value in optional_settings:
        if value is not None:
            wifi_config[key] = value

    # Only one way of selecting the operating channel/band is applied, in
    # this order of precedence: channel frequencies, band list, single band.
    if channel_frequencys is not None:
        wifi_config[WifiEnums.AP_CHANNEL_FREQUENCYS_KEY] = channel_frequencys
    elif bands is not None:
        wifi_config[WifiEnums.AP_BANDS_KEY] = bands
    elif band is not None:
        wifi_config[WifiEnums.AP_BAND_KEY] = band
        if channel is not None:
            wifi_config[WifiEnums.AP_CHANNEL_KEY] = channel

    # A channel value of 0 is dropped from the configuration before saving.
    if WifiEnums.AP_CHANNEL_KEY in wifi_config and wifi_config[
            WifiEnums.AP_CHANNEL_KEY] == 0:
        del wifi_config[WifiEnums.AP_CHANNEL_KEY]

    # An OPEN configuration is saved without security and password entries.
    # The password may legitimately be absent already, so it is removed
    # with pop() rather than del to avoid a KeyError.
    if WifiEnums.SECURITY in wifi_config and wifi_config[
            WifiEnums.SECURITY] == WifiEnums.SoftApSecurityType.OPEN:
        del wifi_config[WifiEnums.SECURITY]
        wifi_config.pop(WifiEnums.PWD_KEY, None)

    asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
                        "Failed to set WifiAp Configuration")

    wifi_ap = ad.droid.wifiGetApConfiguration()
    asserts.assert_true(
        wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
        "Hotspot SSID doesn't match")

    # Every setting present in the saved configuration must read back
    # unchanged from the device.
    verified_settings = (
        (WifiEnums.SECURITY, "Hotspot Security doesn't match"),
        (WifiEnums.PWD_KEY, "Hotspot Password doesn't match"),
        (WifiEnums.HIDDEN_KEY, "Hotspot hidden setting doesn't match"),
        (WifiEnums.AP_CHANNEL_KEY, "Hotspot Channel doesn't match"),
        (WifiEnums.AP_MAXCLIENTS_KEY, "Hotspot Max Clients doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUTENABLE_KEY,
         "Hotspot ShutDown feature flag doesn't match"),
        (WifiEnums.AP_SHUTDOWNTIMEOUT_KEY,
         "Hotspot ShutDown timeout setting doesn't match"),
        (WifiEnums.AP_CLIENTCONTROL_KEY,
         "Hotspot Client control flag doesn't match"),
        (WifiEnums.AP_ALLOWEDLIST_KEY, "Hotspot Allowed List doesn't match"),
        (WifiEnums.AP_BLOCKEDLIST_KEY, "Hotspot Blocked List doesn't match"),
        (WifiEnums.AP_MAC_RANDOMIZATION_SETTING_KEY,
         "Hotspot Mac randomization setting doesn't match"),
        (WifiEnums.AP_BRIDGED_OPPORTUNISTIC_SHUTDOWN_ENABLE_KEY,
         "Hotspot bridged shutdown enable setting doesn't match"),
        (WifiEnums.AP_IEEE80211AX_ENABLED_KEY,
         "Hotspot 80211 AX enable setting doesn't match"),
        (WifiEnums.AP_CHANNEL_FREQUENCYS_KEY,
         "Hotspot channels setting doesn't match"),
    )
    for key, mismatch_msg in verified_settings:
        if key in wifi_config:
            asserts.assert_true(wifi_ap[key] == wifi_config[key],
                                mismatch_msg)

def start_wifi_tethering_saved_config(ad):
    """ Turn on wifi hotspot with a config that is already saved.

    Args:
        ad: android_device to start wifi tethering on.

    Returns:
        No return value. A test failure signal is raised when the tethering
        confirmation events are not received in time.
    """
    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
    try:
        ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
        ad.ed.wait_for_event("TetherStateChanged",
                             lambda x: x["data"]["ACTIVE_TETHER"], 30)
    except Empty:
        # Only a missing event is a tethering failure; other errors (and
        # KeyboardInterrupt/SystemExit) propagate unchanged.
        asserts.fail("Didn't receive wifi tethering starting confirmation")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def stop_wifi_tethering(ad):
    """Turns wifi tethering off on an android_device and waits for it to stop.

    Args:
        ad: android_device to stop wifi tethering on.

    Returns:
        No return value. A test failure signal is raised when the AP disabled
        event or the inactive tether state is not observed in time.
    """

    def _tether_inactive(event):
        return not event["data"]["ACTIVE_TETHER"]

    ad.droid.wifiStartTrackingTetherStateChange()
    ad.droid.connectivityStopTethering(tel_defines.TETHERING_WIFI)
    try:
        ad.ed.pop_event("WifiManagerApDisabled", 30)
        ad.ed.wait_for_event("TetherStateChanged", _tether_inactive, 30)
    except Empty:
        asserts.fail(
            "Failed to receive confirmation of wifi tethering stopping")
    finally:
        ad.droid.wifiStopTrackingTetherStateChange()


def toggle_wifi_and_wait_for_reconnection(ad,
                                          network,
                                          num_of_tries=1,
                                          assert_on_fail=True):
    """Cycle wifi off and on, then wait for the device to rejoin a network.

    The device is expected to be connected to the provided network before
    this is called. The work is delegated to
    _toggle_wifi_and_wait_for_reconnection through _assert_on_fail_handler,
    and follows these steps:
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        If assert_on_fail is False, function returns True if the toggle was
        successful, False otherwise. If assert_on_fail is True, no return value.
    """
    handler_kwargs = {"num_of_tries": num_of_tries}
    return _assert_on_fail_handler(_toggle_wifi_and_wait_for_reconnection,
                                   assert_on_fail, ad, network,
                                   **handler_kwargs)


def _toggle_wifi_and_wait_for_reconnection(ad, network, num_of_tries=3):
    """Toggle wifi state and then wait for Android device to reconnect to
    the provided wifi network.

    This expects the device to be already connected to the provided network.

    Logic steps are
     1. Ensure that we're connected to the network.
     2. Turn wifi off.
     3. Wait for 10 seconds.
     4. Turn wifi on.
     5. Wait for the "connected" event, then confirm the connected ssid is the
        one requested.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to await connection. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to wait (30
                      seconds each) for the connected event before declaring
                      failure. Default is 3.
    """
    expected_ssid = network[WifiEnums.SSID_KEY]
    # First ensure that we're already connected to the provided network.
    verify_wifi_connection_info(ad, {WifiEnums.SSID_KEY: expected_ssid})
    # Now toggle wifi state and wait for the connection event.
    wifi_toggle_state(ad, False)
    time.sleep(10)
    wifi_toggle_state(ad, True)
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = None
        for _ in range(num_of_tries):
            try:
                connect_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED,
                                                 30)
                break
            except Empty:
                # No event within this window; use up another try.
                pass
        asserts.assert_true(
            connect_result, "Failed to connect to Wi-Fi network %s on %s" %
            (network, ad.serial))
        logging.debug("Connection result on %s: %s.", ad.serial,
                      connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s. "
            "Expected %s, but got %s." %
            (ad.serial, expected_ssid, actual_ssid))
        logging.info("Connected to Wi-Fi network %s on %s", actual_ssid,
                     ad.serial)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wait_for_connect(ad,
                     expected_ssid=None,
                     expected_id=None,
                     tries=2,
                     assert_on_fail=True):
    """Block until a wifi connect event is seen for the expected network.

    Delegates to _wait_for_connect through _assert_on_fail_handler, so whether
    a failure raises or is reported through the return value depends on
    assert_on_fail.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to.
        expected_id: Network Id of the network to connect to.
        tries: An integer that is the number of times to try before failing.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    handler_args = (ad, expected_ssid, expected_id, tries)
    return _assert_on_fail_handler(_wait_for_connect, assert_on_fail,
                                   *handler_args)


def _wait_for_connect(ad, expected_ssid=None, expected_id=None, tries=2):
    """Wait for a connect event and verify it is for the expected network.

    Args:
        ad: An Android device object.
        expected_ssid: SSID of the network to connect to. Not verified when
                       empty or None.
        expected_id: Network Id of the network to connect to. Not verified
                     when None; 0 is treated as a real network id.
        tries: An integer that is the number of times to try before failing.

    Raises:
        signals.TestFailure: if no connect event was received, or the event
            is for a different network than expected.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        connect_result = _wait_for_connect_event(ad,
                                                 ssid=expected_ssid,
                                                 id=expected_id,
                                                 tries=tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network %s" % expected_ssid)
        ad.log.debug("Wi-Fi connection result: %s.", connect_result)
        actual_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        if expected_ssid:
            asserts.assert_equal(actual_ssid, expected_ssid,
                                 "Connected to the wrong network")
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        # Compare against None rather than truthiness: a network id of 0
        # must still be verified.
        if expected_id is not None:
            asserts.assert_equal(actual_id, expected_id,
                                 "Connected to the wrong network")
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s" %
                     expected_ssid)
    except Exception as error:
        # Any other failure is logged with its details and reported as a
        # generic connection failure.
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" %
                                  expected_ssid) from error
    finally:
        ad.droid.wifiStopTrackingStateChange()


def _wait_for_connect_event(ad, ssid=None, id=None, tries=1):
    """Wait for a connect event on queue and pop when available.

    Each try waits up to 30 seconds for a connect event. When neither ssid
    nor id is given, the first connect event ends the wait. Otherwise the
    wait ends at the first event whose network id equals id or whose SSID
    equals ssid.

    Args:
        ad: An Android device object.
        ssid: SSID of the network to connect to.
        id: Network Id of the network to connect to. 0 is a valid id. (The
            parameter name shadows the builtin but is kept because callers
            pass it by keyword.)
        tries: An integer that is the number of times to try before failing.

    Returns:
        None if no connect event was received at all. Otherwise the last
        connect event popped, which is the matching one when a match was
        found. Callers are expected to verify the SSID / network id. The
        event is a dict with details of the connection data, which looks
        like this:
        {
         'time': 1485460337798,
         'name': 'WifiNetworkConnected',
         'data': {
                  'rssi': -27,
                  'is_24ghz': True,
                  'mac_address': '02:00:00:00:00:00',
                  'network_id': 1,
                  'BSSID': '30:b5:c2:33:d3:fc',
                  'ip_address': 117483712,
                  'link_speed': 54,
                  'supplicant_state': 'completed',
                  'hidden_ssid': False,
                  'SSID': 'wh_ap1_2g',
                  'is_5ghz': False}
        }

    """
    conn_result = None
    # If ssid and network id is None, just wait for any connect event.
    accept_any_event = id is None and ssid is None

    for _ in range(tries):
        try:
            conn_result = ad.ed.pop_event(wifi_constants.WIFI_CONNECTED, 30)
        except Empty:
            # Nothing arrived in this window; use up another try.
            continue
        if accept_any_event:
            break
        event_data = conn_result['data']
        # Compare id against None rather than truthiness so that network
        # id 0 can be matched.
        if id is not None and event_data[WifiEnums.NETID_KEY] == id:
            break
        if ssid and event_data[WifiEnums.SSID_KEY] == ssid:
            break

    return conn_result


def wait_for_disconnect(ad, timeout=10):
    """Require a wifi disconnect event to arrive before the timeout expires.

    Args:
        ad: Android device object.
        timeout: Timeout in seconds.

    Raises:
        signals.TestFailure: if no disconnect event is seen within timeout.
    """
    disconnect_event_name = "WifiNetworkDisconnected"
    try:
        ad.droid.wifiStartTrackingStateChange()
        # Only the arrival of the event matters; its payload is not used.
        ad.ed.pop_event(disconnect_event_name, timeout)
    except Empty:
        raise signals.TestFailure("Device did not disconnect from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()


def ensure_no_disconnect(ad, duration=10):
    """Require that no wifi disconnect event arrives during the duration.

    Args:
        ad: Android device object.
        duration: Duration in seconds.

    Raises:
        signals.TestFailure: if a disconnect event is seen within duration.
    """
    disconnect_event_name = "WifiNetworkDisconnected"
    try:
        ad.droid.wifiStartTrackingStateChange()
        ad.ed.pop_event(disconnect_event_name, duration)
    except Empty:
        # Timing out without an event is the expected outcome.
        pass
    else:
        raise signals.TestFailure("Device disconnected from the network")
    finally:
        ad.droid.wifiStopTrackingStateChange()


def connect_to_wifi_network(ad, network, assert_on_fail=True,
                            check_connectivity=True, hidden=False,
                            num_of_scan_tries=3, num_of_connect_tries=3):
    """Scan for a network, then connect to it (open and psk wifi networks).

    For a hidden network the scan step ensures the SSID is NOT found in the
    scan results; otherwise it ensures the SSID is found. The connection
    itself is then made through wifi_connect.

    Args:
        ad: AndroidDevice to use for connection
        network: network info of the network to connect to
        assert_on_fail: If true, errors from wifi_connect will raise
                        test failure signals.
        check_connectivity: Passed through to wifi_connect.
        hidden: Is the Wifi network hidden.
        num_of_scan_tries: The number of times to try scan
                           interface before declaring failure.
        num_of_connect_tries: The number of times to try
                              connect wifi before declaring failure.
    """
    scan_and_check = (
        start_wifi_connection_scan_and_ensure_network_not_found
        if hidden else start_wifi_connection_scan_and_ensure_network_found)
    scan_and_check(ad, network[WifiEnums.SSID_KEY],
                   max_tries=num_of_scan_tries)

    wifi_connect(ad, network,
                 num_of_tries=num_of_connect_tries,
                 assert_on_fail=assert_on_fail,
                 check_connectivity=check_connectivity)


def connect_to_wifi_network_with_id(ad, network_id, network_ssid):
    """Connect by network id, then check the SSID the device reports.

    Args:
        ad: android_device object to initiate connection on.
        network_id: int Network Id of the network.
        network_ssid: string SSID of the network.

    Returns: True if the SSID reported by the device after connecting
             matches network_ssid; False otherwise.

    """
    start_wifi_connection_scan_and_ensure_network_found(ad, network_ssid)
    wifi_connect_by_id(ad, network_id)
    connected_ssid = ad.droid.wifiGetConnectionInfo()[WifiEnums.SSID_KEY]
    ad.log.debug("Expected SSID = %s Connected SSID = %s" %
                 (network_ssid, connected_ssid))
    return connected_ssid == network_ssid


def wifi_connect(ad,
                 network,
                 num_of_tries=1,
                 assert_on_fail=True,
                 check_connectivity=True):
    """Connect an Android device to a wifi network.

    Runs _wifi_connect through _assert_on_fail_handler: the connection is
    initiated, the "connected" event is awaited, and the connected ssid is
    confirmed to be the one requested.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.
        check_connectivity: Passed through to _wifi_connect, which validates
                            the connection after connecting when this is
                            True.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    connect_options = {
        "num_of_tries": num_of_tries,
        "check_connectivity": check_connectivity,
    }
    return _assert_on_fail_handler(_wifi_connect, assert_on_fail, ad, network,
                                   **connect_options)


def _wifi_connect(ad, network, num_of_tries=1, check_connectivity=True):
    """Connect an Android device to a wifi network and verify the result.

    Requests a connection using the network config, waits for the request to
    be acknowledged and for the "connected" event, then confirms the
    connected ssid is the one requested. Optionally validates the connection
    against DEFAULT_PING_ADDR.

    This will directly fail a test if anything goes wrong.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        check_connectivity: If True, validate_connection is run after the
                            device has connected, and a failure there fails
                            the test.
    """
    ssid_key = WifiEnums.SSID_KEY
    asserts.assert_true(
        ssid_key in network,
        "Key '%s' must be present in network definition." % ssid_key)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[ssid_key]
    ad.droid.wifiConnectByConfig(network)
    ad.log.info("Starting connection process to %s", expected_ssid)
    try:
        # Only the arrival of the acknowledgement matters, not its payload.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 30)
        connected_event = _wait_for_connect_event(ad,
                                                  ssid=expected_ssid,
                                                  tries=num_of_tries)
        asserts.assert_true(
            connected_event, "Failed to connect to Wi-Fi network %s on %s" %
            (network, ad.serial))
        ad.log.debug("Wi-Fi connection result: %s.", connected_event)
        actual_ssid = connected_event['data'][ssid_key]
        asserts.assert_equal(
            actual_ssid, expected_ssid,
            "Connected to the wrong network on %s." % ad.serial)
        ad.log.info("Connected to Wi-Fi network %s.", actual_ssid)

        if check_connectivity and not validate_connection(
                ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure(
                "Failed to connect to internet on %s" % expected_ssid)
    except Empty:
        asserts.fail("Failed to start connection process to %s on %s" %
                     (network, ad.serial))
    except Exception as error:
        # Any other failure is logged with its details and reported as a
        # generic connection failure.
        ad.log.error("Failed to connect to %s with error %s", expected_ssid,
                     error)
        raise signals.TestFailure("Failed to connect to %s network" % network)

    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_connect_by_id(ad, network_id, num_of_tries=3, assert_on_fail=True):
    """Connect an Android device to a wifi network using network Id.

    Start connection to the wifi network, with the given network Id, wait for
    the "connected" event, then verify the connected network is the one requested.

    This will directly fail a test if anything goes wrong and assert_on_fail
    is True.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 3.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Hand the handler's result back to the caller, as wifi_connect does, so
    # the outcome is visible when assert_on_fail is False.
    return _assert_on_fail_handler(_wifi_connect_by_id, assert_on_fail, ad,
                                   network_id, num_of_tries)


def _wifi_connect_by_id(ad, network_id, num_of_tries=1):
    """Connect an Android device to a wifi network using its network id.

    Start connection to the wifi network, with the given network id, wait for
    the "connected" event, then verify the connected network is the one
    requested and validate the connection against DEFAULT_PING_ADDR.

    Args:
        ad: android_device object to initiate connection on.
        network_id: Integer specifying the network id of the network.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if the connection request is not acknowledged,
            the device does not connect to the requested network, or the
            connection validation fails.
    """
    ad.droid.wifiStartTrackingStateChange()
    # Clear all previous events.
    ad.ed.clear_all_events()
    ad.droid.wifiConnectByNetworkId(network_id)
    ad.log.info("Starting connection to network with id %d", network_id)
    try:
        # Only the arrival of the acknowledgement matters, not its payload.
        ad.ed.pop_event(wifi_constants.CONNECT_BY_NETID_SUCCESS, 60)
        connect_result = _wait_for_connect_event(ad,
                                                 id=network_id,
                                                 tries=num_of_tries)
        asserts.assert_true(
            connect_result,
            "Failed to connect to Wi-Fi network using network id")
        ad.log.debug("Wi-Fi connection result: %s", connect_result)
        actual_id = connect_result['data'][WifiEnums.NETID_KEY]
        asserts.assert_equal(
            actual_id, network_id,
            "Connected to the wrong network on %s. "
            "Expected network id = %d, but got %d." %
            (ad.serial, network_id, actual_id))
        connected_ssid = connect_result['data'][WifiEnums.SSID_KEY]
        ad.log.info("Connected to Wi-Fi network %s with %d network id.",
                    connected_ssid, network_id)

        internet = validate_connection(ad, DEFAULT_PING_ADDR)
        if not internet:
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      connected_ssid)
    except Empty:
        asserts.fail("Failed to connect to network with id %d on %s" %
                     (network_id, ad.serial))
    except Exception as error:
        # Any other failure is logged with its details and reported as a
        # generic connection failure.
        ad.log.error("Failed to connect to network with id %d with error %s",
                     network_id, error)
        raise signals.TestFailure("Failed to connect to network with network"
                                  " id %d" % network_id)
    finally:
        ad.droid.wifiStopTrackingStateChange()


def wifi_connect_using_network_request(ad,
                                       network,
                                       network_specifier,
                                       num_of_tries=3):
    """Connect an Android device to a wifi network using network request.

    Sends a wifi network request built from the network specifier, pauses
    for wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC, then hands over
    to _wait_for_wifi_connect_after_network_request to drive and verify the
    rest of the connection flow.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        network_specifier: A dictionary representing the network specifier to
                           use.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
    Returns:
        key: Key corresponding to network request.
    """
    request_key = ad.droid.connectivityRequestWifiNetwork(
        network_specifier, 0)
    ad.log.info("Sent network request %s with %s " %
                (request_key, network_specifier))
    # UI interaction should only start once wifi starts processing the
    # request, so give it time before continuing.
    time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
    _wait_for_wifi_connect_after_network_request(ad, network, request_key,
                                                 num_of_tries)
    return request_key


def wait_for_wifi_connect_after_network_request(ad,
                                                network,
                                                key,
                                                num_of_tries=3,
                                                assert_on_fail=True):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Returns a value only if assert_on_fail is false.
        Returns True if the connection was successful, False otherwise.
    """
    # Hand the handler's result back to the caller so the outcome is visible
    # when assert_on_fail is False.
    return _assert_on_fail_handler(
        _wait_for_wifi_connect_after_network_request, assert_on_fail, ad,
        network, key, num_of_tries)


def _wait_for_wifi_connect_after_network_request(ad, network, key, num_of_tries=3):
    """
    Simulate and verify the connection flow after initiating the network
    request.

    Wait for the "onMatch" event, ensure that the scan results in "onMatch"
    event contain the specified network, then simulate the user granting the
    request with the specified network selected. Then wait for the "onAvailable"
    network callback indicating successful connection to network.

    Args:
        ad: android_device object to initiate connection on.
        network: A dictionary representing the network to connect to. The
                 dictionary must have the key "SSID".
        key: Key corresponding to network request.
        num_of_tries: An integer that is the number of "onMatch" events to
                      inspect before declaring failure.

    Raises:
        signals.TestFailure: if any step fails with an exception other than
            queue.Empty. An event wait that times out with queue.Empty is
            reported through asserts.fail instead.
    """
    asserts.assert_true(
        WifiEnums.SSID_KEY in network,
        "Key '%s' must be present in network definition." % WifiEnums.SSID_KEY)
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = network[WifiEnums.SSID_KEY]
    ad.droid.wifiRegisterNetworkRequestMatchCallback()
    # Wait for the platform to scan and return a list of networks
    # matching the request
    try:
        matched_network = None
        # Honor num_of_tries: the previous `[0, num_of_tries]` literal always
        # iterated exactly twice, whatever value the caller passed in.
        for _ in range(num_of_tries):
            on_match_event = ad.ed.pop_event(
                wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH, 60)
            asserts.assert_true(on_match_event,
                                "Network request on match not received.")
            matched_scan_results = on_match_event["data"]
            ad.log.debug("Network request on match results %s",
                         matched_scan_results)
            matched_network = match_networks(
                {WifiEnums.SSID_KEY: network[WifiEnums.SSID_KEY]},
                matched_scan_results)
            ad.log.debug("Network request on match %s", matched_network)
            if matched_network:
                break

        asserts.assert_true(matched_network,
                            "Target network %s not found" % network)

        ad.droid.wifiSendUserSelectionForNetworkRequestMatch(network)
        ad.log.info("Sent user selection for network request %s",
                    expected_ssid)

        # Wait for the platform to connect to the network.
        autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK,
            60,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT, cconsts.NETWORK_CB_AVAILABLE))
        on_capabilities_changed = autils.wait_for_event_with_keys(
            ad, cconsts.EVENT_NETWORK_CALLBACK,
            10,
            (cconsts.NETWORK_CB_KEY_ID, key),
            (cconsts.NETWORK_CB_KEY_EVENT,
             cconsts.NETWORK_CB_CAPABILITIES_CHANGED))
        connected_network = None
        # WifiInfo is attached to TransportInfo only in S.
        if ad.droid.isSdkAtLeastS():
            connected_network = (
                on_capabilities_changed["data"][
                    cconsts.NETWORK_CB_KEY_TRANSPORT_INFO]
            )
        else:
            connected_network = ad.droid.wifiGetConnectionInfo()
        ad.log.info("Connected to network %s", connected_network)
        asserts.assert_equal(
            connected_network[WifiEnums.SSID_KEY], expected_ssid,
            "Connected to the wrong network."
            "Expected %s, but got %s."
            % (network, connected_network))
    except Empty:
        # An expected event did not arrive within its timeout.
        asserts.fail("Failed to connect to %s" % expected_ssid)
    except Exception as error:
        ad.log.error("Failed to connect to %s with error %s" %
                     (expected_ssid, error))
        raise signals.TestFailure("Failed to connect to %s network" % network)
    finally:
        # Always undo the tracking started at the top of this function.
        ad.droid.wifiStopTrackingStateChange()


def wifi_passpoint_connect(ad,
                           passpoint_network,
                           num_of_tries=1,
                           assert_on_fail=True):
    """Connect an Android device to a Passpoint wifi network.

    Initiate connection to a wifi network, wait for the "connected" event, then
    confirm the connected ssid is the one requested.

    The actual work is done by _wifi_passpoint_connect, which is invoked
    through _assert_on_fail_handler.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.
        assert_on_fail: If True, error checks in this function will raise test
                        failure signals.

    Returns:
        Whatever _assert_on_fail_handler returns for this call.
        NOTE(review): the exact success/failure values are decided by
        _assert_on_fail_handler (defined elsewhere in this file);
        _wifi_passpoint_connect itself has no return statement - confirm what
        callers should expect when assert_on_fail is False.
    """
    # Propagate the handler's result; it used to be silently discarded, so
    # callers always received None regardless of the outcome.
    return _assert_on_fail_handler(_wifi_passpoint_connect,
                                   assert_on_fail,
                                   ad,
                                   passpoint_network,
                                   num_of_tries=num_of_tries)


def _wifi_passpoint_connect(ad, passpoint_network, num_of_tries=1):
    """Wait for a Passpoint connection and verify it.

    Starts wifi state tracking, waits for the connect event for the given
    SSID, checks that the reported SSID is the expected one and finally
    validates connectivity against DEFAULT_PING_ADDR. State tracking is
    stopped again before returning, whether or not the checks passed.

    Args:
        ad: android_device object to initiate connection on.
        passpoint_network: SSID of the Passpoint network to connect to.
        num_of_tries: An integer that is the number of times to try before
                      declaring failure. Default is 1.

    Raises:
        signals.TestFailure: if any step inside the try block raises.
    """
    ad.droid.wifiStartTrackingStateChange()
    expected_ssid = passpoint_network
    ad.log.info("Starting connection process to passpoint %s", expected_ssid)

    try:
        event = _wait_for_connect_event(ad, expected_ssid, num_of_tries)
        no_event_msg = ("Failed to connect to WiFi passpoint network %s on"
                        " %s" % (expected_ssid, ad.serial))
        asserts.assert_true(event, no_event_msg)
        ad.log.info("Wi-Fi connection result: %s.", event)

        connected_ssid = event['data'][WifiEnums.SSID_KEY]
        wrong_network_msg = "Connected to the wrong network on %s." % ad.serial
        asserts.assert_equal(connected_ssid, expected_ssid, wrong_network_msg)
        ad.log.info("Connected to Wi-Fi passpoint network %s.", connected_ssid)

        if not validate_connection(ad, DEFAULT_PING_ADDR):
            raise signals.TestFailure("Failed to connect to internet on %s" %
                                      expected_ssid)
    except Exception as error:
        # Every failure above is logged and reported as one uniform signal.
        ad.log.error("Failed to connect to passpoint network %s with error %s",
                     expected_ssid, error)
        raise signals.TestFailure("Failed to connect to %s passpoint network" %
                                  expected_ssid)

    finally:
        ad.droid.wifiStopTrackingStateChange()


def delete_passpoint(ad, fqdn):
    """Remove the Passpoint configuration identified by an FQDN.

    Args:
        ad: android_device object to remove the configuration from.
        fqdn: FQDN of the Passpoint configuration to remove.

    Returns:
        True if the removal call completed, False if it raised (the error is
        logged on the device logger).
    """
    try:
        ad.droid.removePasspointConfig(fqdn)
    except Exception as error:
        ad.log.error(
            "Failed to remove passpoint configuration with FQDN=%s "
            "and error=%s", fqdn, error)
        return False
    return True


def start_wifi_single_scan(ad, scan_setting):
    """Start a single shot wifi scan and wait for its success event.

    Args:
        ad: android_device object to start the scan on.
        scan_setting: A dict representing the settings of the scan.

    Returns:
        The 'data' field of the scan's onSuccess event. The event is waited
        for up to SHORT_TIMEOUT seconds.
    """
    scan_index = ad.droid.wifiScannerStartScan(scan_setting)
    success_event_name = "WifiScannerScan%sonSuccess" % scan_index
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    ad.log.debug("Got event %s", success_event)
    return success_event['data']


def track_connection(ad, network_ssid, check_connection_count):
    """Track wifi connection to network changes for given number of counts

    Starts wifi state tracking, then inspects up to check_connection_count
    "WifiNetworkConnected" events (each awaited for at most 120 seconds)
    looking for one that reports the given SSID. State tracking is always
    stopped again before this function exits, including on the success path
    and when waiting for an event raises.

    Args:
        ad: android_device object to track connections on.
        network_ssid: network ssid to which connection would be tracked
        check_connection_count: Integer for maximum number network connection
                                check.
    Returns:
        True if connection to given network happen, else return False.
    """
    ad.droid.wifiStartTrackingStateChange()
    try:
        remaining_checks = check_connection_count
        while remaining_checks > 0:
            connect_network = ad.ed.pop_event("WifiNetworkConnected", 120)
            ad.log.info("Connected to network %s", connect_network)
            if (WifiEnums.SSID_KEY in connect_network['data'] and
                    connect_network['data'][WifiEnums.SSID_KEY] == network_ssid):
                return True
            remaining_checks -= 1
        return False
    finally:
        # Previously tracking was only stopped when the network was never
        # seen; it leaked on success and on event timeouts.
        ad.droid.wifiStopTrackingStateChange()


def get_scan_time_and_channels(wifi_chs, scan_setting, stime_channel):
    """Work out which channels a scan covers and how long it should take.

    The channel list comes from the setting's "band" (translated through
    wifi_chs.band_to_freq) or from its explicit "channels" entry. When the
    setting has both keys, or neither, no channels are selected.

    Args:
        wifi_chs: Object of channels supported
        scan_setting: scan setting used for start scan
        stime_channel: scan time per channel

    Returns:
        scan_time: time required for completing a scan
        scan_channels: channel used for scanning
    """
    has_band = "band" in scan_setting
    has_channels = "channels" in scan_setting

    if has_band and not has_channels:
        channels = wifi_chs.band_to_freq(scan_setting["band"])
    elif has_channels and not has_band:
        channels = scan_setting["channels"]
    else:
        channels = []

    total_time = len(channels) * stime_channel
    for freq in channels:
        if freq in WifiEnums.DFS_5G_FREQUENCIES:
            # Extra allowance for the passive scan done on DFS channels.
            total_time += 132
    return total_time, channels


def start_wifi_track_bssid(ad, track_setting):
    """Start BSSID tracking and wait for its success event.

    Args:
      ad: android_device object.
      track_setting: Dict with the tracking parameters; its "bssidInfos" and
                     "apLostThreshold" entries are passed to the scanner.

    Returns:
      The 'data' field of the tracking onSuccess event. The event is waited
      for up to SHORT_TIMEOUT seconds.
    """
    bssid_infos = track_setting["bssidInfos"]
    lost_threshold = track_setting["apLostThreshold"]
    tracker_index = ad.droid.wifiScannerStartTrackingBssids(
        bssid_infos, lost_threshold)
    success_event_name = "WifiScannerBssid{}onSuccess".format(tracker_index)
    success_event = ad.ed.pop_event(success_event_name, SHORT_TIMEOUT)
    return success_event['data']


def convert_pem_key_to_pkcs8(in_file, out_file):
    """Convert a PEM key file into an unencrypted PKCS#8 DER file.

    The conversion is done by running the openssl command line tool through
    utils.exe_cmd.

    The input file must have the extension "pem". The output file must
    have the extension "der".

    Args:
        in_file: The original key file.
        out_file: The full path to the converted key file, including
        filename.
    """
    input_is_pem = in_file.endswith(".pem")
    asserts.assert_true(input_is_pem, "Input file has to be .pem.")
    output_is_der = out_file.endswith(".der")
    asserts.assert_true(output_is_der, "Output file has to be .der.")

    command_template = (
        "openssl pkcs8 -inform PEM -in {} -outform DER -out {} -nocrypt"
        " -topk8")
    utils.exe_cmd(command_template.format(in_file, out_file))


def validate_connection(ad,
                        ping_addr=DEFAULT_PING_ADDR,
                        wait_time=15,
                        ping_gateway=True):
    """Validate internet connection by pinging the address provided.

    First polls once per second, for up to wait_time seconds, until the device
    reports a connected network with an IPv4 default gateway. Then an HTTP
    ping to ping_addr is attempted. If that yields a falsy result (or raises)
    and ping_gateway is set, the default gateway is pinged from the device
    shell instead.

    Args:
        ad: android_device object.
        ping_addr: address on internet for pinging.
        wait_time: maximum number of seconds to wait for the connection to
                   come up before validating it.
        ping_gateway: if True, fall back to pinging the IPv4 default gateway
                      when the HTTP ping does not succeed.

    Returns:
        The HTTP ping result if it was truthy. Otherwise, when ping_gateway is
        set, True/False depending on whether the gateway ping avoided
        "100% packet loss"; else the falsy HTTP ping result.
    """
    # wait_time to allow for DHCP to complete.
    for _ in range(wait_time):
        if (ad.droid.connectivityNetworkIsConnected()
                and ad.droid.connectivityGetIPv4DefaultGateway()):
            break
        time.sleep(1)
    ping = False
    try:
        ping = ad.droid.httpPing(ping_addr)
        ad.log.info("Http ping result: %s.", ping)
    except Exception as error:
        # Best effort: a failed HTTP ping is not fatal, but record why it
        # failed instead of hiding it. KeyboardInterrupt/SystemExit are no
        # longer swallowed here.
        ad.log.warning("Http ping to %s raised an error: %s", ping_addr,
                       error)
    if not ping and ping_gateway:
        ad.log.info("Http ping failed. Pinging default gateway")
        gw = ad.droid.connectivityGetIPv4DefaultGateway()
        result = ad.adb.shell("ping -c 6 {}".format(gw))
        ad.log.info("Default gateway ping result: %s" % result)
        ping = "100% packet loss" not in result
    return ping


#TODO(angli): This can only verify if an actual value is exactly the same.
# Would be nice to be able to verify an actual value is one of several.
def verify_wifi_connection_info(ad, expected_con):
    """Check the current wifi connection info against expected values.

    Every key of expected_con except "password" must be present in the
    connection info reported by the device and must hold the same value.
    "BSSID" and "supplicant_state" are compared case-insensitively.

    Args:
        ad: android_device object whose connection info is checked.
        expected_con: A dict representing expected key-value pairs for wifi
            connection. e.g. {"SSID": "test_wifi"}

    Raises:
        signals.TestFailure: if a field is missing or has a different value.
    """
    actual_con = ad.droid.wifiGetConnectionInfo()
    ad.log.debug("Current connection: %s", actual_con)
    lowercased_fields = ("BSSID", "supplicant_state")

    for field, wanted in expected_con.items():
        # Do not verify authentication related fields.
        if field == "password":
            continue
        if field not in actual_con:
            raise signals.TestFailure(
                "Field %s does not exist in wifi connection info %s." %
                (field, actual_con))
        found = actual_con[field]
        if field in lowercased_fields:
            found = found.lower()
            wanted = wanted.lower()
        if found != wanted:
            raise signals.TestFailure(
                "Expected %s to be %s, actual %s is %s." %
                (field, wanted, field, found))


def check_autoconnect_to_open_network(
        ad, conn_timeout=WIFI_CONNECTION_TIMEOUT_DEFAULT):
    """Toggle wifi and wait for the supplicant to reach "completed".

    If ad.droid.wifiCheckState() is already truthy the function returns True
    right away. Otherwise the wifi state is toggled and the connection info
    is polled until its supplicant_state is "completed" or conn_timeout
    seconds have elapsed.

     Args:
         ad: android_device object.
         conn_timeout: timeout value in sec to wait for UE to connect to a
                       WiFi AP
     Returns:
         True if UE connects to WiFi AP (supplicant_state = completed)
         False if UE fails to complete connection within conn_timeout.
    """
    if ad.droid.wifiCheckState():
        return True

    ad.droid.wifiToggleState()
    deadline = time.time() + conn_timeout
    while True:
        supplicant_state = ad.droid.wifiGetConnectionInfo()['supplicant_state']
        # The deadline is checked before the state, so a poll that finishes
        # after the deadline counts as a failure.
        if time.time() > deadline:
            ad.log.warning("Failed to connect to WiFi AP")
            return False
        if supplicant_state == "completed":
            return True


def expand_enterprise_config_by_phase2(config):
    """Produce one copy of an enterprise config per phase2 auth type.

    For PEAP only GTC and MSCHAPV2 are used; for every other EAP method all
    members of WifiEnums.EapPhase2 are used. GTC is left out when the config
    carries an FQDN entry.

    Args:
        config: A dict representing enterprise config.

    Returns
        A list of enterprise configs.
    """
    gtc = WifiEnums.EapPhase2.GTC
    if config[WifiEnums.Enterprise.EAP] == WifiEnums.Eap.PEAP:
        # Skip unsupported phase2 types for PEAP.
        candidates = [gtc, WifiEnums.EapPhase2.MSCHAPV2]
    else:
        candidates = WifiEnums.EapPhase2

    expanded = []
    for candidate in candidates:
        # Skip a special case for passpoint TTLS.
        if WifiEnums.Enterprise.FQDN in config and candidate == gtc:
            continue
        variant = dict(config)
        variant[WifiEnums.Enterprise.PHASE2] = candidate.value
        expanded.append(variant)
    return expanded


def generate_eap_test_name(config, ad=None):
    """Build a test case name from an EAP configuration.

    The name is "test_connect-" followed by the EAP method name and, when
    the config has a phase2 entry that matches a known type, "-" plus the
    phase2 name. An SSID containing "peap0" or "peap1" (case-insensitive)
    overrides the EAP method name with "PEAP0" or "PEAP1".

    Args:
        config: A dict representing an EAP credential.
        ad object: Redundant but required as the same param is passed
                   to test_func in run_generated_tests

    Returns:
        A string representing the name of a generated EAP test case.
    """
    enterprise = WifiEnums.Enterprise

    # Name of the first EAP member whose value matches, or "" if none does.
    eap_label = next(
        (method.name
         for method in WifiEnums.Eap
         if method.value == config[enterprise.EAP]),
        "")

    ssid_lower = config[WifiEnums.SSID_KEY].lower()
    if "peap0" in ssid_lower:
        eap_label = "PEAP0"
    if "peap1" in ssid_lower:
        eap_label = "PEAP1"

    phase2_suffix = ""
    if enterprise.PHASE2 in config:
        phase2_suffix = next(
            ("-{}".format(phase2.name)
             for phase2 in WifiEnums.EapPhase2
             if phase2.value == config[enterprise.PHASE2]),
            "")

    return "test_connect-" + eap_label + phase2_suffix


def group_attenuators(attenuators):
    """Split a flat attenuator list into two per-AP attenuator groups.

    Most legacy Wi-Fi setups have two attenuators each connected to a separate
    AP. The new Wi-Fi setup has four attenuators, each connected to one channel
    on an AP, so two of them are connected to one AP.

    Grouping them lets a script attenuate "one AP" with a single call in
    both setups: with two attenuators each group holds one of them, with four
    the first two go to the "AP0" group and the last two to the "AP1" group.

    Args:
        attenuators: A list of attenuator objects, either two or four in length.

    Returns:
        A two-element list: the "AP0" group followed by the "AP1" group.

    Raises:
        Reports the failure through asserts.fail if the attenuator list does
        not have two or four objects.
    """
    ap_groups = [
        attenuator.AttenuatorGroup("AP0"),
        attenuator.AttenuatorGroup("AP1"),
    ]
    count = len(attenuators)
    # Attenuators per AP: one in the legacy testbed, two in the new one.
    per_ap = {2: 1, 4: 2}.get(count)
    if per_ap is None:
        asserts.fail(("Either two or four attenuators are required for this "
                      "test, but found %s") % count)
    else:
        for position, member in enumerate(attenuators):
            ap_groups[position // per_ap].add(member)
    return ap_groups


def set_attns(attenuator, attn_val_name, roaming_attn=ROAMING_ATTN):
    """Apply a named set of attenuation values to four attenuators.

    Args:
        attenuator: Indexable collection of attenuator objects; entries 0 to 3
                    are set.
        attn_val_name: Name of the attenuation value set to use.
        roaming_attn: Dictionary mapping value-set names to the per-attenuator
                      values.

    Raises:
        Any error raised while setting a value is logged and re-raised.
    """
    logging.info("Set attenuation values to %s", roaming_attn[attn_val_name])
    try:
        for port in range(4):
            attenuator[port].set_atten(roaming_attn[attn_val_name][port])
    except BaseException:
        logging.exception("Failed to set attenuation values %s.",
                          attn_val_name)
        raise


def set_attns_steps(attenuators,
                    atten_val_name,
                    roaming_attn=ROAMING_ATTN,
                    steps=10,
                    wait_time=12):
    """Ramp attenuators linearly from their current value to a target value.

    The current value of every attenuator is read once up front. Each step
    then moves every attenuator to the rounded linear interpolation between
    that starting value and its target, and sleeps wait_time seconds.

    Args:
        attenuators: The list of attenuator objects that you want to change
                     their attenuation value.
        atten_val_name: Name of the attenuation value pair to use.
        roaming_attn: Dictionary specifying the attenuation params.
        steps: Number of attenuator changes to reach the target value.
        wait_time: Sleep time for each change of attenuator.
    """
    logging.info("Set attenuation values to %s in %d step(s)",
                 roaming_attn[atten_val_name], steps)
    initial_values = [device.get_atten() for device in attenuators]
    final_values = roaming_attn[atten_val_name]

    for step_number in range(1, steps + 1):
        fraction_done = step_number / steps
        for index, device in enumerate(attenuators):
            delta = (final_values[index] - initial_values[index]) * fraction_done
            device.set_atten(round(initial_values[index] + delta))
        time.sleep(wait_time)


def trigger_roaming_and_validate(dut,
                                 attenuator,
                                 attn_val_name,
                                 expected_con,
                                 roaming_attn=ROAMING_ATTN):
    """Ramp the attenuators to trigger roaming, then verify the result.

    After the attenuation ramp, the DUT's connection info must match the
    expected SSID and BSSID, and the connection must pass
    validate_connection.

    Args:
        dut: android_device object expected to roam.
        attenuator: The attenuator object.
        attn_val_name: Name of the attenuation value pair to use.
        expected_con: The network information of the expected network. Its
                      SSID entry and its "bssid" entry are used.
        roaming_attn: Dictionary specifying the attenuation params.

    Raises:
        signals.TestFailure: if the connection cannot be validated after
            roaming.
    """
    target = {
        WifiEnums.SSID_KEY: expected_con[WifiEnums.SSID_KEY],
        WifiEnums.BSSID_KEY: expected_con["bssid"],
    }
    set_attns_steps(attenuator, attn_val_name, roaming_attn)

    verify_wifi_connection_info(dut, target)
    roamed_bssid = target[WifiEnums.BSSID_KEY]
    logging.info("Roamed to %s successfully", roamed_bssid)

    if validate_connection(dut):
        return
    raise signals.TestFailure("Fail to connect to internet on %s" %
                              roamed_bssid)


def create_softap_config():
    """Build a softap config with a random SSID and password.

    Returns:
        A dict holding the generated SSID ("softap_" plus a random suffix)
        and password under WifiEnums.SSID_KEY and WifiEnums.PWD_KEY.
    """
    ssid = "softap_" + utils.rand_ascii_str(8)
    password = utils.rand_ascii_str(8)
    logging.info("softap setup: %s %s", ssid, password)
    return {
        WifiEnums.SSID_KEY: ssid,
        WifiEnums.PWD_KEY: password,
    }


def start_softap_and_verify(ad, band):
    """Bring up a softap and verify it is running and visible in scans.

    The softap info callback is checked twice: before start-up it must report
    frequency and bandwidth 0, after start-up both must be greater than 0.

    Args:
        ad: Object exposing the softap device as `dut` and the scanning
            client device as `dut_client`.
        band: The band to use for softAP.

    Returns: dict, the softAP config.

    """
    # Register before start the test.
    callback_id = ad.dut.droid.registerSoftApCallback()

    # Check softap info value is default
    idle_frequency, idle_bandwidth = get_current_softap_info(
        ad.dut, callback_id, True)
    asserts.assert_true(idle_frequency == 0, "Softap frequency is not reset")
    asserts.assert_true(idle_bandwidth == 0, "Softap bandwdith is not reset")

    softap_config = create_softap_config()
    softap_ssid = softap_config[WifiEnums.SSID_KEY]
    start_wifi_tethering(ad.dut,
                         softap_ssid,
                         softap_config[WifiEnums.PWD_KEY],
                         band=band)
    asserts.assert_true(ad.dut.droid.wifiIsApEnabled(),
                        "SoftAp is not reported as running")
    start_wifi_connection_scan_and_ensure_network_found(
        ad.dut_client, softap_ssid)

    # Check softap info can get from callback succeed and assert value should be
    # valid.
    live_frequency, live_bandwidth = get_current_softap_info(
        ad.dut, callback_id, True)
    asserts.assert_true(live_frequency > 0, "Softap frequency is not valid")
    asserts.assert_true(live_bandwidth > 0, "Softap bandwdith is not valid")

    # Unregister callback
    ad.dut.droid.unregisterSoftApCallback(callback_id)

    return softap_config


def wait_for_expected_number_of_softap_clients(ad, callbackId,
                                               expected_num_of_softap_clients):
    """Wait for a softap client-count update and validate it.

    Pops the next "number of clients changed" event for the callback
    (waiting up to SHORT_TIMEOUT seconds) and asserts that both the reported
    client count and the number of reported mac addresses equal the expected
    number, and that every reported mac address is well formed.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        expected_num_of_softap_clients: expected number of softap clients.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    payload = ad.ed.pop_event(event_name, SHORT_TIMEOUT)['data']
    reported_count = payload[
        wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    reported_macs = payload[wifi_constants.SOFTAP_CLIENTS_MACS_CALLBACK_KEY]

    asserts.assert_equal(
        reported_count, expected_num_of_softap_clients,
        "The number of softap clients doesn't match the expected number")
    asserts.assert_equal(
        len(reported_macs), expected_num_of_softap_clients,
        "The number of mac addresses doesn't match the expected number")
    for mac in reported_macs:
        mac_is_valid = checkMacAddress(mac)
        asserts.assert_true(mac_is_valid,
                            "An invalid mac address was returned")


def checkMacAddress(input):
    """Validate whether a string is a valid mac address or not.

    A valid address is six pairs of hexadecimal digits (case-insensitive),
    separated consistently by ":" or "-", or written with no separator at
    all. The whole string must match; in particular a trailing newline is
    rejected (the earlier `re.match(...$)` form accepted one).

    Args:
        input: The string to validate.

    Returns: True/False, returns true for a valid mac address and false otherwise.
    """
    # Group 1 captures the separator chosen after the first octet (possibly
    # empty); the backreference forces every later separator to be the same.
    macValidationRegex = r"[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\1[0-9a-f]{2}){4}"
    return re.fullmatch(macValidationRegex, input.lower()) is not None


def wait_for_expected_softap_state(ad, callbackId, expected_softap_state):
    """Wait for a softap state change and assert it is the expected one.

    Pops the next "state changed" event for the callback, waiting up to
    SHORT_TIMEOUT seconds, and compares the state it carries.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        expected_softap_state: The expected softap state.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_STATE_CHANGED)
    state_event = ad.ed.pop_event(event_name, SHORT_TIMEOUT)
    reported_state = state_event['data'][
        wifi_constants.SOFTAP_STATE_CHANGE_CALLBACK_KEY]
    asserts.assert_equal(reported_state, expected_softap_state,
                         "Softap state doesn't match with expected state")


def get_current_number_of_softap_clients(ad, callbackId):
    """Drain the queued softap client-count events and report the latest.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.

    Returns:
        The client count carried by the last queued event, or None when no
        matching event was queued.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_NUMBER_CLIENTS_CHANGED)
    latest_count = None
    for queued_event in ad.ed.pop_all(event_name):
        # Later events overwrite earlier ones; only the newest count is kept.
        latest_count = queued_event['data'][
            wifi_constants.SOFTAP_NUMBER_CLIENTS_CALLBACK_KEY]
    return latest_count


def get_current_softap_info(ad, callbackId, need_to_wait):
    """Drain the queued softap info events and report the latest values.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        need_to_wait: If truthy, first block (up to SHORT_TIMEOUT seconds)
                      for one info event before draining the queue.
    Returns:
        A (frequency, bandwidth) tuple taken from the most recent event seen;
        (0, 0) when no event was seen at all.
    """
    event_name = (wifi_constants.SOFTAP_CALLBACK_EVENT + str(callbackId) +
                  wifi_constants.SOFTAP_INFO_CHANGED)
    ad.log.debug("softap info dump from eventStr %s", event_name)

    frequency, bandwidth = 0, 0
    if need_to_wait:
        awaited = ad.ed.pop_event(event_name, SHORT_TIMEOUT)['data']
        frequency = awaited[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = awaited[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
        ad.log.info("softap info updated, frequency is %s, bandwidth is %s",
                    frequency, bandwidth)

    # Anything still queued is newer than what was read above.
    for queued_event in ad.ed.pop_all(event_name):
        payload = queued_event['data']
        frequency = payload[wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = payload[wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]

    ad.log.info("softap info, frequency is %s, bandwidth is %s", frequency,
                bandwidth)
    return frequency, bandwidth

def get_current_softap_infos(ad, callbackId, need_to_wait):
    """pop up all of softap info list changed event from queue.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait (up to SHORT_TIMEOUT seconds) for the info callback
                      event before pop all.
    Returns:
        The info list carried by the most recent event seen. An empty list is
        returned when need_to_wait is false and no event was queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
        callbackId) + wifi_constants.SOFTAP_INFOLIST_CHANGED
    ad.log.debug("softap info dump from eventStr %s", eventStr)

    # Default for the "nothing received" case; without it the loop below
    # hit an unbound local when no event had been seen.
    infos = []
    if need_to_wait:
        event = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)
        infos = event['data']

    # Anything still queued is newer than the event read above.
    for event in ad.ed.pop_all(eventStr):
        infos = event['data']

    for info in infos:
        frequency = info[
            wifi_constants.SOFTAP_INFO_FREQUENCY_CALLBACK_KEY]
        bandwidth = info[
            wifi_constants.SOFTAP_INFO_BANDWIDTH_CALLBACK_KEY]
        wifistandard = info[
            wifi_constants.SOFTAP_INFO_WIFISTANDARD_CALLBACK_KEY]
        bssid = info[
            wifi_constants.SOFTAP_INFO_BSSID_CALLBACK_KEY]
        ad.log.info(
                "softap info, freq:%s, bw:%s, wifistandard:%s, bssid:%s",
                frequency, bandwidth, wifistandard, bssid)

    return infos

def get_current_softap_capability(ad, callbackId, need_to_wait):
    """pop up all of softap capability changed event from queue.

    Args:
        ad: android_device object.
        callbackId: Id of the callback associated with registering.
        need_to_wait: Wait (up to SHORT_TIMEOUT seconds) for the capability
                      callback event before pop all.
    Returns:
        Returns last updated capability of softap, or None when need_to_wait
        is false and no capability event was queued.
    """
    eventStr = wifi_constants.SOFTAP_CALLBACK_EVENT + str(
            callbackId) + wifi_constants.SOFTAP_CAPABILITY_CHANGED
    ad.log.debug("softap capability dump from eventStr %s", eventStr)

    # Default for the "nothing received" case; without it the return below
    # hit an unbound local when no event had been seen.
    capability = None
    if need_to_wait:
        event = ad.ed.pop_event(eventStr, SHORT_TIMEOUT)
        capability = event['data']

    # Anything still queued is newer than the event read above.
    for event in ad.ed.pop_all(eventStr):
        capability = event['data']

    return capability

def get_ssrdumps(ad):
    """Pull any ssrdump files off the device, then clear the dump directory.

    Files found under /data/vendor/ssrdump/ are pulled into an
    "SSRDUMPS_<serial>" directory below the device log path. The files on
    the device are deleted afterwards whether or not anything was pulled.

    Args:
        ad: android device object.
    """
    dump_files = ad.get_file_names("/data/vendor/ssrdump/")
    if dump_files:
        ad.log.info("Pulling ssrdumps %s", dump_files)
        destination = os.path.join(ad.device_log_path,
                                   "SSRDUMPS_%s" % ad.serial)
        os.makedirs(destination, exist_ok=True)
        ad.pull_files(dump_files, destination)
    ad.adb.shell("find /data/vendor/ssrdump/ -type f -delete",
                 ignore_status=True)


def start_pcap(pcap, wifi_band, test_name):
    """Start packet capture on one band or on both.

    Captures are written under a "PacketCapture" directory inside the
    current test output path; the directory is created if needed.

    Args:
        pcap: packet capture object
        wifi_band: '2g' or '5g' or 'dual'
        test_name: test name to be used for pcap file name

    Returns:
        Dictionary with wifi band as key and the tuple
        (pcap Process object, log directory joined with test_name) as the
        value
    """
    output_root = context.get_current_context().get_full_output_path()
    log_dir = os.path.join(output_root, 'PacketCapture')
    os.makedirs(log_dir, exist_ok=True)

    bands = [BAND_2G, BAND_5G] if wifi_band == 'dual' else [wifi_band]
    return {
        band: (pcap.start_packet_capture(band, log_dir, test_name),
               os.path.join(log_dir, test_name))
        for band in bands
    }


def stop_pcap(pcap, procs, test_status=None):
    """Stop packet capture in monitor mode.

    Since, the pcap logs in monitor mode can be very large, we will
    delete them if they are not required. 'test_status' if True, will delete
    the pcap files. If False, we will keep them.

    Deleting removes the whole directory that contains each capture's log
    path. An empty 'procs' is handled as a no-op.

    Args:
        pcap: packet capture object
        procs: dictionary returned by start_pcap
        test_status: status of the test case
    """
    log_dirs = set()
    for proc, fname in procs.values():
        pcap.stop_packet_capture(proc)
        # Collect the directories while iterating instead of reading the loop
        # variable afterwards, which was unbound when procs was empty.
        log_dirs.add(os.path.dirname(fname))

    if test_status:
        for log_dir in log_dirs:
            shutil.rmtree(log_dir)


def verify_mac_not_found_in_pcap(ad, mac, packets):
    """Verify that a mac address is not found in the captured packets.

    Each packet's summary is logged at debug level and searched for the mac
    string; the first hit fails the test.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    for pkt in packets:
        summary = pkt.summary()
        logging.debug("Packet Summary = %s", summary)
        if mac in summary:
            # show(dump=True) returns the packet dump as a string; a plain
            # show() prints it and returns None, which made the failure
            # message read "Packet = None".
            asserts.fail("Device %s caught Factory MAC: %s in packet sniffer. "
                         "Packet = %s" % (ad.serial, mac,
                                          pkt.show(dump=True)))


def verify_mac_is_found_in_pcap(ad, mac, packets):
    """Check that at least one captured packet mentions the mac address.

    Packets are scanned in order and the scan stops at the first packet
    whose summary contains the mac. The test is failed when no packet
    matches.

    Args:
        ad: android device object
        mac: string representation of the mac address
        packets: packets obtained by rdpcap(pcap_fname)
    """
    if any(mac in packet.summary() for packet in packets):
        return
    asserts.fail("Did not find MAC = %s in packet sniffer."
                 "for device %s" % (mac, ad.serial))


def start_cnss_diags(ads, cnss_diag_file, pixel_models):
    """Call start_cnss_diag for every device in the list.

    Args:
        ads: list of android device objects.
        cnss_diag_file: cnss diag config file to push to device.
        pixel_models: pixel devices.
    """
    for device in ads:
        start_cnss_diag(device, cnss_diag_file, pixel_models)


def start_cnss_diag(ad, cnss_diag_file, pixel_models):
    """Enable cnss_diag on a device so that extra wifi logs are recorded.

    Nothing is done for models outside pixel_models, or when the controlling
    property already reads 'true'. Otherwise the config file is pushed if
    the listing of the on-device config yields no lines, the existing files
    under wlan_logs are deleted, and the property is set to true.

    Args:
        ad: android device object.
        cnss_diag_file: cnss diag config file to push to device.
        pixel_models: pixel devices.
    """
    if ad.model not in pixel_models:
        ad.log.info("Device not supported to collect pixel logger")
        return

    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)

    if ad.adb.getprop(prop) == 'true':
        return

    # Count the lines printed by 'ls -l' for the config file; zero lines is
    # treated as "config not on the device yet".
    listing_cmd = "ls -l %s%s | wc -l" % (CNSS_DIAG_CONFIG_PATH,
                                          CNSS_DIAG_CONFIG_FILE)
    listed_lines = int(ad.adb.shell(listing_cmd))
    if not listed_lines:
        ad.adb.push("%s %s" % (cnss_diag_file, CNSS_DIAG_CONFIG_PATH))

    # Clear out old logs before turning logging on.
    ad.adb.shell(
        "find /data/vendor/wifi/cnss_diag/wlan_logs/ -type f -delete",
        ignore_status=True)
    ad.adb.shell("setprop %s true" % prop, ignore_status=True)


def stop_cnss_diags(ads, pixel_models):
    """Call stop_cnss_diag for every device in the list.

    Args:
        ads: list of android device objects.
        pixel_models: pixel devices.
    """
    for device in ads:
        stop_cnss_diag(device, pixel_models)


def stop_cnss_diag(ad, pixel_models):
    """Disable cnss_diag on a device by setting its property to false.

    Devices whose model is not listed in pixel_models are left untouched.

    Args:
        ad: android device object.
        pixel_models: pixel devices.
    """
    if ad.model not in pixel_models:
        ad.log.info("Device not supported to collect pixel logger")
        return

    uses_legacy_prop = ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP
    prop = (wifi_constants.LEGACY_CNSS_DIAG_PROP
            if uses_legacy_prop else wifi_constants.CNSS_DIAG_PROP)
    ad.adb.shell("setprop %s false" % prop, ignore_status=True)


def get_cnss_diag_log(ad):
    """Pull the cnss_diag logs found in the device's wlan_logs dir.

    The files are copied into a 'CNSS_DIAG_<serial>' directory under the
    device log path. Nothing happens when no log files are reported.

    Args:
        ad: android device object.
    """
    remote_logs = ad.get_file_names("/data/vendor/wifi/cnss_diag/wlan_logs/")
    if not remote_logs:
        return

    ad.log.info("Pulling cnss_diag logs %s", remote_logs)
    destination = os.path.join(ad.device_log_path,
                               "CNSS_DIAG_%s" % ad.serial)
    os.makedirs(destination, exist_ok=True)
    ad.pull_files(remote_logs, destination)


# Outcome of a single link probe, as built by send_link_probe().
LinkProbeResult = namedtuple('LinkProbeResult', [
    'is_success',
    'stdout',
    'elapsed_time',
    'failure_reason',
])


def send_link_probe(ad):
    """Send one link probe to the currently connected AP and parse the result.

    The test is failed when the command output contains 'Error' or
    'Exception', or when it reports neither success nor failure. The first
    purely numeric token of the output is reported as the elapsed time on
    success, or as the failure reason on failure.

    Args:
         ad: android device object
    Returns:
        LinkProbeResult namedtuple
    """
    output = ad.adb.shell('cmd wifi send-link-probe')
    saw_error = 'Error' in output or 'Exception' in output
    asserts.assert_false(saw_error,
                         'Exception while sending link probe: ' + output)

    probe_succeeded = 'succeeded' in output
    if not probe_succeeded and 'failed with reason' not in output:
        asserts.fail('Unexpected link probe result: ' + output)

    # Both outcomes carry their number as the first all-digit token.
    numeric_tokens = (int(word) for word in output.split() if word.isdigit())
    first_number = next(numeric_tokens, None)

    return LinkProbeResult(
        is_success=probe_succeeded,
        stdout=output,
        elapsed_time=first_number if probe_succeeded else None,
        failure_reason=None if probe_succeeded else first_number)


def send_link_probes(ad, num_probes, delay_sec):
    """Send several link probes in a row to the currently connected AP.

    Each probe is followed by a pause of delay_sec seconds, including the
    last one.

    Args:
         ad: android device object
         num_probes: number of probes to perform
         delay_sec: delay time between probes, in seconds
    Returns:
        List[LinkProbeResult] one LinkProbeResults for each probe
    """
    logging.info('Sending link probes')
    probe_results = []
    for _probe_index in range(num_probes):
        # A probe whose adb output shows an exception fails the test from
        # inside send_link_probe(), so no extra check is needed here.
        outcome = send_link_probe(ad)
        logging.info('link probe results: ' + str(outcome))
        probe_results.append(outcome)
        time.sleep(delay_sec)
    return probe_results


def ap_setup(test, index, ap, network, bandwidth=80, channel=6):
    """Restart an AP with the given network settings.

    The access point at test.access_points[index] is closed first, then,
    after a 5 second pause, 'ap' is started with a 'whirlwind' preset
    built from the network info. A network with a "password" entry gets
    WPA security; otherwise the AP is started without security.

    Args:
        test: the calling test class object.
        index: int, index of the AP.
        ap: access_point object of the AP.
        network: dict with information of the network, including ssid,
                 password and bssid.
        bandwidth: the operation bandwidth for the AP, default 80MHz.
        channel: the channel number for the AP.
    """
    ssid = network[WifiEnums.SSID_KEY]
    test.access_points[index].close()
    time.sleep(5)

    # Configure AP as required.
    if "password" in network:
        security = hostapd_security.Security(security_mode="wpa",
                                             password=network["password"])
    else:
        security = hostapd_security.Security(security_mode=None, password=None)

    ap_config = hostapd_ap_preset.create_ap_preset(
        channel=channel,
        ssid=ssid,
        security=security,
        bss_settings=[],
        vht_bandwidth=bandwidth,
        profile_name='whirlwind',
        iface_wlan_2g=ap.wlan_2g,
        iface_wlan_5g=ap.wlan_5g)
    ap.start_ap(ap_config)
    logging.info("AP started on channel {} with SSID {}".format(channel, ssid))

def turn_ap_off(test, AP):
    """Stop hostapd on both interfaces of an Access Point.

    'wlan0' is handled before 'wlan1'; an interface whose hostapd is not
    alive is skipped.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn OFF (1-based).
    """
    access_point = test.access_points[AP - 1]
    interfaces = (
        ('wlan0', 'Turned WLAN0 AP%d off'),
        ('wlan1', 'Turned WLAN1 AP%d off'),
    )
    for iface, message in interfaces:
        daemon = access_point._aps[iface].hostapd
        if not daemon.is_alive():
            continue
        daemon.stop()
        logging.debug(message % AP)


def turn_ap_on(test, AP):
    """Start hostapd on both interfaces of an Access Point.

    'wlan0' is handled before 'wlan1'; an interface whose hostapd is
    already alive is skipped. Each hostapd is started with its own stored
    config.

    Args:
        test: The test class object.
        AP: int, indicating which AP to turn ON (1-based).
    """
    access_point = test.access_points[AP - 1]
    interfaces = (
        ('wlan0', 'Turned WLAN0 AP%d on'),
        ('wlan1', 'Turned WLAN1 AP%d on'),
    )
    for iface, message in interfaces:
        daemon = access_point._aps[iface].hostapd
        if daemon.is_alive():
            continue
        daemon.start(daemon.config)
        logging.debug(message % AP)


def turn_location_off_and_scan_toggle_off(ad):
    """Turns off the location service and the always-available wifi scan.

    Fails the test when the device still reports the always-available scan
    as enabled afterwards.

    Args:
        ad: android device object.
    """
    utils.set_location_service(ad, False)
    ad.droid.wifiScannerToggleAlwaysAvailable(False)
    scan_disabled = not ad.droid.wifiScannerIsAlwaysAvailable()
    asserts.assert_true(scan_disabled,
                        "Failed to turn off location service's scan.")


def set_softap_channel(dut, ap_iface='wlan1', cs_count=10, channel=2462):
    """Switch the SoftAP channel through hostapd_cli chan_switch.

    Args:
        dut: android device object
        ap_iface: interface of SoftAP mode.
        cs_count: how many beacon frames before switch channel, default = 10
        channel: value passed as the last chan_switch argument.
            NOTE(review): the default 2462 looks like a frequency in MHz
            rather than a channel number - confirm against callers.

    Returns:
        The command output, 'OK', when the switch succeeded. Any other
        output fails the test.
    """
    switch_cmd = 'hostapd_cli -i {} chan_switch {} {}'.format(
        ap_iface, cs_count, channel)
    dut.log.info('adb shell {}'.format(switch_cmd))
    switch_output = dut.adb.shell(switch_cmd)
    if switch_output == 'OK':
        dut.log.info('switch hotspot channel to {}'.format(channel))
        return switch_output

    asserts.fail("Failed to switch hotspot channel")

def get_wlan0_link(dut):
    """Query wpa_cli for the wlan0 status and return it as a dict.

    The test is failed when the status has no "ssid" entry.

    Args:
        dut: android device object

    Returns:
        dict mapping each key of the 'key=value' pairs in the wpa_cli
        status output to its value.
    """
    status_cmd = 'wpa_cli -iwlan0 -g@android:wpa_wlan0 IFNAME=wlan0 status'
    raw_status = dut.adb.shell(status_cmd)
    # Values are either double-quoted strings or runs of non-whitespace.
    pairs = re.findall(r'(\S+)=(".*?"|\S+)', raw_status)
    link_status = dict(pairs)
    asserts.assert_true("ssid" in link_status,
                        "Client doesn't connect to any network")
    return link_status

def verify_11ax_wifi_connection(ad, wifi6_supported_models, wifi6_ap):
    """Assert the connection uses 11ax when both AP and device support it.

    The check is skipped when the AP does not support 11ax or the device
    model is not in wifi6_supported_models.

    Args:
      ad: android device object
      wifi6_supported_models: device supporting 11ax.
      wifi6_ap: if the AP supports 11ax.
    """
    if not wifi6_ap or ad.model not in wifi6_supported_models:
        return

    logging.info("Verifying 11ax. Model: %s" % ad.model)
    connected_standard = ad.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        connected_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT did not connect to 11ax.")

def verify_11ax_softap(dut, dut_client, wifi6_supported_models):
    """Assert the SoftAp client connection uses 11ax when both sides support it.

    The check only runs when the models of both the SoftAp device and the
    client are listed in wifi6_supported_models. It then asserts that the
    client reports an 11ax connection standard.

    Args:
      dut: Softap device.
      dut_client: Client connecting to softap.
      wifi6_supported_models: List of device models supporting 11ax.
    """
    if dut.model not in wifi6_supported_models:
        return
    if dut_client.model not in wifi6_supported_models:
        return

    logging.info(
        "Verifying 11ax softap. DUT model: %s, DUT Client model: %s",
        dut.model, dut_client.model)
    client_standard = dut_client.droid.wifiGetConnectionStandard()
    asserts.assert_true(
        client_standard == wifi_constants.WIFI_STANDARD_11AX,
        "DUT failed to start SoftAp in 11ax.")

def check_available_channels_in_bands_2_5(dut, country_code):
    """Check if DUT is capable of enabling BridgedAp.

    The country code is applied, Wi-Fi is toggled on and off, and the
    SoftAp capability is read; the DUT counts as capable when both the
    2.4GHz and the 5GHz supported channel lists are non-empty.
    #TODO: Find a way to make this function flexible by taking an argument.

    Args:
        dut: android device object.
        country_code: country code, e.g., 'US', 'JP'.
    Returns:
        True: If DUT is capable of enabling BridgedAp.
        False: If DUT is not capable of enabling BridgedAp.
    """
    set_wifi_country_code(dut, country_code)
    current_country = dut.droid.wifiGetCountryCode()
    dut.log.info("DUT current country code : {}".format(current_country))

    # Wi-Fi ON and OFF to make sure the country code takes effect.
    for wifi_on in (True, False):
        wifi_toggle_state(dut, wifi_on)

    # Register SoftAp Callback and get SoftAp capability.
    callback_id = dut.droid.registerSoftApCallback()
    capability = get_current_softap_capability(dut, callback_id, True)
    dut.droid.unregisterSoftApCallback(callback_id)

    key_2g = wifi_constants.SOFTAP_CAPABILITY_24GHZ_SUPPORTED_CHANNEL_LIST
    key_5g = wifi_constants.SOFTAP_CAPABILITY_5GHZ_SUPPORTED_CHANNEL_LIST
    return bool(capability[key_2g] and capability[key_5g])


@retry(tries=5, delay=2)
def validate_ping_between_two_clients(dut1, dut2):
    """Check that two DUTs can ping each other over wlan0.

    The first IPv4 address of each DUT's 'wlan0' interface is used. The
    test is failed when either address is missing or when a ping in either
    direction fails. The whole check is wrapped in the retry decorator.

    Args:
        dut1: An AndroidDevice object.
        dut2: An AndroidDevice object.
    """
    # Get DUTs' IPv4 addresses; a DUT without one is recorded as "".
    ipv4_by_dut = []
    for dut in (dut1, dut2):
        try:
            ipv4 = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
        except IndexError:
            ipv4 = ""
            dut.log.info(
                "{} has no Wi-Fi connection, cannot get IPv4 address."
                .format(dut.serial))
        ipv4_by_dut.append(ipv4)
    dut1_ip, dut2_ip = ipv4_by_dut

    # Test fail if not able to obtain two DUT's IPv4 addresses.
    asserts.assert_true(dut1_ip and dut2_ip,
                        "Ping failed because no DUT's IPv4 address")

    for dut, own_ip in ((dut1, dut1_ip), (dut2, dut2_ip)):
        dut.log.info("{} IPv4 addresses : {}".format(dut.serial, own_ip))

    # Two clients ping each other
    directions = ((dut1, dut1_ip, dut2_ip), (dut2, dut2_ip, dut1_ip))
    for sender, sender_ip, target_ip in directions:
        sender.log.info("{} ping {}".format(sender_ip, target_ip))
        ping_ok = utils.adb_shell_ping(sender,
                                       count=10,
                                       dest_ip=target_ip,
                                       timeout=20)
        asserts.assert_true(
            ping_ok, "%s ping %s failed" % (sender.serial, target_ip))

