BT with WiFi interference Test

Add test suite to cover BT tests under WiFi interference. The suite
includes a base test class for all interference type tests and test
classes for Static and Dynamic Interference tests.

Bug: None
Test: Done

Change-Id: Ib432f55e3a7482d5d52fed14982245563df36e10
Signed-off-by: Qi <qijiang@google.com>
diff --git a/acts_tests/tests/google/bt/performance/BtInterferenceDynamicTest.py b/acts_tests/tests/google/bt/performance/BtInterferenceDynamicTest.py
new file mode 100644
index 0000000..664121d
--- /dev/null
+++ b/acts_tests/tests/google/bt/performance/BtInterferenceDynamicTest.py
@@ -0,0 +1,357 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Stream music through connected device from phone while the WiFi
+interference changes dynamically, and check the recorded audio THDN."""
+
+import random
+import time
+from acts.signals import TestFailure
+from acts.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
+from acts.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
+from acts.test_utils.power.PowerBTBaseTest import ramp_attenuation
+from multiprocessing import Process, Queue
+
+DEFAULT_THDN_THRESHOLD = 0.9  # NOTE(review): unused in this file
+MAX_ATTENUATION = 95  # dB, set on the wifi pairs that must not interfere
+TIME_OVERHEAD = 2  # extra iperf run time on top of the audio duration
+
+
+class BtInterferenceDynamicTest(BtInterferenceBaseTest):
+    def __init__(self, configs):
+        """Read the dynamic interference config and generate the test cases.
+
+        One random-channel case is generated per (bt level, wifi level)
+        pair, plus one case per configured before/after channel set.
+        Args:
+            configs: test configuration handed to the base class
+        """
+        super().__init__(configs)
+        self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD
+        params = self.dynamic_wifi_interference
+        self.wait_for_interference = params['waittime_to_inject_interference']
+        self.channel_change_interval = params['channel_change_interval_second']
+        self.interference_channels = (params['two_hoppable_channels'] +
+                                      params['one_hoppable_channel'])
+        bt_levels = params['bt_signal_level']
+        wifi_levels = params['interference_level']
+        # Level names and attenuation values are kept as parallel lists so a
+        # name can be looked up from its attenuation value by index.
+        self.bt_signal_levels = list(bt_levels.keys())
+        self.wifi_int_levels = list(wifi_levels.keys())
+        self.bt_atten_levels = list(bt_levels.values())
+        self.wifi_atten_levels = list(wifi_levels.values())
+        # Register one test method per parameter combination; the generators
+        # attach them to this instance with setattr.
+        for bt_atten in self.bt_atten_levels:
+            for wifi_atten in self.wifi_atten_levels:
+                self.generate_test_case_randomchange(
+                    bt_atten, wifi_atten, self.channel_change_interval)
+                for channels in self.interference_channels:
+                    self.generate_test_case(bt_atten, wifi_atten, channels)
+
+    def generate_test_case(self, bt_atten_level, interference_atten_level,
+                           dynamic_channels):
+        """Attach a dynamic interference test case, named by its parameters.
+        Args:
+            bt_atten_level: bt path attenuation level
+            interference_atten_level: wifi interference path attenuation level
+            dynamic_channels: [chans_before, chans_after], the wifi
+                interference channels before and after the change; a 0 in
+                chans_before means no interference before the change
+        """
+        def test_case_fn():
+            self.bt_afh_with_dynamic_interference(bt_atten_level,
+                                                  interference_atten_level,
+                                                  dynamic_channels)
+
+        def channels_to_str(channels):
+            return '_'.join(['channel'] + [str(chan) for chan in channels])
+
+        bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
+            bt_atten_level)]
+        wifi_int_level = self.wifi_int_levels[self.wifi_atten_levels.index(
+            interference_atten_level)]
+        chans_before, chans_after = dynamic_channels[0], dynamic_channels[1]
+        # Build both parts without a trailing '_' so that the separators in
+        # the format string are also present for the 'no_interference' case.
+        chans_before_str = ('no_interference' if 0 in chans_before else
+                            channels_to_str(chans_before))
+        test_case_name = (
+            'test_bt_afh_from_{}_to_{}_bt_signal_level_{}_'
+            'interference_level_{}'.format(chans_before_str,
+                                           channels_to_str(chans_after),
+                                           bt_signal_level, wifi_int_level))
+        setattr(self, test_case_name, test_case_fn)
+
+    def generate_test_case_randomchange(self, bt_atten_level,
+                                        interference_atten_level, interval):
+        """Attach a random channel test case named by bt/wifi level names."""
+        def test_case_fn():
+            self.bt_afh_with_fast_changing_interference(
+                bt_atten_level, interference_atten_level, interval)
+
+        bt_index = self.bt_atten_levels.index(bt_atten_level)
+        wifi_index = self.wifi_atten_levels.index(interference_atten_level)
+        test_case_name = ('test_bt_afh_with_random_channel_interference_bt'
+                          '_signal_level_{}_interference_level_{}'.format(
+                              self.bt_signal_levels[bt_index],
+                              self.wifi_int_levels[wifi_index]))
+        setattr(self, test_case_name, test_case_fn)
+
+    def interference_rssi_mapping_from_attenuation(self, interference_level):
+        """Attenuate all wifi pairs alike, then run get_interference_rssi().
+        Args:
+            interference_level: attenuation to set on every interference pair
+        """
+        self.log.info('Get WiFi RSSI at the desired attenuation level')
+        for pair in self.wifi_int_pairs:
+            pair.attenuator.set_atten(interference_level)
+        self.get_interference_rssi()
+
+    def get_rssi_at_channel(self, channel):
+        """Look up the wifi interference rssi recorded for a channel.
+        Args:
+            channel: the channel to query the rssi
+        Returns:
+            rssi: wifi rssi at the queried channel, None if not recorded
+        """
+        # Last matching entry wins; None when the channel has no entry.
+        matches = [item['rssi'] for item in self.interference_rssi
+                   if item['chan'] == channel]
+        return matches[-1] if matches else None
+
+    def inject_dynamic_wifi_interference(self, interference_level,
+                                         interference_channels, time_wait):
+        """Function to inject dynamic wifi interference to bt link.
+
+        The attenuators are first set for the channels before the change
+        and then, time_wait seconds later, for the channels after it. Each
+        time the pairs on the given channels are set to interference_level
+        and all the other pairs are set to MAX_ATTENUATION.
+
+        Args:
+            interference_level: signal strength of interference, represented
+                by attenuation level
+            interference_channels: interference channel for before and after,
+                e.g. [chans_before, chans_after]; a 0 in chans_before means
+                no interference before the change
+            time_wait: time wait to inject new interference
+        """
+        def apply_interference(active_indices, active_msg):
+            """Set interference_level on the pairs in active_indices and
+            MAX_ATTENUATION on all the others; active_msg is the log format
+            used for an active pair."""
+            for i in active_indices:
+                pair = self.wifi_int_pairs[i]
+                self.log.info(
+                    active_msg.format(interference_level, i + 1,
+                                      self.get_rssi_at_channel(pair.channel),
+                                      pair.channel))
+                pair.attenuator.set_atten(interference_level)
+            for i, pair in enumerate(self.wifi_int_pairs):
+                if i in active_indices:
+                    continue
+                self.log.info('Set attenuation {} dB on attenuator {}'.format(
+                    MAX_ATTENUATION, i + 1))
+                pair.attenuator.set_atten(MAX_ATTENUATION)
+            ##Debug_purpose
+            for i, attenuator in enumerate(self.attenuators):
+                self.log.info('Attenuator {} set to {} dB'.format(
+                    i + 1, attenuator.get_atten()))
+
+        #List of channels before and after changing the interference
+        interference_chans_before = interference_channels[0]
+        interference_chans_after = interference_channels[1]
+        #Set existing wifi interference attenuation level
+        if 0 in interference_chans_before:
+            #No interference before the change: attenuate every pair fully
+            #instead of leaving the attenuators at their previous setting
+            self.log.info('No pre-existing interference before A2DP streaming')
+            interference_pair_indices_before = []
+        else:
+            interference_pair_indices_before = (
+                self.locate_interference_pair_by_channel(
+                    interference_chans_before))
+            self.log.info(
+                'Set pre-existing interference before A2DP streaming')
+        apply_interference(
+            interference_pair_indices_before,
+            'Set {} dB on attenuator {}, wifi rssi {} dBm at chan {}')
+
+        #Set after change wifi interference attenuation level
+        interference_pair_indices_after = (
+            self.locate_interference_pair_by_channel(interference_chans_after))
+        #Wait for time_wait second to inject new interference
+        time.sleep(time_wait)
+        self.log.info('Inject new interference during A2DP streaming')
+        apply_interference(
+            interference_pair_indices_after,
+            'Set {} dB on attenuator {}, with wifi rssi {} dBm at chan {}')
+        self.log.info('Dynamic interference injected')
+
+    def inject_fast_changing_wifi_interference(self, interference_level,
+                                               interval):
+        """Function to inject changing wifi interference one channel a time.
+
+        Interference starts on the first pair and then hops to a randomly
+        picked other pair every interval seconds, with all the remaining
+        pairs set to MAX_ATTENUATION. The loop never returns, so the caller
+        has to terminate the process this function runs in.
+
+        Args:
+            interference_level: signal strength of interference, represented
+                by attenuation level
+            interval: interval between channel changes
+        """
+        all_pair = range(len(self.wifi_int_pairs))
+        #Set initial WiFi interference at channel 1
+        self.log.info('Start with interference at channel 1')
+        current_int_pair = 0
+        for i, pair in enumerate(self.wifi_int_pairs):
+            pair.attenuator.set_atten(
+                interference_level if i == 0 else MAX_ATTENUATION)
+        time.sleep(interval)
+        #Inject randomlized channel wifi interference
+        self.log.info(
+            'Inject random changing channel (1,6,11) wifi interference '
+            'every {} second'.format(interval))
+        while True:
+            #Pick only among the other pairs so the channel really changes.
+            #randint over the first two inactive indices could return the
+            #current pair again, e.g. randint(0, 2) while pair 1 is active
+            current_int_pair = random.choice(
+                [item for item in all_pair if item != current_int_pair])
+            current_pair = self.wifi_int_pairs[current_int_pair]
+            current_pair.attenuator.set_atten(interference_level)
+            self.log.info(
+                'Current interference {} at channel {} with rssi {} dBm'.
+                format(interference_level, current_pair.channel,
+                       self.get_rssi_at_channel(current_pair.channel)))
+            for i, pair in enumerate(self.wifi_int_pairs):
+                if i != current_int_pair:
+                    pair.attenuator.set_atten(MAX_ATTENUATION)
+            ##Debug_purpose
+            for i, attenuator in enumerate(self.attenuators):
+                self.log.info('Attenuator {} set to {} dB'.format(
+                    i + 1, attenuator.get_atten()))
+            time.sleep(interval)
+
+    def bt_afh_with_dynamic_interference(self, bt_atten_level,
+                                         interference_atten_level,
+                                         dynamic_channels):
+        """Run a2dp audio quality with dynamic interference added.
+        Args:
+            bt_atten_level: signal level of bt in terms of attenuation
+            interference_atten_level: interference level in terms of attenuation
+            dynamic_channels: interference channels before and after
+        Raises:
+            TestFailure: if a THDN value is at or above the thdn_threshold
+        """
+        ramp_attenuation(self.attenuator, bt_atten_level)
+        self.interference_rssi_mapping_from_attenuation(
+            interference_atten_level)
+        rssi_master, _, _ = self._get_bt_link_metrics()
+        tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format(
+            bt_atten_level, rssi_master)
+        procs_iperf = []
+        for obj in self.wifi_int_pairs:
+            obj.iperf_server.start()
+            iperf_args = '-i 1 -t {} -p {} -J -R'.format(
+                self.iperf_duration, obj.iperf_server.port)
+            tag = 'chan_{}'.format(obj.channel)
+            proc_iperf = Process(target=obj.iperf_client.start,
+                                 args=(obj.server_address, iperf_args, tag))
+            proc_iperf.start()
+            procs_iperf.append(proc_iperf)
+        self.log.info('Start IPERF on all three channels')
+        queue = Queue()
+        proc_bt_audio = Process(target=self.play_and_record_audio,
+                                args=(self.audio_params['duration'], queue))
+        proc_interference = Process(
+            target=self.inject_dynamic_wifi_interference,
+            args=(interference_atten_level, dynamic_channels,
+                  self.wait_for_interference))
+        proc_bt_audio.start()
+        proc_interference.start()
+        #Drain the queue before join(); joining first can deadlock the child
+        audio_captured = queue.get()
+        for proc in [proc_bt_audio, proc_interference] + procs_iperf:
+            proc.join()
+        for obj in self.wifi_int_pairs:
+            iperf_throughput = get_iperf_results(obj.iperf_server)
+            self.log.info(
+                'Throughput for channel {} interference is {} Mbps'.format(
+                    obj.channel, iperf_throughput))
+            obj.iperf_server.stop()
+        thdns = self.run_thdn_analysis(audio_captured, tag_bt)
+        self.log.info('THDN results are {}'.format(thdns))
+        if any(thdn >= self.audio_params['thdn_threshold'] for thdn in thdns):
+            raise TestFailure('AFH failed')
+
+    def bt_afh_with_fast_changing_interference(self, bt_atten_level,
+                                               interference_atten_level,
+                                               interval):
+        """Run a2dp audio quality with random channel fast changing interference
+        Args:
+            bt_atten_level: signal level of bt in terms of attenuation
+            interference_atten_level: interference level in terms of attenuation
+            interval: interval between channel changes
+        Raises:
+            TestFailure: if a THDN value is at or above the thdn_threshold
+        """
+        ramp_attenuation(self.attenuator, bt_atten_level)
+        self.interference_rssi_mapping_from_attenuation(
+            interference_atten_level)
+        rssi_master, _, _ = self._get_bt_link_metrics()
+        tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format(
+            bt_atten_level, rssi_master)
+        procs_iperf = []
+        #Start IPERF on all three interference pairs
+        for obj in self.wifi_int_pairs:
+            obj.iperf_server.start()
+            iperf_args = '-i 1 -t {} -p {} -J -R'.format(
+                self.iperf_duration, obj.iperf_server.port)
+            tag = 'chan_{}'.format(obj.channel)
+            proc_iperf = Process(target=obj.iperf_client.start,
+                                 args=(obj.server_address, iperf_args, tag))
+            proc_iperf.start()
+            procs_iperf.append(proc_iperf)
+        self.log.info('Start IPERF on all three channels')
+        queue = Queue()
+        proc_bt_audio = Process(target=self.play_and_record_audio,
+                                args=(self.audio_params['duration'], queue))
+        proc_interference = Process(
+            target=self.inject_fast_changing_wifi_interference,
+            args=(interference_atten_level, interval))
+        proc_bt_audio.start()
+        proc_interference.start()
+        #Drain the queue before join(); joining first can deadlock the child
+        audio_captured = queue.get()
+        proc_bt_audio.join()
+        #The injection loop never returns by itself, stop it explicitly
+        proc_interference.terminate()
+        for proc in [proc_interference] + procs_iperf:
+            proc.join()
+        for obj in self.wifi_int_pairs:
+            iperf_throughput = get_iperf_results(obj.iperf_server)
+            self.log.info(
+                'Throughput for channel {} interference is {} Mbps'.format(
+                    obj.channel, iperf_throughput))
+            obj.iperf_server.stop()
+        thdns = self.run_thdn_analysis(audio_captured, tag_bt)
+        self.log.info('THDN results are {}'.format(thdns))
+        if any(thdn >= self.audio_params['thdn_threshold'] for thdn in thdns):
+            raise TestFailure('AFH failed')