| #!/usr/bin/env python3 |
| # |
| # Copyright 2020 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import json |
| import os |
| import time |
| |
| from statistics import pstdev |
| |
| from bokeh.models import FixedTicker |
| from bokeh.plotting import ColumnDataSource |
| from bokeh.plotting import figure |
| from bokeh.plotting import output_file |
| from bokeh.plotting import save |
| |
| from acts import asserts |
| from acts import context |
| from acts import utils |
| from acts.controllers.access_point import setup_ap |
| from acts.controllers.ap_lib import hostapd_config |
| from acts.controllers.ap_lib import hostapd_constants |
| from acts.controllers.ap_lib.hostapd_security import Security |
| from acts.controllers.iperf_server import IPerfResult |
| from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device |
| from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest |
| |
| N_CAPABILITIES_DEFAULT = [ |
| hostapd_constants.N_CAPABILITY_LDPC, hostapd_constants.N_CAPABILITY_SGI20, |
| hostapd_constants.N_CAPABILITY_SGI40, |
| hostapd_constants.N_CAPABILITY_TX_STBC, |
| hostapd_constants.N_CAPABILITY_RX_STBC1 |
| ] |
| |
| AC_CAPABILITIES_DEFAULT = [ |
| hostapd_constants.AC_CAPABILITY_MAX_MPDU_11454, |
| hostapd_constants.AC_CAPABILITY_RXLDPC, |
| hostapd_constants.AC_CAPABILITY_SHORT_GI_80, |
| hostapd_constants.AC_CAPABILITY_TX_STBC_2BY1, |
| hostapd_constants.AC_CAPABILITY_RX_STBC_1, |
| hostapd_constants.AC_CAPABILITY_MAX_A_MPDU_LEN_EXP7, |
| hostapd_constants.AC_CAPABILITY_RX_ANTENNA_PATTERN, |
| hostapd_constants.AC_CAPABILITY_TX_ANTENNA_PATTERN |
| ] |
| |
| DEFAULT_MIN_THROUGHPUT = 0 |
| DEFAULT_MAX_STD_DEV = 1 |
| |
| DEFAULT_TIME_TO_WAIT_FOR_IP_ADDR = 30 |
| GRAPH_CIRCLE_SIZE = 10 |
| IPERF_NO_THROUGHPUT_VALUE = 0 |
| MAX_2_4_CHANNEL = 14 |
| TIME_TO_SLEEP_BETWEEN_RETRIES = 1 |
| TIME_TO_WAIT_FOR_COUNTRY_CODE = 10 |
| WEP_HEX_STRING_LENGTH = 10 |
| |
| MEGABITS_PER_SECOND = 'Mbps' |
| |
| |
| def get_test_name(settings): |
| """Retrieves the test_name value from test_settings""" |
| return settings.get('test_name') |
| |
| |
| class ChannelSweepTest(WifiBaseTest): |
| """Tests channel performance and regulatory compliance.. |
| |
| Testbed Requirement: |
| * One ACTS compatible device (dut) |
| * One Access Point |
| * One Linux Machine used as IPerfServer if running performance tests |
| Note: Performance tests should be done in isolated testbed. |
| """ |
| def __init__(self, controllers): |
| WifiBaseTest.__init__(self, controllers) |
| if 'channel_sweep_test_params' in self.user_params: |
| self.time_to_wait_for_ip_addr = self.user_params[ |
| 'channel_sweep_test_params'].get( |
| 'time_to_wait_for_ip_addr', |
| DEFAULT_TIME_TO_WAIT_FOR_IP_ADDR) |
| else: |
| self.time_to_wait_for_ip_addr = DEFAULT_TIME_TO_WAIT_FOR_IP_ADDR |
| |
| def setup_class(self): |
| super().setup_class() |
| if 'dut' in self.user_params: |
| if self.user_params['dut'] == 'fuchsia_devices': |
| self.dut = create_wlan_device(self.fuchsia_devices[0]) |
| elif self.user_params['dut'] == 'android_devices': |
| self.dut = create_wlan_device(self.android_devices[0]) |
| else: |
| raise ValueError('Invalid DUT specified in config. (%s)' % |
| self.user_params['dut']) |
| else: |
| # Default is an android device, just like the other tests |
| self.dut = create_wlan_device(self.android_devices[0]) |
| |
| self.android_devices = getattr(self, 'android_devices', []) |
| |
| self.access_point = self.access_points[0] |
| self.iperf_server = self.iperf_servers[0] |
| self.iperf_server.start() |
| self.iperf_client = self.iperf_clients[0] |
| self.access_point.stop_all_aps() |
| |
| def setup_test(self): |
| # TODO(fxb/46417): Uncomment when wlanClearCountry is implemented up any |
| # country code changes. |
| # for fd in self.fuchsia_devices: |
| # phy_ids_response = fd.wlan_lib.wlanPhyIdList() |
| # if phy_ids_response.get('error'): |
| # raise ConnectionError( |
| # 'Failed to retrieve phy ids from FuchsiaDevice (%s). ' |
| # 'Error: %s' % (fd.ip, phy_ids_response['error'])) |
| # for id in phy_ids_response['result']: |
| # clear_country_response = fd.wlan_lib.wlanClearCountry(id) |
| # if clear_country_response.get('error'): |
| # raise EnvironmentError( |
| # 'Failed to reset country code on FuchsiaDevice (%s). ' |
| # 'Error: %s' % (fd.ip, clear_country_response['error']) |
| # ) |
| self.access_point.stop_all_aps() |
| for ad in self.android_devices: |
| ad.droid.wakeLockAcquireBright() |
| ad.droid.wakeUpNow() |
| self.dut.wifi_toggle_state(True) |
| self.dut.disconnect() |
| |
| def teardown_test(self): |
| for ad in self.android_devices: |
| ad.droid.wakeLockRelease() |
| ad.droid.goToSleepNow() |
| self.dut.turn_location_off_and_scan_toggle_off() |
| self.dut.disconnect() |
| self.access_point.stop_all_aps() |
| |
| def on_fail(self, test_name, begin_time): |
| self.dut.take_bug_report(test_name, begin_time) |
| self.dut.get_log(test_name, begin_time) |
| |
| def set_dut_country_code(self, country_code): |
| """Set the country code on the DUT. Then verify that the country |
| code was set successfully |
| |
| Args: |
| country_code: string, the 2 character country code to set |
| """ |
| self.log.info('Setting DUT country code to %s' % country_code) |
| country_code_response = self.dut.device.regulatory_region_lib.setRegion( |
| country_code) |
| if country_code_response.get('error'): |
| raise EnvironmentError( |
| 'Failed to set country code (%s) on DUT. Error: %s' % |
| (country_code, country_code_response['error'])) |
| |
| self.log.info('Verifying DUT country code was correctly set to %s.' % |
| country_code) |
| phy_ids_response = self.dut.device.wlan_lib.wlanPhyIdList() |
| if phy_ids_response.get('error'): |
| raise ConnectionError('Failed to get phy ids from DUT. Error: %s' % |
| (country_code, phy_ids_response['error'])) |
| |
| end_time = time.time() + TIME_TO_WAIT_FOR_COUNTRY_CODE |
| while time.time() < end_time: |
| for id in phy_ids_response['result']: |
| get_country_response = self.dut.device.wlan_lib.wlanGetCountry( |
| id) |
| if get_country_response.get('error'): |
| raise ConnectionError( |
| 'Failed to query PHY ID (%s) for country. Error: %s' % |
| (id, get_country_response['error'])) |
| |
| set_code = ''.join([ |
| chr(ascii_char) |
| for ascii_char in get_country_response['result'] |
| ]) |
| if set_code != country_code: |
| self.log.debug( |
| 'PHY (id: %s) has incorrect country code set. ' |
| 'Expected: %s, Got: %s' % (id, country_code, set_code)) |
| break |
| else: |
| self.log.info('All PHYs have expected country code (%s)' % |
| country_code) |
| break |
| time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES) |
| else: |
| raise EnvironmentError('Failed to set DUT country code to %s.' % |
| country_code) |
| |
| def setup_ap(self, channel, channel_bandwidth, security_profile=None): |
| """Start network on AP with basic configuration. |
| |
| Args: |
| channel: int, channel to use for network |
| channel_bandwidth: int, channel bandwidth in mhz to use for network, |
| security_profile: Security object, or None if open |
| |
| Returns: |
| string, ssid of network running |
| |
| Raises: |
| ConnectionError if network is not started successfully. |
| """ |
| if channel > MAX_2_4_CHANNEL: |
| vht_bandwidth = channel_bandwidth |
| else: |
| vht_bandwidth = None |
| |
| if channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_20MHZ: |
| n_capabilities = N_CAPABILITIES_DEFAULT + [ |
| hostapd_constants.N_CAPABILITY_HT20 |
| ] |
| elif (channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_40MHZ or |
| channel_bandwidth == hostapd_constants.CHANNEL_BANDWIDTH_80MHZ): |
| if hostapd_config.ht40_plus_allowed(channel): |
| extended_channel = [hostapd_constants.N_CAPABILITY_HT40_PLUS] |
| elif hostapd_config.ht40_minus_allowed(channel): |
| extended_channel = [hostapd_constants.N_CAPABILITY_HT40_MINUS] |
| else: |
| raise ValueError('Invalid Channel: %s' % channel) |
| n_capabilities = N_CAPABILITIES_DEFAULT + extended_channel |
| else: |
| raise ValueError('Invalid Bandwidth: %s' % channel_bandwidth) |
| ssid = utils.rand_ascii_str(hostapd_constants.AP_SSID_LENGTH_2G) |
| try: |
| setup_ap(access_point=self.access_point, |
| profile_name='whirlwind', |
| channel=channel, |
| security=security_profile, |
| n_capabilities=n_capabilities, |
| ac_capabilities=None, |
| force_wmm=True, |
| ssid=ssid, |
| vht_bandwidth=vht_bandwidth, |
| setup_bridge=True) |
| except Exception as err: |
| raise ConnectionError( |
| 'Failed to setup ap on channel: %s, channel bandwidth: %smhz. ' |
| 'Error: %s' % (channel, channel_bandwidth, err)) |
| else: |
| self.log.info( |
| 'Network (ssid: %s) up on channel %s w/ channel bandwidth %smhz' |
| % (ssid, channel, channel_bandwidth)) |
| |
| return ssid |
| |
| def get_and_verify_iperf_address(self, channel, device, interface=None): |
| """Get ip address from a devices interface and verify it belongs to |
| expected subnet based on APs DHCP config. |
| |
| Args: |
| channel: int, channel network is running on, to determine subnet |
| device: device to get ip address for |
| interface (default: None): interface on device to get ip address. |
| If None, uses device.test_interface. |
| |
| Returns: |
| String, ip address of device on given interface (or test_interface) |
| |
| Raises: |
| ConnectionError, if device does not have a valid ip address after |
| all retries. |
| """ |
| if channel <= MAX_2_4_CHANNEL: |
| subnet = self.access_point._AP_2G_SUBNET_STR |
| else: |
| subnet = self.access_point._AP_5G_SUBNET_STR |
| end_time = time.time() + self.time_to_wait_for_ip_addr |
| while time.time() < end_time: |
| if interface: |
| device_addresses = device.get_interface_ip_addresses(interface) |
| else: |
| device_addresses = device.get_interface_ip_addresses( |
| device.test_interface) |
| |
| if device_addresses['ipv4_private']: |
| for ip_addr in device_addresses['ipv4_private']: |
| if utils.ip_in_subnet(ip_addr, subnet): |
| return ip_addr |
| else: |
| self.log.debug( |
| 'Device has an ip address (%s), but it is not in ' |
| 'subnet %s' % (ip_addr, subnet)) |
| else: |
| self.log.debug( |
| 'Device does not have a valid ip address. Retrying.') |
| time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES) |
| raise ConnectionError('Device failed to get an ip address.') |
| |
| def get_iperf_throughput(self, |
| iperf_server_address, |
| iperf_client_address, |
| reverse=False): |
| """Run iperf between client and server and get the throughput. |
| |
| Args: |
| iperf_server_address: string, ip address of running iperf server |
| iperf_client_address: string, ip address of iperf client (dut) |
| reverse (default: False): If True, run traffic in reverse direction, |
| from server to client. |
| |
| Returns: |
| int, iperf throughput OR IPERF_NO_THROUGHPUT_VALUE, if iperf fails |
| """ |
| if reverse: |
| self.log.info( |
| 'Running IPerf traffic from server (%s) to dut (%s).' % |
| (iperf_server_address, iperf_client_address)) |
| iperf_results_file = self.iperf_client.start( |
| iperf_server_address, '-i 1 -t 10 -R -J', 'channel_sweep_rx') |
| else: |
| self.log.info( |
| 'Running IPerf traffic from dut (%s) to server (%s).' % |
| (iperf_client_address, iperf_server_address)) |
| iperf_results_file = self.iperf_client.start( |
| iperf_server_address, '-i 1 -t 10 -J', 'channel_sweep_tx') |
| if iperf_results_file: |
| iperf_results = IPerfResult( |
| iperf_results_file, reporting_speed_units=MEGABITS_PER_SECOND) |
| return iperf_results.avg_send_rate |
| else: |
| return IPERF_NO_THROUGHPUT_VALUE |
| |
| def log_to_file_and_throughput_data(self, channel, channel_bandwidth, |
| tx_throughput, rx_throughput): |
| """Write performance info to csv file and to throughput data. |
| |
| Args: |
| channel: int, channel that test was run on |
| channel_bandwidth: int, channel bandwidth the test used |
| tx_throughput: float, throughput value from dut to iperf server |
| rx_throughput: float, throughput value from iperf server to dut |
| """ |
| test_name = self.throughput_data['test'] |
| output_path = context.get_current_context().get_base_output_path() |
| log_path = '%s/ChannelSweepTest/%s' % (output_path, test_name) |
| if not os.path.exists(log_path): |
| os.makedirs(log_path) |
| log_file = '%s/%s_%smhz.csv' % (log_path, test_name, channel_bandwidth) |
| self.log.info('Writing IPerf results for %s to %s' % |
| (test_name, log_file)) |
| with open(log_file, 'a') as csv_file: |
| csv_file.write('%s,%s,%s\n' % |
| (channel, tx_throughput, rx_throughput)) |
| self.throughput_data['results'][str(channel)] = { |
| 'tx_throughput': tx_throughput, |
| 'rx_throughput': rx_throughput |
| } |
| |
| def write_graph(self): |
| """Create graph html files from throughput data, plotting channel vs |
| tx_throughput and channel vs rx_throughput. |
| """ |
| output_path = context.get_current_context().get_base_output_path() |
| test_name = self.throughput_data['test'] |
| channel_bandwidth = self.throughput_data['channel_bandwidth'] |
| output_file_name = '%s/ChannelSweepTest/%s/%s_%smhz.html' % ( |
| output_path, test_name, test_name, channel_bandwidth) |
| output_file(output_file_name) |
| channels = [] |
| tx_throughputs = [] |
| rx_throughputs = [] |
| for channel in self.throughput_data['results']: |
| channels.append(str(channel)) |
| tx_throughputs.append( |
| self.throughput_data['results'][channel]['tx_throughput']) |
| rx_throughputs.append( |
| self.throughput_data['results'][channel]['rx_throughput']) |
| channel_vs_throughput_data = ColumnDataSource( |
| data=dict(channels=channels, |
| tx_throughput=tx_throughputs, |
| rx_throughput=rx_throughputs)) |
| TOOLTIPS = [('Channel', '@channels'), |
| ('TX_Throughput', '@tx_throughput'), |
| ('RX_Throughput', '@rx_throughput')] |
| channel_vs_throughput_graph = figure(title='Channels vs. Throughput', |
| x_axis_label='Channels', |
| x_range=channels, |
| y_axis_label='Throughput', |
| tooltips=TOOLTIPS) |
| channel_vs_throughput_graph.sizing_mode = 'stretch_both' |
| channel_vs_throughput_graph.title.align = 'center' |
| channel_vs_throughput_graph.line('channels', |
| 'tx_throughput', |
| source=channel_vs_throughput_data, |
| line_width=2, |
| line_color='blue', |
| legend_label='TX_Throughput') |
| channel_vs_throughput_graph.circle('channels', |
| 'tx_throughput', |
| source=channel_vs_throughput_data, |
| size=GRAPH_CIRCLE_SIZE, |
| color='blue') |
| channel_vs_throughput_graph.line('channels', |
| 'rx_throughput', |
| source=channel_vs_throughput_data, |
| line_width=2, |
| line_color='red', |
| legend_label='RX_Throughput') |
| channel_vs_throughput_graph.circle('channels', |
| 'rx_throughput', |
| source=channel_vs_throughput_data, |
| size=GRAPH_CIRCLE_SIZE, |
| color='red') |
| |
| channel_vs_throughput_graph.legend.location = "top_left" |
| graph_file = save([channel_vs_throughput_graph]) |
| self.log.info('Saved graph to %s' % graph_file) |
| |
| def verify_standard_deviation(self, max_std_dev): |
| """Verifies the standard deviation of the throughput across the channels |
| does not exceed the max_std_dev value. |
| |
| Args: |
| max_std_dev: float, max standard deviation of throughput for a test |
| to pass (in Mb/s) |
| |
| Raises: |
| TestFailure, if standard deviation of throughput exceeds max_std_dev |
| """ |
| self.log.info('Verifying standard deviation across channels does not ' |
| 'exceed max standard deviation of %s Mb/s' % max_std_dev) |
| tx_values = [] |
| rx_values = [] |
| for channel in self.throughput_data['results']: |
| if self.throughput_data['results'][channel][ |
| 'tx_throughput'] is not None: |
| tx_values.append( |
| self.throughput_data['results'][channel]['tx_throughput']) |
| if self.throughput_data['results'][channel][ |
| 'rx_throughput'] is not None: |
| rx_values.append( |
| self.throughput_data['results'][channel]['rx_throughput']) |
| tx_std_dev = pstdev(tx_values) |
| rx_std_dev = pstdev(rx_values) |
| if tx_std_dev > max_std_dev or rx_std_dev > max_std_dev: |
| asserts.fail( |
| 'With %smhz channel bandwidth, throughput standard ' |
| 'deviation (tx: %s Mb/s, rx: %s Mb/s) exceeds max standard ' |
| 'deviation (%s Mb/s).' % |
| (self.throughput_data['channel_bandwidth'], tx_std_dev, |
| rx_std_dev, max_std_dev)) |
| else: |
| asserts.explicit_pass( |
| 'Throughput standard deviation (tx: %s Mb/s, rx: %s Mb/s) ' |
| 'with %smhz channel bandwidth does not exceed maximum (%s Mb/s).' |
| % (tx_std_dev, rx_std_dev, |
| self.throughput_data['channel_bandwidth'], max_std_dev)) |
| |
| def run_channel_performance_tests(self, settings): |
| """Test function for running channel performance tests. Used by both |
| explicit test cases and debug test cases from config. Runs a performance |
| test for each channel in test_channels with test_channel_bandwidth, then |
| writes a graph and csv file of the channel vs throughput. |
| |
| Args: |
| settings: dict, containing the following test settings |
| test_channels: list of channels to test. |
| test_channel_bandwidth: int, channel bandwidth to use for test. |
| test_security (optional): string, security type to use for test. |
| min_tx_throughput (optional, default: 0): float, minimum tx |
| throughput threshold to pass individual channel tests |
| (in Mb/s). |
| min_rx_throughput (optional, default: 0): float, minimum rx |
| throughput threshold to pass individual channel tests |
| (in Mb/s). |
| max_std_dev (optional, default: 1): float, maximum standard |
| deviation of throughput across all test channels to pass |
| test (in Mb/s). |
| base_test_name (optional): string, test name prefix to use with |
| generated subtests. |
| country_name (optional): string, country name from |
| hostapd_constants to set on device. |
| country_code (optional): string, two-char country code to set on |
| the DUT. Takes priority over country_name. |
| test_name (debug tests only): string, the test name for this |
| parent test case from the config file. In explicit tests, |
| this is not necessary. |
| |
| Writes: |
| CSV file: channel, tx_throughput, rx_throughput |
| for every test channel. |
| Graph: channel vs tx_throughput & channel vs rx_throughput |
| |
| Raises: |
| TestFailure, if throughput standard deviation across channels |
| exceeds max_std_dev |
| |
| Example Explicit Test (see EOF for debug JSON example): |
| def test_us_2g_20mhz_wpa2(self): |
| self.run_channel_performance_tests( |
| dict( |
| test_channels=hostapd_constants.US_CHANNELS_2G, |
| test_channel_bandwidth=20, |
| test_security=hostapd_constants.WPA2_STRING, |
| min_tx_throughput=2, |
| min_rx_throughput=4, |
| max_std_dev=0.75, |
| country_code='US', |
| base_test_name='test_us')) |
| """ |
| test_channels = settings['test_channels'] |
| test_channel_bandwidth = settings['test_channel_bandwidth'] |
| test_security = settings.get('test_security', None) |
| test_name = settings.get('test_name', self.test_name) |
| base_test_name = settings.get('base_test_name', 'test') |
| min_tx_throughput = settings.get('min_tx_throughput', |
| DEFAULT_MIN_THROUGHPUT) |
| min_rx_throughput = settings.get('min_rx_throughput', |
| DEFAULT_MIN_THROUGHPUT) |
| max_std_dev = settings.get('max_std_dev', DEFAULT_MAX_STD_DEV) |
| country_code = settings.get('country_code') |
| country_name = settings.get('country_name') |
| country_label = None |
| |
| if country_code: |
| country_label = country_code |
| self.set_dut_country_code(country_code) |
| elif country_name: |
| country_label = country_name |
| code = hostapd_constants.COUNTRY_CODE[country_name]['country_code'] |
| self.set_dut_country_code(code) |
| |
| self.throughput_data = { |
| 'test': test_name, |
| 'channel_bandwidth': test_channel_bandwidth, |
| 'results': {} |
| } |
| test_list = [] |
| for channel in test_channels: |
| sub_test_name = 'test_%schannel_%s_%smhz_%s_performance' % ( |
| '%s_' % country_label if country_label else '', channel, |
| test_channel_bandwidth, |
| test_security if test_security else 'open') |
| test_list.append({ |
| 'test_name': sub_test_name, |
| 'channel': int(channel), |
| 'channel_bandwidth': int(test_channel_bandwidth), |
| 'security': test_security, |
| 'min_tx_throughput': min_tx_throughput, |
| 'min_rx_throughput': min_rx_throughput |
| }) |
| self.run_generated_testcases(self.get_channel_performance, |
| settings=test_list, |
| name_func=get_test_name) |
| self.log.info('Channel tests completed.') |
| self.write_graph() |
| self.verify_standard_deviation(max_std_dev) |
| |
| def get_channel_performance(self, settings): |
| """Run a single channel performance test and logs results to csv file |
| and throughput data. Run with generated sub test cases in |
| run_channel_performance_tests. |
| |
| 1. Sets up network with test settings |
| 2. Associates DUT |
| 3. Runs traffic between DUT and iperf server (both directions) |
| 4. Logs channel, tx_throughput (Mb/s), and rx_throughput (Mb/s) to |
| log file and throughput data. |
| 5. Checks throughput values against minimum throughput thresholds. |
| |
| Args: |
| settings: see run_channel_performance_tests |
| |
| Raises: |
| TestFailure, if throughput (either direction) is less than |
| the directions given minimum throughput threshold. |
| """ |
| channel = settings['channel'] |
| channel_bandwidth = settings['channel_bandwidth'] |
| security = settings['security'] |
| test_name = settings['test_name'] |
| min_tx_throughput = settings['min_tx_throughput'] |
| min_rx_throughput = settings['min_rx_throughput'] |
| if security: |
| if security == hostapd_constants.WEP_STRING: |
| password = utils.rand_hex_str(WEP_HEX_STRING_LENGTH) |
| else: |
| password = utils.rand_ascii_str( |
| hostapd_constants.MIN_WPA_PSK_LENGTH) |
| security_profile = Security(security_mode=security, |
| password=password) |
| else: |
| password = None |
| security_profile = None |
| ssid = self.setup_ap(channel, channel_bandwidth, security_profile) |
| associated = self.dut.associate(ssid, target_pwd=password) |
| if not associated: |
| self.log_to_file_and_throughput_data(channel, channel_bandwidth, |
| None, None) |
| asserts.fail('Device failed to associate with network %s' % ssid) |
| self.log.info('DUT (%s) connected to network %s.' % |
| (self.dut.device.ip, ssid)) |
| self.iperf_server.renew_test_interface_ip_address() |
| self.log.info( |
| 'Getting ip address for iperf server. Will retry for %s seconds.' % |
| self.time_to_wait_for_ip_addr) |
| iperf_server_address = self.get_and_verify_iperf_address( |
| channel, self.iperf_server) |
| self.log.info( |
| 'Getting ip address for DUT. Will retry for %s seconds.' % |
| self.time_to_wait_for_ip_addr) |
| iperf_client_address = self.get_and_verify_iperf_address( |
| channel, self.dut, self.iperf_client.test_interface) |
| tx_throughput = self.get_iperf_throughput(iperf_server_address, |
| iperf_client_address) |
| rx_throughput = self.get_iperf_throughput(iperf_server_address, |
| iperf_client_address, |
| reverse=True) |
| self.log_to_file_and_throughput_data(channel, channel_bandwidth, |
| tx_throughput, rx_throughput) |
| self.log.info('Throughput (tx, rx): (%s Mb/s, %s Mb/s), ' |
| 'Minimum threshold (tx, rx): (%s Mb/s, %s Mb/s)' % |
| (tx_throughput, rx_throughput, min_tx_throughput, |
| min_rx_throughput)) |
| base_message = ( |
| 'Actual throughput (on channel: %s, channel bandwidth: ' |
| '%s, security: %s)' % (channel, channel_bandwidth, security)) |
| if (tx_throughput < min_tx_throughput |
| or rx_throughput < min_rx_throughput): |
| asserts.fail('%s below the minimum threshold.' % base_message) |
| asserts.explicit_pass('%s above the minimum threshold.' % base_message) |
| |
| def verify_regulatory_compliance(self, settings): |
| """Test function for regulatory compliance tests. Verify device complies |
| with provided regulatory requirements. |
| |
| Args: |
| settings: dict, containing the following test settings |
| test_channels: dict, mapping channels to a set of the channel |
| bandwidths to test (see example for using JSON). Defaults |
| to hostapd_constants.ALL_CHANNELS. |
| country_code: string, two-char country code to set on device |
| (prioritized over country_name) |
| country_name: string, country name from hostapd_constants to set |
| on device. |
| base_test_name (optional): string, test name prefix to use with |
| generatedsubtests. |
| test_name: string, the test name for this |
| parent test case from the config file. In explicit tests, |
| this is not necessary. |
| """ |
| country_name = settings.get('country_name') |
| country_code = settings.get('country_code') |
| if not (country_code or country_name): |
| raise ValueError('No country code or name provided.') |
| |
| test_channels = settings.get('test_channels', |
| hostapd_constants.ALL_CHANNELS) |
| allowed_channels = settings['allowed_channels'] |
| |
| base_test_name = settings.get('base_test_name', 'test_compliance') |
| |
| if country_code: |
| code = country_code |
| else: |
| code = hostapd_constants.COUNTRY_CODE[country_name]['country_code'] |
| |
| self.set_dut_country_code(code) |
| |
| test_list = [] |
| for channel in test_channels: |
| for channel_bandwidth in test_channels[channel]: |
| sub_test_name = '%s_channel_%s_%smhz' % ( |
| base_test_name, channel, channel_bandwidth) |
| should_associate = (channel in allowed_channels |
| and channel_bandwidth |
| in allowed_channels[channel]) |
| # Note: these int conversions because when these tests are |
| # imported via JSON, they may be strings since the channels |
| # will be keys. This makes the json/list test_channels param |
| # behave exactly like the in code dict/set test_channels. |
| test_list.append({ |
| 'country_code': code, |
| 'channel': int(channel), |
| 'channel_bandwidth': int(channel_bandwidth), |
| 'should_associate': should_associate, |
| 'test_name': sub_test_name |
| }) |
| self.run_generated_testcases(test_func=self.verify_channel_compliance, |
| settings=test_list, |
| name_func=get_test_name) |
| |
| def verify_channel_compliance(self, settings): |
| """Verify device complies with provided regulatory requirements for a |
| specific channel and channel bandwidth. Run with generated test cases |
| in the verify_regulatory_compliance parent test. |
| _ |
| Args: |
| settings: see verify_regulatory_compliance` |
| """ |
| channel = settings['channel'] |
| channel_bandwidth = settings['channel_bandwidth'] |
| code = settings['country_code'] |
| should_associate = settings['should_associate'] |
| |
| ssid = self.setup_ap(channel, channel_bandwidth) |
| |
| self.log.info( |
| 'Attempting to associate with network (%s) on channel %s @ %smhz. ' |
| 'Expected behavior: %s' % |
| (ssid, channel, channel_bandwidth, 'Device should associate' |
| if should_associate else 'Device should NOT associate.')) |
| |
| associated = self.dut.associate(ssid) |
| if associated == should_associate: |
| asserts.explicit_pass( |
| 'Device complied with %s regulatory requirement for channel %s ' |
| ' with channel bandwidth %smhz. %s' % |
| (code, channel, channel_bandwidth, |
| 'Associated.' if associated else 'Refused to associate.')) |
| else: |
| asserts.fail( |
| 'Device failed compliance with regulatory domain %s for ' |
| 'channel %s with channel bandwidth %smhz. Expected: %s, Got: %s' |
| % (code, channel, channel_bandwidth, 'Should associate' |
| if should_associate else 'Should not associate', |
| 'Associated' if associated else 'Did not associate')) |
| |
| # Helper functions to allow explicit tests throughput and standard deviation |
| # thresholds to be passed in via config. |
| def _get_min_tx_throughput(self, test_name): |
| return self.user_params.get('channel_sweep_test_params', |
| {}).get(test_name, |
| {}).get('min_tx_throughput', |
| DEFAULT_MIN_THROUGHPUT) |
| |
| def _get_min_rx_throughput(self, test_name): |
| return self.user_params.get('channel_sweep_test_params', |
| {}).get(test_name, |
| {}).get('min_rx_throughput', |
| DEFAULT_MIN_THROUGHPUT) |
| |
| def _get_max_std_dev(self, test_name): |
| return self.user_params.get('channel_sweep_test_params', |
| {}).get(test_name, |
| {}).get('min_std_dev', |
| DEFAULT_MAX_STD_DEV) |
| |
| # Channel Performance of US Channels: 570 Test Cases |
| # 36 Test Cases |
| def test_us_20mhz_open_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_20MHZ, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 35 Test Cases |
| def test_us_40mhz_open_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_40MHZ, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 24 Test Cases |
| def test_us_80mhz_open_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_80MHZ, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wep_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WEP_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wep_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WEP_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wep_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WEP_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA2_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA2_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA2_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA_MIXED_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA_MIXED_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa_wpa2_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA_MIXED_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 36 Test Cases |
| def test_us_20mhz_wpa3_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G, |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_20MHZ, |
| test_security=hostapd_constants.WPA3_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 35 Test Cases |
| def test_us_40mhz_wpa3_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_2G + |
| hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_40MHZ, |
| test_security=hostapd_constants.WPA3_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| # 24 Test Cases |
| def test_us_80mhz_wpa3_channel_performance(self): |
| self.run_channel_performance_tests( |
| dict(test_channels=hostapd_constants.US_CHANNELS_5G[:-1], |
| test_channel_bandwidth=hostapd_constants. |
| CHANNEL_BANDWIDTH_80MHZ, |
| test_security=hostapd_constants.WPA3_STRING, |
| base_test_name=self.test_name, |
| min_tx_throughput=self._get_min_tx_throughput(self.test_name), |
| min_rx_throughput=self._get_min_rx_throughput(self.test_name), |
| max_std_dev=self._get_max_std_dev(self.test_name))) |
| |
| def test_channel_performance_debug(self): |
| """Run channel performance test cases from the ACTS config file. |
| |
| Example: |
| "channel_sweep_test_params": { |
| "debug_channel_performance_tests": [ |
| { |
| "test_name": "test_123_20mhz_wpa2_performance" |
| "test_channels": [1, 2, 3], |
| "test_channel_bandwidth": 20, |
| "test_security": "wpa2", |
| "base_test_name": "test_123_perf", |
| "min_tx_throughput": 1.1, |
| "min_rx_throughput": 3, |
| "max_std_dev": 0.5 |
| }, |
| ... |
| ] |
| } |
| |
| """ |
| asserts.skip_if( |
| 'debug_channel_performance_tests' |
| not in self.user_params.get('channel_sweep_test_params', {}), |
| 'No custom channel performance tests provided in config.') |
| base_tests = self.user_params['channel_sweep_test_params'][ |
| 'debug_channel_performance_tests'] |
| self.run_generated_testcases(self.run_channel_performance_tests, |
| settings=base_tests, |
| name_func=get_test_name) |
| |
| def test_regulatory_compliance(self): |
| """Run regulatory compliance test case from the ACTS config file. |
| Note: only one country_name OR country_code is required. |
| |
| Example: |
| "channel_sweep_test_params": { |
| "regulatory_compliance_tests": [ |
| { |
| "test_name": "test_japan_compliance_1_13_36" |
| "country_name": "JAPAN", |
| "country_code": "JP", |
| "test_channels": { |
| "1": [20, 40], "13": [40], "36": [20, 40, 80] |
| }, |
| "allowed_channels": { |
| "1": [20, 40], "36": [20, 40, 80] |
| }, |
| "base_test_name": "test_japan" |
| }, |
| ... |
| ] |
| } |
| """ |
| asserts.skip_if( |
| 'regulatory_compliance_tests' |
| not in self.user_params.get('channel_sweep_test_params', {}), |
| 'No custom regulatory compliance tests provided in config.') |
| base_tests = self.user_params['channel_sweep_test_params'][ |
| 'regulatory_compliance_tests'] |
| self.run_generated_testcases(self.verify_regulatory_compliance, |
| settings=base_tests, |
| name_func=get_test_name) |