#!/usr/bin/env python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections
import csv
import itertools
import logging
import numpy
import os
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers import iperf_client
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_chamber
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from functools import partial
from WifiRvrTest import WifiRvrTest
from WifiPingTest import WifiPingTest


class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
    """Class to test WiFi sensitivity tests.

    This class implements measures WiFi sensitivity per rate. It heavily
    leverages the WifiRvrTest class and introduced minor differences to set
    specific rates and the access point, and implements a different pass/fail
    check. For an example config file to run this test class see
    example_connectivity_performance_ap_sta.json.
    """

    RSSI_POLL_INTERVAL = 0.2
    VALID_TEST_CONFIGS = {
        1: ['legacy', 'VHT20'],
        2: ['legacy', 'VHT20'],
        6: ['legacy', 'VHT20'],
        10: ['legacy', 'VHT20'],
        11: ['legacy', 'VHT20'],
        36: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        40: ['legacy', 'VHT20'],
        44: ['legacy', 'VHT20'],
        48: ['legacy', 'VHT20'],
        149: ['legacy', 'VHT20', 'VHT40', 'VHT80'],
        153: ['legacy', 'VHT20'],
        157: ['legacy', 'VHT20'],
        161: ['legacy', 'VHT20']
    }
    RateTuple = collections.namedtuple(('RateTuple'),
                                       ['mcs', 'streams', 'data_rate'])
    #yapf:disable
    VALID_RATES = {
        'legacy_2GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(11, 1, 11), RateTuple(9, 1, 9),
            RateTuple(6, 1, 6), RateTuple(5.5, 1, 5.5),
            RateTuple(2, 1, 2), RateTuple(1, 1, 1)],
        'legacy_5GHz': [
            RateTuple(54, 1, 54), RateTuple(48, 1, 48),
            RateTuple(36, 1, 36), RateTuple(24, 1, 24),
            RateTuple(18, 1, 18), RateTuple(12, 1, 12),
            RateTuple(9, 1, 9), RateTuple(6, 1, 6)],
        'HT20': [
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 26), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(15, 2, 144.4), RateTuple(14, 2, 130),
            RateTuple(13, 2, 115.6), RateTuple(12, 2, 86.7),
            RateTuple(11, 2, 57.8), RateTuple(10, 2, 43.4),
            RateTuple(9, 2, 28.9), RateTuple(8, 2, 14.4)],
        'VHT20': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT40': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
        'VHT80': [
            RateTuple(9, 1, 96), RateTuple(8, 1, 86.7),
            RateTuple(7, 1, 72.2), RateTuple(6, 1, 65),
            RateTuple(5, 1, 57.8), RateTuple(4, 1, 43.3),
            RateTuple(3, 1, 28.9), RateTuple(2, 1, 21.7),
            RateTuple(1, 1, 14.4), RateTuple(0, 1, 7.2),
            RateTuple(9, 2, 192), RateTuple(8, 2, 173.3),
            RateTuple(7, 2, 144.4), RateTuple(6, 2, 130.3),
            RateTuple(5, 2, 115.6), RateTuple(4, 2, 86.7),
            RateTuple(3, 2, 57.8), RateTuple(2, 2, 43.3),
            RateTuple(1, 2, 28.9), RateTuple(0, 2, 14.4)],
    }
    #yapf:enable

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True

    def setup_class(self):
        """Initializes common test hardware and parameters.

        This function initializes hardwares and compiles parameters that are
        common to all tests in this class.
        """
        self.dut = self.android_devices[-1]
        req_params = [
            'RetailAccessPoints', 'sensitivity_test_params', 'testbed_params',
            'RemoteServer'
        ]
        opt_params = ['main_network']
        self.unpack_userparams(req_params, opt_params)
        self.testclass_params = self.sensitivity_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        self.ping_server = ssh.connection.SshConnection(
            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
        self.iperf_server = self.iperf_servers[0]
        self.iperf_client = self.iperf_clients[0]
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        self.atten_dut_chain_map = {}
        self.testclass_results = []

        # Turn WiFi ON
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
                                'Can not turn on airplane mode.')
        wutils.wifi_toggle_state(self.dut, True)

        # Configure test retries
        self.user_params['retry_tests'] = [self.__class__.__name__]

    def teardown_class(self):
        # Turn WiFi OFF
        for dev in self.android_devices:
            wutils.wifi_toggle_state(dev, False)
        self.process_testclass_results()

    def setup_test(self):
        self.retry_flag = False

    def teardown_test(self):
        self.retry_flag = False

    def on_retry(self):
        """Function to control test logic on retried tests.

        This function is automatically executed on tests that are being
        retried. In this case the function resets wifi, toggles it off and on
        and sets a retry_flag to enable further tweaking the test logic on
        second attempts.
        """
        self.retry_flag = True
        for dev in self.android_devices:
            wutils.reset_wifi(dev)
            wutils.toggle_wifi_off_and_on(dev)

    def pass_fail_check(self, result):
        """Checks sensitivity results and decides on pass/fail.

        Args:
            result: dict containing attenuation, throughput and other meta
                data
        """
        result_string = ('Throughput = {}%, Sensitivity = {}.'.format(
            result['peak_throughput_pct'], result['sensitivity']))
        if result['peak_throughput_pct'] < 95:
            asserts.fail('Result unreliable. {}'.format(result_string))
        else:
            asserts.explicit_pass('Test Passed. {}'.format(result_string))

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases."""
        # write json output
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['mode', 'rate', 'num_streams', 'chain_mask']
        channels_tested = []
        for result in self.testclass_results:
            testcase_params = result['testcase_params']
            test_id = self.extract_test_id(testcase_params, id_fields)
            test_id = tuple(test_id.items())
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            channel = testcase_params['channel']
            if channel not in channels_tested:
                channels_tested.append(channel)
            if result['peak_throughput_pct'] >= 95:
                testclass_results_dict[test_id][channel] = result[
                    'sensitivity']
            else:
                testclass_results_dict[test_id][channel] = ''

        # calculate average metrics
        metrics_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'num_streams', 'chain_mask']
        for test_id in testclass_results_dict.keys():
            for channel in testclass_results_dict[test_id].keys():
                metric_tag = collections.OrderedDict(test_id, channel=channel)
                metric_tag = self.extract_test_id(metric_tag, id_fields)
                metric_tag = tuple(metric_tag.items())
                metrics_dict.setdefault(metric_tag, [])
                sensitivity_result = testclass_results_dict[test_id][channel]
                if sensitivity_result != '':
                    metrics_dict[metric_tag].append(sensitivity_result)
        for metric_tag_tuple, metric_data in metrics_dict.items():
            metric_tag_dict = collections.OrderedDict(metric_tag_tuple)
            metric_tag = 'ch{}_{}_nss{}_chain{}'.format(
                metric_tag_dict['channel'], metric_tag_dict['mode'],
                metric_tag_dict['num_streams'], metric_tag_dict['chain_mask'])
            metric_key = '{}.avg_sensitivity'.format(metric_tag)
            metric_value = numpy.nanmean(metric_data)
            self.testclass_metric_logger.add_metric(metric_key, metric_value)

        # write csv
        csv_header = ['Mode', 'MCS', 'Streams', 'Chain', 'Rate (Mbps)']
        for channel in channels_tested:
            csv_header.append('Ch. ' + str(channel))
        results_file_path = os.path.join(self.log_path, 'results.csv')
        with open(results_file_path, mode='w') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=csv_header)
            writer.writeheader()
            for test_id, test_results in testclass_results_dict.items():
                test_id_dict = dict(test_id)
                if 'legacy' in test_id_dict['mode']:
                    rate_list = self.VALID_RATES['legacy_2GHz']
                else:
                    rate_list = self.VALID_RATES[test_id_dict['mode']]
                data_rate = next(rate.data_rate for rate in rate_list
                                 if rate[:-1] == (test_id_dict['rate'],
                                                  test_id_dict['num_streams']))
                row_value = {
                    'Mode': test_id_dict['mode'],
                    'MCS': test_id_dict['rate'],
                    'Streams': test_id_dict['num_streams'],
                    'Chain': test_id_dict['chain_mask'],
                    'Rate (Mbps)': data_rate,
                }
                for channel in channels_tested:
                    row_value['Ch. ' + str(channel)] = test_results.pop(
                        channel, ' ')
                writer.writerow(row_value)

        if not self.testclass_params['traffic_type'].lower() == 'ping':
            WifiRvrTest.process_testclass_results(self)

    def process_rvr_test_results(self, testcase_params, rvr_result):
        """Post processes RvR results to compute sensitivity.

        Takes in the results of the RvR tests and computes the sensitivity of
        the current rate by looking at the point at which throughput drops
        below the percentage specified in the config file. The function then
        calls on its parent class process_test_results to plot the result.

        Args:
            rvr_result: dict containing attenuation, throughput and other meta
            data
        """
        rvr_result['peak_throughput'] = max(rvr_result['throughput_receive'])
        rvr_result['peak_throughput_pct'] = 100
        throughput_check = [
            throughput < rvr_result['peak_throughput'] *
            (self.testclass_params['throughput_pct_at_sensitivity'] / 100)
            for throughput in rvr_result['throughput_receive']
        ]
        consistency_check = [
            idx for idx in range(len(throughput_check))
            if all(throughput_check[idx:])
        ]
        rvr_result['atten_at_range'] = rvr_result['attenuation'][
            consistency_check[0] - 1]
        rvr_result['range'] = rvr_result['fixed_attenuation'] + (
            rvr_result['atten_at_range'])
        rvr_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
            self.testbed_params['ap_tx_power_offset'][str(
                testcase_params['channel'])] - rvr_result['range'])
        WifiRvrTest.process_test_results(self, rvr_result)

    def process_ping_test_results(self, testcase_params, ping_result):
        """Post processes RvR results to compute sensitivity.

        Takes in the results of the RvR tests and computes the sensitivity of
        the current rate by looking at the point at which throughput drops
        below the percentage specified in the config file. The function then
        calls on its parent class process_test_results to plot the result.

        Args:
            rvr_result: dict containing attenuation, throughput and other meta
            data
        """
        WifiPingTest.process_ping_results(self, testcase_params, ping_result)
        ping_result['sensitivity'] = self.testclass_params['ap_tx_power'] + (
            self.testbed_params['ap_tx_power_offset'][str(
                testcase_params['channel'])] - ping_result['range'])

    def setup_sensitivity_test(self, testcase_params):
        if testcase_params['traffic_type'].lower() == 'ping':
            self.setup_ping_test(testcase_params)
            self.run_sensitivity_test = self.run_ping_test
            self.process_sensitivity_test_results = (
                self.process_ping_test_results)
        else:
            self.setup_rvr_test(testcase_params)
            self.run_sensitivity_test = self.run_rvr_test
            self.process_sensitivity_test_results = (
                self.process_rvr_test_results)

    def setup_ap(self, testcase_params):
        """Sets up the AP and attenuator to compensate for AP chain imbalance.

        Args:
            testcase_params: dict containing AP and other test params
        """
        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        if '2G' in band:
            frequency = wutils.WifiEnums.channel_2G_to_freq[
                testcase_params['channel']]
        else:
            frequency = wutils.WifiEnums.channel_5G_to_freq[
                testcase_params['channel']]
        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
            self.access_point.set_region(self.testbed_params['DFS_region'])
        else:
            self.access_point.set_region(self.testbed_params['default_region'])
        self.access_point.set_channel(band, testcase_params['channel'])
        self.access_point.set_bandwidth(band, testcase_params['mode'])
        self.access_point.set_power(band, testcase_params['ap_tx_power'])
        self.access_point.set_rate(band, testcase_params['mode'],
                                   testcase_params['num_streams'],
                                   testcase_params['rate'],
                                   testcase_params['short_gi'])
        # Set attenuator offsets and set attenuators to initial condition
        atten_offsets = self.testbed_params['chain_offset'][str(
            testcase_params['channel'])]
        for atten in self.attenuators:
            if 'AP-Chain-0' in atten.path:
                atten.offset = atten_offsets[0]
            elif 'AP-Chain-1' in atten.path:
                atten.offset = atten_offsets[1]
            else:
                atten.offset = 0
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Check battery level before test
        if not wputils.health_check(self.dut, 10):
            asserts.skip('Battery level too low. Skipping test.')
        # Turn screen off to preserve battery
        self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            self.log.info('Already connected to desired network')
        else:
            wutils.reset_wifi(self.dut)
            wutils.set_wifi_country_code(self.dut,
                                         self.testclass_params['country_code'])
            testcase_params['test_network']['channel'] = testcase_params[
                'channel']
            wutils.wifi_connect(self.dut,
                                testcase_params['test_network'],
                                num_of_tries=5,
                                check_connectivity=False)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
        # Activate/attenuate the correct chains
        if testcase_params['channel'] not in self.atten_dut_chain_map.keys():
            self.atten_dut_chain_map[testcase_params[
                'channel']] = wputils.get_current_atten_dut_chain_map(
                    self.attenuators, self.dut, self.ping_server)
        self.log.info('Current Attenuator-DUT Chain Map: {}'.format(
            self.atten_dut_chain_map[testcase_params['channel']]))
        for idx, atten in enumerate(self.attenuators):
            if self.atten_dut_chain_map[testcase_params['channel']][
                    idx] == testcase_params['attenuated_chain']:
                atten.offset = atten.instrument.max_atten

    def extract_test_id(self, testcase_params, id_fields):
        test_id = collections.OrderedDict(
            (param, testcase_params[param]) for param in id_fields)
        return test_id

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        as the next higher MCS has been executed. If so it sets the starting
        point a configurable number of dBs below the next MCS's sensitivity.

        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['atten_start']
        # Get the current and reference test config. The reference test is the
        # one performed at the current MCS+1
        current_rate = testcase_params['rate']
        ref_test_params = self.extract_test_id(
            testcase_params,
            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
        if 'legacy' in testcase_params['mode']:
            if testcase_params['channel'] <= 13:
                rate_list = self.VALID_RATES['legacy_2GHz']
            else:
                rate_list = self.VALID_RATES['legacy_5GHz']
            ref_index = max(
                0,
                rate_list.index(self.RateTuple(current_rate, 1, current_rate))
                - 1)
            ref_test_params['rate'] = rate_list[ref_index].mcs
        else:
            ref_test_params['rate'] = current_rate + 1

        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            self.extract_test_id(
                result['testcase_params'],
                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
            for result in self.testclass_results
        ]

        try:
            ref_index = previous_params.index(ref_test_params)
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
        except ValueError:
            self.log.warning(
                'Reference test not found. Starting from {} dB'.format(
                    self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
            start_atten = max(start_atten, 0)
        return start_atten

    def compile_test_params(self, testcase_params):
        """Function that generates test params based on the test name."""
        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[band]
        if testcase_params['chain_mask'] in ['0', '1']:
            testcase_params['attenuated_chain'] = 'DUT-Chain-{}'.format(
                1 if testcase_params['chain_mask'] == '0' else 0)
        else:
            # Set attenuated chain to -1. Do not set to None as this will be
            # compared to RF chain map which may include None
            testcase_params['attenuated_chain'] = -1

        self.testclass_params[
            'range_ping_loss_threshold'] = 100 - self.testclass_params[
                'throughput_pct_at_sensitivity']
        if self.testclass_params['traffic_type'] == 'UDP':
            testcase_params['iperf_args'] = '-i 1 -t {} -J -u -b {}'.format(
                self.testclass_params['iperf_duration'],
                self.testclass_params['UDP_rates'][testcase_params['mode']])
        elif self.testclass_params['traffic_type'] == 'TCP':
            testcase_params['iperf_args'] = '-i 1 -t {} -J'.format(
                self.testclass_params['iperf_duration'])

        if self.testclass_params['traffic_type'] != 'ping' and isinstance(
                self.iperf_client, iperf_client.IPerfClientOverAdb):
            testcase_params['iperf_args'] += ' -R'
            testcase_params['use_client_output'] = True
        else:
            testcase_params['use_client_output'] = False

        return testcase_params

    def _test_sensitivity(self, testcase_params):
        """Function that gets called for each test case

        The function gets called in each rvr test case. The function customizes
        the rvr test based on the test name of the test that called it
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)
        testcase_params.update(self.testclass_params)
        testcase_params['atten_start'] = self.get_start_atten(testcase_params)
        num_atten_steps = int(
            (testcase_params['atten_stop'] - testcase_params['atten_start']) /
            testcase_params['atten_step'])
        testcase_params['atten_range'] = [
            testcase_params['atten_start'] + x * testcase_params['atten_step']
            for x in range(0, num_atten_steps)
        ]

        # Prepare devices and run test
        self.setup_sensitivity_test(testcase_params)
        result = self.run_sensitivity_test(testcase_params)
        self.process_sensitivity_test_results(testcase_params, result)

        # Post-process results
        self.testclass_results.append(result)
        self.pass_fail_check(result)

    def generate_test_cases(self, channels, modes, chain_mask):
        """Function that auto-generates test cases for a test class."""
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for mode in requested_modes:
                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
                if 'VHT' in mode:
                    rates = self.VALID_RATES[mode]
                elif 'HT' in mode:
                    rates = self.VALID_RATES[mode]
                elif 'legacy' in mode and channel < 14:
                    rates = self.VALID_RATES['legacy_2GHz']
                elif 'legacy' in mode and channel > 14:
                    rates = self.VALID_RATES['legacy_5GHz']
                else:
                    raise ValueError('Invalid test mode.')
                for chain, rate in itertools.product(chain_mask, rates):
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        bandwidth=bandwidth,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain)
                    if chain in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases


class WifiSensitivity_AllChannels_Test(WifiSensitivityTest):
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases(
            [6, 36, 40, 44, 48, 149, 153, 157, 161],
            ['VHT20', 'VHT40', 'VHT80'], ['0', '1', '2x2'])


class WifiSensitivity_SampleChannels_Test(WifiSensitivityTest):
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([6, 36, 149],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_2GHz_Test(WifiSensitivityTest):
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([1, 2, 6, 10, 11], ['VHT20'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_5GHz_Test(WifiSensitivityTest):
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases(
            [36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT40', 'VHT80'],
            ['0', '1', '2x2'])


class WifiSensitivity_UNII1_Test(WifiSensitivityTest):
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([36, 40, 44, 48],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


class WifiSensitivity_UNII3_Test(WifiSensitivityTest):
    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = self.generate_test_cases([149, 153, 157, 161],
                                              ['VHT20', 'VHT40', 'VHT80'],
                                              ['0', '1', '2x2'])


# Over-the air version of senstivity tests
class WifiOtaSensitivityTest(WifiSensitivityTest):
    """Class to test over-the-air senstivity.

    This class implements measures WiFi sensitivity tests in an OTA chamber.
    It allows setting orientation and other chamber parameters to study
    performance in varying channel conditions
    """
    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = False

    def setup_class(self):
        WifiSensitivityTest.setup_class(self)
        self.current_chain_mask = '2x2'
        self.ota_chamber = ota_chamber.create(
            self.user_params['OTAChamber'])[0]

    def teardown_class(self):
        WifiSensitivityTest.teardown_class(self)
        self.ota_chamber.reset_chamber()

    def setup_sensitivity_test(self, testcase_params):
        # Setup turntable
        self.ota_chamber.set_orientation(testcase_params['orientation'])
        # Continue test setup
        WifiSensitivityTest.setup_sensitivity_test(self, testcase_params)

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Configure the right INI settings
        if testcase_params['chain_mask'] != self.current_chain_mask:
            self.log.info('Updating WiFi chain mask to: {}'.format(
                testcase_params['chain_mask']))
            self.current_chain_mask = testcase_params['chain_mask']
            if testcase_params['chain_mask'] in ['0', '1']:
                wputils.set_ini_single_chain_mode(
                    self.dut, int(testcase_params['chain_mask']))
            else:
                wputils.set_ini_two_chain_mode(self.dut)
        # Check battery level before test
        if not wputils.health_check(self.dut, 10):
            asserts.skip('Battery level too low. Skipping test.')
        # Turn screen off to preserve battery
        self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            self.log.info('Already connected to desired network')
        else:
            wutils.reset_wifi(self.dut)
            wutils.set_wifi_country_code(self.dut,
                                         self.testclass_params['country_code'])
            testcase_params['test_network']['channel'] = testcase_params[
                'channel']
            wutils.wifi_connect(self.dut,
                                testcase_params['test_network'],
                                num_of_tries=5,
                                check_connectivity=False)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def process_testclass_results(self):
        """Saves and plots test results from all executed test cases."""
        testclass_results_dict = collections.OrderedDict()
        id_fields = ['channel', 'mode', 'rate']
        plots = []
        for result in self.testclass_results:
            test_id = self.extract_test_id(result['testcase_params'],
                                           id_fields)
            test_id = tuple(test_id.items())
            chain_mask = result['testcase_params']['chain_mask']
            num_streams = result['testcase_params']['num_streams']
            line_id = (chain_mask, num_streams)
            if test_id not in testclass_results_dict:
                testclass_results_dict[test_id] = collections.OrderedDict()
            if line_id not in testclass_results_dict[test_id]:
                testclass_results_dict[test_id][line_id] = {
                    'orientation': [],
                    'sensitivity': []
                }
            orientation = result['testcase_params']['orientation']
            if result['peak_throughput_pct'] >= 95:
                sensitivity = result['sensitivity']
            else:
                sensitivity = float('nan')
            if orientation not in testclass_results_dict[test_id][line_id][
                    'orientation']:
                testclass_results_dict[test_id][line_id]['orientation'].append(
                    orientation)
                testclass_results_dict[test_id][line_id]['sensitivity'].append(
                    sensitivity)
            else:
                testclass_results_dict[test_id][line_id]['sensitivity'][
                    -1] = sensitivity

        for test_id, test_data in testclass_results_dict.items():
            test_id_dict = dict(test_id)
            if 'legacy' in test_id_dict['mode']:
                test_id_str = 'Channel {} - {} {}Mbps'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            else:
                test_id_str = 'Channel {} - {} MCS{}'.format(
                    test_id_dict['channel'], test_id_dict['mode'],
                    test_id_dict['rate'])
            curr_plot = wputils.BokehFigure(
                title=str(test_id_str),
                x_label='Orientation (deg)',
                primary_y_label='Sensitivity (dBm)')
            for line_id, line_results in test_data.items():
                curr_plot.add_line(line_results['orientation'],
                                   line_results['sensitivity'],
                                   legend='Nss{} - Chain Mask {}'.format(
                                       line_id[1], line_id[0]),
                                   marker='circle')
                if 'legacy' in test_id_dict['mode']:
                    metric_tag = 'ota_summary_ch{}_{}_{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[0])
                else:
                    metric_tag = 'ota_summary_ch{}_{}_mcs{}_nss{}_ch{}'.format(
                        test_id_dict['channel'], test_id_dict['mode'],
                        test_id_dict['rate'], line_id[1], line_id[0])

                metric_name = metric_tag + '.avg_sensitivity'
                metric_value = numpy.nanmean(line_results['sensitivity'])
                self.testclass_metric_logger.add_metric(
                    metric_name, metric_value)
                self.log.info(('Average Sensitivity for {}: {:.1f}').format(
                    metric_tag, metric_value))
            current_context = (
                context.get_current_context().get_full_output_path())
            output_file_path = os.path.join(current_context,
                                            str(test_id_str) + '.html')
            curr_plot.generate_figure(output_file_path)
            plots.append(curr_plot)
        output_file_path = os.path.join(current_context, 'results.html')
        wputils.BokehFigure.save_figures(plots, output_file_path)

    def get_start_atten(self, testcase_params):
        """Gets the starting attenuation for this sensitivity test.

        The function gets the starting attenuation by checking whether a test
        at the same rate configuration has executed. If so it sets the starting
        point a configurable number of dBs below the reference test.

        Returns:
            start_atten: starting attenuation for current test
        """
        # If the test is being retried, start from the beginning
        if self.retry_flag:
            self.log.info('Retry flag set. Setting attenuation to minimum.')
            return self.testclass_params['atten_start']
        # Get the current and reference test config. The reference test is the
        # one performed at the current MCS+1
        ref_test_params = self.extract_test_id(
            testcase_params,
            ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
        # Check if reference test has been run and set attenuation accordingly
        previous_params = [
            self.extract_test_id(
                result['testcase_params'],
                ['channel', 'mode', 'rate', 'num_streams', 'chain_mask'])
            for result in self.testclass_results
        ]
        try:
            ref_index = previous_params[::-1].index(ref_test_params)
            ref_index = len(previous_params) - 1 - ref_index
            start_atten = self.testclass_results[ref_index][
                'atten_at_range'] - (
                    self.testclass_params['adjacent_mcs_range_gap'])
        except ValueError:
            print('Reference test not found. Starting from {} dB'.format(
                self.testclass_params['atten_start']))
            start_atten = self.testclass_params['atten_start']
        start_atten = max(start_atten, 0)
        return start_atten

    def generate_test_cases(self, channels, modes, requested_rates, chain_mask,
                            angles):
        """Function that auto-generates test cases for a test class."""
        test_cases = []
        for channel in channels:
            requested_modes = [
                mode for mode in modes
                if mode in self.VALID_TEST_CONFIGS[channel]
            ]
            for chain, mode in itertools.product(chain_mask, requested_modes):
                bandwidth = int(''.join([x for x in mode if x.isdigit()]))
                if 'VHT' in mode:
                    valid_rates = self.VALID_RATES[mode]
                elif 'HT' in mode:
                    valid_rates = self.VALID_RATES[mode]
                elif 'legacy' in mode and channel < 14:
                    valid_rates = self.VALID_RATES['legacy_2GHz']
                elif 'legacy' in mode and channel > 14:
                    valid_rates = self.VALID_RATES['legacy_5GHz']
                else:
                    raise ValueError('Invalid test mode.')
                for rate, angle in itertools.product(valid_rates, angles):
                    testcase_params = collections.OrderedDict(
                        channel=channel,
                        mode=mode,
                        bandwidth=bandwidth,
                        rate=rate.mcs,
                        num_streams=rate.streams,
                        short_gi=1,
                        chain_mask=chain,
                        orientation=angle)
                    if rate not in requested_rates:
                        continue
                    if str(chain) in ['0', '1'] and rate[1] == 2:
                        # Do not test 2-stream rates in single chain mode
                        continue
                    if 'legacy' in mode:
                        testcase_name = ('test_sensitivity_ch{}_{}_{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode,
                                             str(rate.mcs).replace('.', 'p'),
                                             rate.streams, chain, angle))
                    else:
                        testcase_name = ('test_sensitivity_ch{}_{}_mcs{}_nss{}'
                                         '_ch{}_{}deg'.format(
                                             channel, mode, rate.mcs,
                                             rate.streams, chain, angle))
                    setattr(self, testcase_name,
                            partial(self._test_sensitivity, testcase_params))
                    test_cases.append(testcase_name)
        return test_cases


class WifiOtaSensitivity_TenDegree_Test(WifiOtaSensitivityTest):
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              list(range(0, 360, 10)))


class WifiOtaSensitivity_PerChain_TenDegree_Test(WifiOtaSensitivityTest):
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(requested_channels, ['VHT20'],
                                              requested_rates,
                                              ['0', '1', '2x2'],
                                              list(range(0, 360, 10)))


class WifiOtaSensitivity_ThirtyDegree_Test(WifiOtaSensitivityTest):
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_channels = [6, 36, 149]
        requested_rates = [
            self.RateTuple(9, 1, 96),
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(7, 1, 72.2),
            self.RateTuple(4, 1, 43.3),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(0, 1, 7.2),
            self.RateTuple(9, 2, 192),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(7, 2, 144.4),
            self.RateTuple(4, 2, 86.7),
            self.RateTuple(2, 2, 43.3),
            self.RateTuple(0, 2, 14.4)
        ]
        self.tests = self.generate_test_cases(requested_channels,
                                              ['VHT20', 'VHT80'],
                                              requested_rates, ['2x2'],
                                              list(range(0, 360, 30)))


class WifiOtaSensitivity_45Degree_Test(WifiOtaSensitivityTest):
    def __init__(self, controllers):
        WifiOtaSensitivityTest.__init__(self, controllers)
        requested_rates = [
            self.RateTuple(8, 1, 86.7),
            self.RateTuple(2, 1, 21.7),
            self.RateTuple(8, 2, 173.3),
            self.RateTuple(2, 2, 43.3)
        ]
        self.tests = self.generate_test_cases(
            [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161], ['VHT20', 'VHT80'],
            requested_rates, ['2x2'], list(range(0, 360, 45)))
