blob: 59721caa785af69232f3674a5c45b90df854955a [file] [log] [blame]
Qia2d77742019-08-21 09:25:28 -07001#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone across different
17attenuations."""
18
19import random
20import time
21from acts.signals import TestFailure
Xianyuan Jia63751fb2020-11-17 00:07:40 +000022from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
23from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
24from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
Qia2d77742019-08-21 09:25:28 -070025from multiprocessing import Process, Queue
26
# Presumably a fallback THDN limit; it is not referenced by the code visible
# here, which reads audio_params['thdn_threshold'] instead -- TODO confirm.
DEFAULT_THDN_THRESHOLD = 0.9
# Attenuation (dB) applied to wifi interference pairs that are not the active
# interferer.
MAX_ATTENUATION = 95
# Added to the audio duration to obtain the iperf run time (iperf -t value).
TIME_OVERHEAD = 2
30
31
32class BtInterferenceDynamicTest(BtInterferenceBaseTest):
33 def __init__(self, configs):
34 super().__init__(configs)
35 self.iperf_duration = self.audio_params['duration'] + TIME_OVERHEAD
36 self.wait_for_interference = self.dynamic_wifi_interference[
37 'waittime_to_inject_interference']
38 self.channel_change_interval = self.dynamic_wifi_interference[
39 'channel_change_interval_second']
40 self.interference_channels = self.dynamic_wifi_interference[
41 'two_hoppable_channels'] + self.dynamic_wifi_interference[
42 'one_hoppable_channel']
43
44 self.bt_signal_levels = list(
45 self.dynamic_wifi_interference['bt_signal_level'].keys())
46 self.wifi_int_levels = list(
47 self.dynamic_wifi_interference['interference_level'].keys())
48 self.bt_atten_levels = list(
49 self.dynamic_wifi_interference['bt_signal_level'].values())
50 self.wifi_atten_levels = list(
51 self.dynamic_wifi_interference['interference_level'].values())
52 for bt_level in self.bt_signal_levels:
53 bt_atten_level = self.dynamic_wifi_interference['bt_signal_level'][
54 bt_level]
55 for wifi_level in self.wifi_int_levels:
56 interference_atten_level = self.dynamic_wifi_interference[
57 'interference_level'][wifi_level]
58 self.generate_test_case_randomchange(
59 bt_atten_level, interference_atten_level,
60 self.channel_change_interval)
61 for channels in self.interference_channels:
62 self.generate_test_case(bt_atten_level,
63 interference_atten_level, channels)
64
65 def generate_test_case(self, bt_atten_level, interference_atten_level,
66 dynamic_channels):
67 """Function to generate test cases with different parameters.
68 Args:
69 bt_atten_level: bt path attenuation level
70 interference_atten_level: wifi interference path attenuation level
71 channels: wifi interference channel or channel combination
72 """
73 def test_case_fn():
74 self.bt_afh_with_dynamic_interference(bt_atten_level,
75 interference_atten_level,
76 dynamic_channels)
77
78 bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
79 bt_atten_level)]
80 wifi_int_level = self.wifi_int_levels[self.wifi_atten_levels.index(
81 interference_atten_level)]
82 interference_chans_before = dynamic_channels[0]
83 interference_chans_after = dynamic_channels[1]
84 chans_before_str = 'channel_'
85 chans_after_str = 'channel_'
86 if 0 in interference_chans_before:
87 chans_before_str = 'no_interference'
88 else:
89 for i in interference_chans_before:
90 chans_before_str = chans_before_str + str(i) + '_'
91 for i in interference_chans_after:
92 chans_after_str = chans_after_str + str(i) + '_'
93 test_case_name = ('test_bt_afh_from_{}to_{}bt_signal_level_{}_'
94 'interference_level_{}'.format(
95 chans_before_str, chans_after_str,
96 bt_signal_level, wifi_int_level))
97 setattr(self, test_case_name, test_case_fn)
98
99 def generate_test_case_randomchange(self, bt_atten_level,
100 interference_atten_level, interval):
101 def test_case_fn():
102 self.bt_afh_with_fast_changing_interference(
103 bt_atten_level, interference_atten_level, interval)
104
105 bt_signal_level = self.bt_signal_levels[self.bt_atten_levels.index(
106 bt_atten_level)]
107 wifi_int_level = self.wifi_atten_levels[self.wifi_atten_levels.index(
108 interference_atten_level)]
109 test_case_name = ('test_bt_afh_with_random_channel_interference_bt'
110 '_signal_level_{}_interference_level_{}'.format(
111 bt_signal_level, wifi_int_level))
112 setattr(self, test_case_name, test_case_fn)
113
114 def interference_rssi_mapping_from_attenuation(self, interference_level):
115 """Function to get wifi rssi-to-interference level mapping
116 Args:
117 interference_level: interference level in terms of attenuation
118 """
119 self.log.info('Get WiFi RSSI at the desired attenuation level')
120 for obj in self.wifi_int_pairs:
121 obj.attenuator.set_atten(interference_level)
122 self.get_interference_rssi()
123
124 def get_rssi_at_channel(self, channel):
125 """Function to get wifi rssi-to-interference level at each channel
126 Args:
127 channel: the channel to query the rssi
128 Returns:
129 rssi: wifi rssi at the queried channel
130 """
131 for item in self.interference_rssi:
132 if item['chan'] == channel:
133 rssi = item['rssi']
134 return rssi
135
136 def inject_dynamic_wifi_interference(self, interference_level,
137 interference_channels, time_wait):
138 """Function to inject dynamic wifi interference to bt link.
139 Args:
140 interference_level: signal strength of interference, represented
141 by attenuation level
142 interference_channels: interference channel for before and after,
143 e.g. [chans_before, chans_after]
144 time_wait: time wait to inject new interference
145 """
146 all_pair = range(len(self.wifi_int_pairs))
147 #List of channels before and after changing the interference
148 interference_chans_before = interference_channels[0]
149 interference_chans_after = interference_channels[1]
150 #Set existing wifi interference attenuation level
151 if 0 not in interference_chans_before:
152 interference_pair_indices_before = self.locate_interference_pair_by_channel(
153 interference_chans_before)
154 inactive_interference_pair_indices_before = [
155 item for item in all_pair
156 if item not in interference_pair_indices_before
157 ]
158 self.log.info(
159 'Set pre-existing interference before A2DP streaming')
160 for i in interference_pair_indices_before:
161 self.log.info(
162 'Set {} dB on attenuator {}, wifi rssi {} dBm at chan {}'.
163 format(
164 interference_level, i + 1,
165 self.get_rssi_at_channel(
166 self.wifi_int_pairs[i].channel),
167 self.wifi_int_pairs[i].channel))
168 self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
169 for i in inactive_interference_pair_indices_before:
170 self.log.info('Set attenuation {} dB on attenuator {}'.format(
171 MAX_ATTENUATION, i + 1))
172 self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
173 ##Debug_purpose
174 for i in self.attenuators:
175 self.log.info(i.get_atten())
176
177 #Set after change wifi interference attenuation level
178 interference_pair_indices_after = self.locate_interference_pair_by_channel(
179 interference_chans_after)
180 inactive_interference_pair_indices_after = [
181 item for item in all_pair
182 if item not in interference_pair_indices_after
183 ]
184 #Wait for time_wait second to inject new interference
185 time.sleep(time_wait)
186 self.log.info('Inject new interference during A2DP streaming')
187 for i in interference_pair_indices_after:
188 self.log.info(
189 'Set {} dB on attenuator {}, with wifi rssi {} dBm at chan {}'.
190 format(
191 interference_level, i + 1,
192 self.get_rssi_at_channel(self.wifi_int_pairs[i].channel),
193 self.wifi_int_pairs[i].channel))
194 self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
195 for i in inactive_interference_pair_indices_after:
196 self.log.info('Set attenuation {} dB on attenuator {}'.format(
197 MAX_ATTENUATION, i + 1))
198 self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
199 ##Debug_purpose
200 for i in self.attenuators:
201 self.log.info('Attenuator {} set to {} dB'.format(
202 self.attenuators.index(i) + 1, i.get_atten()))
203 self.log.info('Dymanic inteference injected')
204
205 def inject_fast_changing_wifi_interference(self, interference_level,
206 interval):
207 """Function to inject changing wifi interference one channel a time.
208 Args:
209 interference_level: signal strength of interference, represented
210 by attenuation level
211 interval: interval between channel changes
212 """
213 all_pair = range(len(self.wifi_int_pairs))
214 #Set initial WiFi interference at channel 1
215 self.log.info('Start with interference at channel 1')
216 self.wifi_int_pairs[0].attenuator.set_atten(interference_level)
217 self.wifi_int_pairs[1].attenuator.set_atten(MAX_ATTENUATION)
218 self.wifi_int_pairs[2].attenuator.set_atten(MAX_ATTENUATION)
219 current_int_pair = [0]
220 inactive_int_pairs = [
221 item for item in all_pair if item not in current_int_pair
222 ]
223 time.sleep(interval)
224 #Inject randomlized channel wifi interference
225 self.log.info(
226 'Inject random changing channel (1,6,11) wifi interference'
227 'every {} second'.format(interval))
228 while True:
229 current_int_pair = [
230 random.randint(inactive_int_pairs[0], inactive_int_pairs[1])
231 ]
232 inactive_int_pairs = [
233 item for item in all_pair if item not in current_int_pair
234 ]
235 self.wifi_int_pairs[current_int_pair[0]].attenuator.set_atten(
236 interference_level)
237 self.log.info(
238 'Current interference {} at channel {} with rssi {} dBm'.
239 format(
240 interference_level,
241 self.wifi_int_pairs[current_int_pair[0]].channel,
242 self.get_rssi_at_channel(
243 self.wifi_int_pairs[current_int_pair[0]].channel)))
244 for i in inactive_int_pairs:
245 self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
246 ##Debug_purpose
247 for i in self.attenuators:
248 self.log.info('Attenuator {} set to {} dB'.format(
249 self.attenuators.index(i) + 1, i.get_atten()))
250 time.sleep(interval)
251
252 def bt_afh_with_dynamic_interference(self, bt_atten_level,
253 interference_atten_level,
254 dynamic_channels):
255 """Run a2dp audio quality with dynamic interference added.
256 Args:
257 bt_atten_level: signal level of bt in terms of attenuation
258 interference_atten_level: interference level in terms of attenuation
259 dynamic_channels: interference channels before and after
260 """
261 ramp_attenuation(self.attenuator, bt_atten_level)
262 self.interference_rssi_mapping_from_attenuation(
263 interference_atten_level)
264 [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics()
265 tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format(
266 bt_atten_level, rssi_master)
267 procs_iperf = []
268 for obj in self.wifi_int_pairs:
269 obj.iperf_server.start()
270 iperf_args = '-i 1 -t {} -p {} -J -R'.format(
271 self.iperf_duration, obj.iperf_server.port)
272 tag = 'chan_{}'.format(obj.channel)
273 proc_iperf = Process(target=obj.iperf_client.start,
274 args=(obj.server_address, iperf_args, tag))
275 proc_iperf.start()
276 procs_iperf.append(proc_iperf)
277 self.log.info('Start IPERF on all three channels')
278 queue = Queue()
279 proc_bt_audio = Process(target=self.play_and_record_audio,
280 args=(self.audio_params['duration'], queue))
281 proc_interference = Process(
282 target=self.inject_dynamic_wifi_interference,
283 args=(interference_atten_level, dynamic_channels,
284 self.wait_for_interference))
285 proc_bt_audio.start()
286 proc_interference.start()
287 proc_bt_audio.join()
288 proc_interference.join()
289 for proc in procs_iperf:
290 proc.join()
291 for obj in self.wifi_int_pairs:
292 iperf_throughput = get_iperf_results(obj.iperf_server)
293 self.log.info(
294 'Throughput for channel {} interference is {} Mbps'.format(
295 obj.channel, iperf_throughput))
296 obj.iperf_server.stop()
297 audio_captured = queue.get()
298 thdns = self.run_thdn_analysis(audio_captured, tag_bt)
299 self.log.info('THDN results are {}'.format(thdns))
300 for thdn in thdns:
301 if thdn >= self.audio_params['thdn_threshold']:
302 raise TestFailure('AFH failed')
303
304 def bt_afh_with_fast_changing_interference(self, bt_atten_level,
305 interference_atten_level,
306 interval):
307 """Run a2dp audio quality with random channel fast changing interference
308 Args:
309 bt_signale_level: signal level of bt in terms of attenuation
310 interference_level: interference level in terms of attenuation
311 interval: interval between channel changes
312 """
313 ramp_attenuation(self.attenuator, bt_atten_level)
314 self.interference_rssi_mapping_from_attenuation(
315 interference_atten_level)
316 [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics()
317 tag_bt = 'bt_signal_level_{}_rssi_{}_dBm'.format(
318 bt_atten_level, rssi_master)
319 procs_iperf = []
320 #Start IPERF on all three interference pairs
321 for obj in self.wifi_int_pairs:
322 obj.iperf_server.start()
323 iperf_args = '-i 1 -t {} -p {} -J -R'.format(
324 self.iperf_duration, obj.iperf_server.port)
325 tag = 'chan_{}'.format(obj.channel)
326 proc_iperf = Process(target=obj.iperf_client.start,
327 args=(obj.server_address, iperf_args, tag))
328 proc_iperf.start()
329 procs_iperf.append(proc_iperf)
330 self.log.info('Start IPERF on all three channels')
331 queue = Queue()
332 proc_bt_audio = Process(target=self.play_and_record_audio,
333 args=(self.audio_params['duration'], queue))
334 proc_interference = Process(
335 target=self.inject_fast_changing_wifi_interference,
336 args=(interference_atten_level, interval))
337 proc_bt_audio.start()
338 proc_interference.start()
339 proc_bt_audio.join()
340 while proc_bt_audio.is_alive():
341 continue
342 proc_interference.terminate()
343 proc_interference.join()
344 for proc in procs_iperf:
345 proc.join()
346 for obj in self.wifi_int_pairs:
347 iperf_throughput = get_iperf_results(obj.iperf_server)
348 self.log.info(
349 'Throughput for channel {} interference is {} Mbps'.format(
350 obj.channel, iperf_throughput))
351 obj.iperf_server.stop()
352 audio_captured = queue.get()
353 thdns = self.run_thdn_analysis(audio_captured, tag_bt)
354 self.log.info('THDN results are {}'.format(thdns))
355 for thdn in thdns:
356 if thdn >= self.audio_params['thdn_threshold']:
357 raise TestFailure('AFH failed')