Integrate Google WiFi APs into retail AP class enabling unified API.
This CL adds support for Google WiFi APs into the retail AP class
allowing WiFi tests to a unified AP for all APs currently being used in
regression testing.
Test: Done
Bug: None
Change-Id: I14f566fd2560b2675fcb2a1107b544f032f68ce1
Signed-off-by: Omar El Ayach <oelayach@google.com>
diff --git a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
index 6a2dc03..82b8886 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py
@@ -13,11 +13,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import fcntl
-import logging
import selenium
import splinter
import time
+from acts import logger
+from acts.controllers import access_point
+from acts.controllers.ap_lib import bridge_interface
+from acts.controllers.ap_lib import hostapd_security
+from acts.controllers.ap_lib import hostapd_ap_preset
BROWSER_WAIT_SHORT = 1
BROWSER_WAIT_MED = 3
@@ -36,7 +41,8 @@
("Netgear", "R7000"): "NetgearR7000AP",
("Netgear", "R7500"): "NetgearR7500AP",
("Netgear", "R7800"): "NetgearR7800AP",
- ("Netgear", "R8000"): "NetgearR8000AP"
+ ("Netgear", "R8000"): "NetgearR8000AP",
+ ("Google", "Wifi"): "GoogleWifiAP"
}
objs = []
for config in configs:
@@ -70,6 +76,7 @@
headless: boolean to control visible/headless browser operation
timeout: maximum time allowed to launch browser
"""
+ self.log = logger.create_tagged_trace_logger("ChromeDriver")
self.chrome_options = splinter.driver.webdriver.chrome.Options()
self.chrome_options.add_argument("--no-proxy-server")
self.chrome_options.add_argument("--no-sandbox")
@@ -142,7 +149,7 @@
if self.url.split("/")[-1] == url.split("/")[-1]:
break
if idx == num_tries - 1:
- logging.error("URL unreachable. Current URL: {}".format(
+ self.log.error("URL unreachable. Current URL: {}".format(
self.url))
raise RuntimeError("URL unreachable.")
@@ -181,7 +188,7 @@
assumed_ap_settings = self.ap_settings.copy()
actual_ap_settings = self.read_ap_settings()
if assumed_ap_settings != actual_ap_settings:
- logging.warning(
+ self.log.warning(
"Discrepancy in AP settings. Some settings may have been overwritten."
)
@@ -203,7 +210,7 @@
Args:
region: string indicating AP region
"""
- logging.warning("Updating region may overwrite wireless settings.")
+ self.log.warning("Updating region may overwrite wireless settings.")
setting_to_update = {"region": region}
self.update_ap_settings(setting_to_update)
@@ -276,7 +283,15 @@
}
self.update_ap_settings(setting_to_update)
- def update_ap_settings(self, *dict_settings, **named_settings):
+ def set_rate(self):
+ """Function that configures rate used by AP.
+
+ Function implementation is not supported by most APs and thus base
+ class raises exception if function not implemented in child class.
+ """
+ raise NotImplementedError
+
+ def update_ap_settings(self, dict_settings={}, **named_settings):
"""Function to update settings of existing AP.
Function copies arguments into ap_settings and calls configure_retail_ap
@@ -287,28 +302,23 @@
**named_settings accepts named settings to update
Note: dict and named_settings cannot contain the same settings.
"""
- settings_to_update = {}
- if (len(dict_settings) == 1) and (type(dict_settings[0]) == dict):
- for key, value in dict_settings[0].items():
- if key in named_settings:
- raise KeyError("{} was passed twice.".format(key))
- else:
- settings_to_update[key] = value
- elif len(dict_settings) > 1:
- raise TypeError("Wrong number of positional arguments given")
- return
-
- for key, value in named_settings.items():
- settings_to_update[key] = value
+ settings_to_update = dict(dict_settings, **named_settings)
+ if len(settings_to_update) != len(dict_settings) + len(named_settings):
+ raise KeyError("The following keys were passed twice: {}".format(
+ (set(dict_settings.keys()).intersection(
+ set(named_settings.keys())))))
+ if not set(settings_to_update.keys()).issubset(
+ set(self.ap_settings.keys())):
+ raise KeyError(
+ "The following settings are invalid for this AP: {}".format(
+ set(settings_to_update.keys()).difference(
+ set(self.ap_settings.keys()))))
updates_requested = False
- for key, value in settings_to_update.items():
- if (key in self.ap_settings):
- if self.ap_settings[key] != value:
- self.ap_settings[key] = value
- updates_requested = True
- else:
- raise KeyError("Invalid setting passed to AP configuration.")
+ for setting, value in settings_to_update.items():
+ if self.ap_settings[setting] != value:
+ self.ap_settings[setting] = value
+ updates_requested = True
if updates_requested:
self.configure_ap()
@@ -332,6 +342,8 @@
def __init__(self, ap_settings):
self.ap_settings = ap_settings.copy()
+ self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format(
+ self.ap_settings["ip_address"]))
self.init_gui_data()
# Read and update AP settings
self.read_ap_settings()
@@ -489,7 +501,7 @@
self.bw_mode_text[self.ap_settings["{}_{}".format(
key[1], key[0])]])
except AttributeError:
- logging.warning(
+ self.log.warning(
"Cannot select bandwidth. Keeping AP default.")
# Update security settings (passwords updated only if applicable)
@@ -522,7 +534,7 @@
key[1], key[0])])
time.sleep(BROWSER_WAIT_SHORT)
except AttributeError:
- logging.warning(
+ self.log.warning(
"Cannot select channel. Keeping AP default.")
try:
alert = browser.get_alert()
@@ -578,6 +590,8 @@
def __init__(self, ap_settings):
self.ap_settings = ap_settings.copy()
+ self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format(
+ self.ap_settings["ip_address"]))
self.init_gui_data()
# Read and update AP settings
self.read_ap_settings()
@@ -747,7 +761,7 @@
try:
config_item.select_by_text(channel_string)
except AttributeError:
- logging.warning(
+ self.log.warning(
"Cannot select channel. Keeping AP default.")
elif key == ("2G", "bandwidth"):
config_item = iframe.find_by_name(value).first
@@ -756,7 +770,7 @@
str(self.bw_mode_text_2g[self.ap_settings[
"{}_{}".format(key[1], key[0])]]))
except AttributeError:
- logging.warning(
+ self.log.warning(
"Cannot select bandwidth. Keeping AP default.")
elif key == ("5G_1", "bandwidth"):
config_item = iframe.find_by_name(value).first
@@ -765,7 +779,7 @@
str(self.bw_mode_text_5g[self.ap_settings[
"{}_{}".format(key[1], key[0])]]))
except AttributeError:
- logging.warning(
+ self.log.warning(
"Cannot select bandwidth. Keeping AP default.")
# Update passwords for WPA2-PSK protected networks
# (Must be done after security type is selected)
@@ -870,6 +884,8 @@
def __init__(self, ap_settings):
self.ap_settings = ap_settings.copy()
+ self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format(
+ self.ap_settings["ip_address"]))
self.init_gui_data()
# Overwrite minor differences from R7500 AP
self.bw_mode_text_2g["VHT20"] = "Up to 347 Mbps"
@@ -890,6 +906,8 @@
def __init__(self, ap_settings):
self.ap_settings = ap_settings.copy()
+ self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format(
+ self.ap_settings["ip_address"]))
self.init_gui_data()
# Overwrite minor differences from R7000 AP
self.config_page = "{}://{}:{}@{}:{}/WLG_wireless_dual_band_r8000.htm".format(
@@ -935,3 +953,216 @@
return
else:
self.update_ap_settings(ap_settings)
+
+
+class GoogleWifiAP(WifiRetailAP):
+ """ Class that implements Google Wifi AP.
+
+ This class is a work in progress
+ """
+
+ def __init__(self, ap_settings):
+ self.ap_settings = ap_settings.copy()
+ self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format(
+ self.ap_settings["ssh_config"]["host"]))
+ if self.ap_settings["status_2G"] and self.ap_settings["status_5G_1"]:
+ raise ValueError("Error initializing Google Wifi AP. "
+ "Only one interface can be enabled at a time.")
+ self.channel_band_map = {
+ "2G": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
+ "5G_1": [36, 40, 44, 48, 149, 153, 157, 161, 165]
+ }
+ self.BW_MODE_MAP = {"VHT20": 20, "VHT40": 40, "VHT80": 80}
+ self.default_settings = {
+ "region": "United States",
+ "brand": "Google",
+ "model": "Wifi",
+ "status_2G": 0,
+ "status_5G_1": 0,
+ "ssid_2G": "GoogleWifi_2G",
+ "ssid_5G_1": "GoogleWifi_5G",
+ "channel_2G": 11,
+ "channel_5G_1": 149,
+ "bandwidth_2G": "VHT20",
+ "bandwidth_5G_1": "VHT20",
+ "power_2G": "auto",
+ "power_5G_1": "auto",
+ "security_type_2G": "Open",
+ "security_type_5G_1": "Open",
+ "subnet_2G": "192.168.1.0/24",
+ "subnet_5G_1": "192.168.9.0/24",
+ "password_2G": "password",
+ "password_5G_1": "password"
+ }
+
+ for setting in self.default_settings.keys():
+ if setting not in self.ap_settings:
+ self.log.warning(
+ "{0} not found during init. Setting {0} = {1}".format(
+ setting, self.default_settings[setting]))
+ self.ap_settings[setting] = self.default_settings[setting]
+ init_settings = self.ap_settings.copy()
+ init_settings["ap_subnet"] = {
+ "2g": self.ap_settings["subnet_2G"],
+ "5g": self.ap_settings["subnet_5G_1"]
+ }
+ self.access_point = access_point.AccessPoint(init_settings)
+ self.configure_ap()
+
+ def read_ap_settings(self):
+ """Function that reads current ap settings."""
+ return self.ap_settings.copy()
+
+ def update_ap_settings(self, dict_settings={}, **named_settings):
+ """Function to update settings of existing AP.
+
+ Function copies arguments into ap_settings and calls configure_ap
+ to apply them.
+
+ Args:
+ dict_settings: single dictionary of settings to update
+ **named_settings: named settings to update
+ Note: dict and named_settings cannot contain the same settings.
+ """
+ settings_to_update = dict(dict_settings, **named_settings)
+ if len(settings_to_update) != len(dict_settings) + len(named_settings):
+ raise KeyError("The following keys were passed twice: {}".format(
+ (set(dict_settings.keys()).intersection(
+ set(named_settings.keys())))))
+ if not set(settings_to_update.keys()).issubset(
+ set(self.ap_settings.keys())):
+ raise KeyError(
+ "The following settings are invalid for this AP: {}".format(
+ set(settings_to_update.keys()).difference(
+ set(self.ap_settings.keys()))))
+
+ updating_2G = any(["2G" in x for x in settings_to_update.keys()])
+ updating_5G_1 = any(["5G_1" in x for x in settings_to_update.keys()])
+ if updating_2G and updating_5G_1:
+ raise ValueError(
+ "Error updating Google WiFi AP. "
+ "One interface can be activated and updated at a time")
+ elif updating_2G:
+ # If updating an interface and not explicitly setting its status,
+ # it is assumed that the interface is to be ENABLED and updated
+ if "status_2G" not in settings_to_update:
+ settings_to_update["status_2G"] = 1
+ settings_to_update["status_5G_1"] = 0
+ elif updating_5G_1:
+ if "status_5G_1" not in settings_to_update:
+ settings_to_update["status_2G"] = 0
+ settings_to_update["status_5G_1"] = 1
+
+ updates_requested = False
+ for setting, value in settings_to_update.items():
+ if self.ap_settings[setting] != value:
+ self.ap_settings[setting] = value
+ updates_requested = True
+
+ if updates_requested:
+ self.configure_ap()
+
+ def configure_ap(self):
+ """Function to configure Google Wifi."""
+ self.log.info("Stopping Google Wifi interfaces.")
+ self.access_point.stop_all_aps()
+
+ if self.ap_settings["status_2G"] == 1:
+ network = "2G"
+ self.log.info("Bringing up 2.4 GHz network.")
+ elif self.ap_settings["status_5G_1"] == 1:
+ network = "5G_1"
+ self.log.info("Bringing up 5 GHz network.")
+ else:
+ return
+
+ bss_settings = []
+ ssid = self.ap_settings["ssid_{}".format(network)]
+ if "WPA" in self.ap_settings["security_type_{}".format(network)]:
+ password = self.ap_settings["password_{}".format(network)]
+ security = hostapd_security.Security(
+ security_mode="wpa", password=password)
+ else:
+ security = hostapd_security.Security(
+ security_mode=None, password=None)
+ channel = int(self.ap_settings["channel_{}".format(network)])
+ bandwidth = self.BW_MODE_MAP[self.ap_settings["bandwidth_{}".format(
+ network)]]
+ config = hostapd_ap_preset.create_ap_preset(
+ channel=channel,
+ ssid=ssid,
+ security=security,
+ bss_settings=bss_settings,
+ vht_bandwidth=bandwidth,
+ profile_name='whirlwind',
+ iface_wlan_2g=self.access_point.wlan_2g,
+ iface_wlan_5g=self.access_point.wlan_5g)
+ config_bridge = self.access_point.generate_bridge_configs(channel)
+ brconfigs = bridge_interface.BridgeInterfaceConfigs(
+ config_bridge[0], config_bridge[1], config_bridge[2])
+ self.access_point.bridge.startup(brconfigs)
+ self.access_point.start_ap(config)
+ self.set_power(network, self.ap_settings["power_{}".format(network)])
+ self.log.info("AP started on channel {} with SSID {}".format(
+ channel, ssid))
+
+ def set_power(self, network, power):
+ """Function that sets network transmit power.
+
+ Args:
+ network: string containing network identifier (2G, 5G_1, 5G_2)
+ power: power level in dBm
+ """
+ if power == "auto":
+ power_string = "auto"
+ else:
+ if not float(power).is_integer():
+ self.log.info(
+ "Power in dBm must be an integer. Setting to {}".format(
+ int(power)))
+ power = int(power)
+ power_string = "fixed {}".format(int(power) * 100)
+
+ if "2G" in network:
+ interface = self.access_point.wlan_2g
+ self.ap_settings["power_2G"] = power
+ elif "5G_1" in network:
+ interface = self.access_point.wlan_5g
+ self.ap_settings["power_5G_1"] = power
+ self.access_point.ssh.run("iw dev {} set txpower {}".format(
+ interface, power_string))
+
+ def set_rate(self,
+ network,
+ mode=None,
+ num_streams=None,
+ rate=None,
+ short_gi=0):
+ """Function that sets rate.
+
+ Args:
+ network: string containing network identifier (2G, 5G_1, 5G_2)
+ mode: string indicating the WiFi standard to use
+ num_streams: number of MIMO streams. used only for VHT
+ rate: data rate of MCS index to use
+ short_gi: boolean controlling the use of short guard interval
+ """
+ if "2G" in network:
+ interface = self.access_point.wlan_2g
+ interface_short = "2.4"
+ elif "5G_1" in network:
+ interface = self.access_point.wlan_5g
+ interface_short = "5"
+
+ if mode.lower() in ["legacy", "11a", "11b", "11g"]:
+ cmd_string = "iw dev {0} set bitrates legacy-{1} {2} ht-mcs-{1} vht-mcs-{1}".format(
+ interface, interface_short, rate)
+ elif "vht" in mode.lower():
+ cmd_string = "iw dev {0} set bitrates legacy-{1} ht-mcs-{1} vht-mcs-{1} {2}:{3}".format(
+ interface, interface_short, num_streams, rate)
+ if short_gi:
+ cmd_string = cmd_string + " sgi-interface_short"
+ elif "ht" in mode.lower():
+ cmd_string = "iw dev {0} set bitrates legacy-{1} ht-mcs-{1} {2} vht-mcs-{1}".format(
+ interface, interface_short, rate)
+ self.access_point.ssh.run(cmd_string)