blob: b41e6628c2b45a6d548deef7f866c9dabad82b63 [file] [log] [blame]
markdrdd1893d2018-02-05 17:13:47 -08001#!/usr/bin/env python3
tturney91191de2017-11-16 15:07:17 -08002#
3# Copyright 2017 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
Mark De Ruytera629a162020-01-09 20:21:01 -080017# Note: This test has been labelled as an integration test due to its use of
18# real data, and the five to six second execution time.
tturney91191de2017-11-16 15:07:17 -080019import logging
20import numpy
21import os
22import unittest
23
Mark De Ruytera629a162020-01-09 20:21:01 -080024# TODO(markdr): Remove this after soundfile is added to setup.py
25import sys
26import mock
27sys.modules['soundfile'] = mock.Mock()
28
tturney91191de2017-11-16 15:07:17 -080029import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis
30import acts.test_utils.audio_analysis_lib.audio_data as audio_data
31
32
33class SpectralAnalysisTest(unittest.TestCase):
34 def setUp(self):
35 """Uses the same seed to generate noise for each test."""
36 numpy.random.seed(0)
37
38 def dummy_peak_detection(self, array, window_size):
39 """Detects peaks in an array in simple way.
40
41 A point (i, array[i]) is a peak if array[i] is the maximum among
42 array[i - half_window_size] to array[i + half_window_size].
43 If array[i - half_window_size] to array[i + half_window_size] are all
44 equal, then there is no peak in this window.
45
46 Args:
47 array: The input array to detect peaks in. Array is a list of
48 absolute values of the magnitude of transformed coefficient.
49 window_size: The window to detect peaks.
50
51 Returns:
52 A list of tuples:
53 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
54 ...]
55 where the tuples are sorted by peak values.
56
57 """
58 half_window_size = window_size / 2
59 length = len(array)
60
61 def mid_is_peak(array, mid, left, right):
62 """Checks if value at mid is the largest among left to right.
63
64 Args:
65 array: A list of numbers.
66 mid: The mid index.
67 left: The left index.
68 rigth: The right index.
69
70 Returns:
71 True if array[index] is the maximum among numbers in array
72 between index [left, right] inclusively.
73
74 """
75 value_mid = array[int(mid)]
76 for index in range(int(left), int(right) + 1):
77 if index == mid:
78 continue
79 if array[index] >= value_mid:
80 return False
81 return True
82
83 results = []
84 for mid in range(length):
85 left = max(0, mid - half_window_size)
86 right = min(length - 1, mid + half_window_size)
87 if mid_is_peak(array, mid, left, right):
88 results.append((mid, array[int(mid)]))
89
90 # Sort the peaks by values.
91 return sorted(results, key=lambda x: x[1], reverse=True)
92
Mark De Ruytera629a162020-01-09 20:21:01 -080093 def test_peak_detection(self):
tturney91191de2017-11-16 15:07:17 -080094 array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
95 result = audio_analysis.peak_detection(array, 4)
96 golden_answer = [(12, 5), (4, 4)]
97 self.assertEqual(result, golden_answer)
98
Mark De Ruytera629a162020-01-09 20:21:01 -080099 def test_peak_detection_large(self):
tturney91191de2017-11-16 15:07:17 -0800100 array = numpy.random.uniform(0, 1, 1000000)
101 window_size = 100
102 logging.debug('Test large array using dummy peak detection')
103 dummy_answer = self.dummy_peak_detection(array, window_size)
104 logging.debug('Test large array using improved peak detection')
105 improved_answer = audio_analysis.peak_detection(array, window_size)
106 logging.debug('Compare the result')
107 self.assertEqual(dummy_answer, improved_answer)
108
Mark De Ruytera629a162020-01-09 20:21:01 -0800109 def test_spectral_analysis(self):
tturney91191de2017-11-16 15:07:17 -0800110 rate = 48000
111 length_in_secs = 0.5
112 freq_1 = 490.0
113 freq_2 = 60.0
114 coeff_1 = 1
115 coeff_2 = 0.3
116 samples = length_in_secs * rate
117 noise = numpy.random.standard_normal(int(samples)) * 0.005
118 x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
119 y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 *
120 numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
121 results = audio_analysis.spectral_analysis(y, rate)
122 # Results should contains
123 # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
124 # frequency with coefficient 1, 60Hz is the second dominant frequency
125 # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
126 # around 0.1. The k constant is resulted from window function.
127 logging.debug('Results: %s', results)
128 self.assertTrue(abs(results[0][0] - freq_1) < 1)
129 self.assertTrue(abs(results[1][0] - freq_2) < 1)
130 self.assertTrue(
131 abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
132
Mark De Ruytera629a162020-01-09 20:21:01 -0800133 def test_spectral_snalysis_real_data(self):
tturney91191de2017-11-16 15:07:17 -0800134 """This unittest checks the spectral analysis works on real data."""
135 file_path = os.path.join(
136 os.path.dirname(__file__), 'test_data', '1k_2k.raw')
137 binary = open(file_path, 'rb').read()
138 data = audio_data.AudioRawData(binary, 2, 'S32_LE')
139 saturate_value = audio_data.get_maximum_value_from_sample_format(
140 'S32_LE')
141 golden_frequency = [1000, 2000]
142 for channel in [0, 1]:
143 normalized_signal = audio_analysis.normalize_signal(
144 data.channel_data[channel], saturate_value)
145 spectral = audio_analysis.spectral_analysis(normalized_signal,
146 48000, 0.02)
147 logging.debug('channel %s: %s', channel, spectral)
148 self.assertTrue(
149 abs(spectral[0][0] - golden_frequency[channel]) < 5,
150 'Dominant frequency is not correct')
151
Mark De Ruytera629a162020-01-09 20:21:01 -0800152 def test_not_meaningful_data(self):
tturney91191de2017-11-16 15:07:17 -0800153 """Checks that sepectral analysis handles un-meaningful data."""
154 rate = 48000
155 length_in_secs = 0.5
156 samples = length_in_secs * rate
157 noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
158 noise = numpy.random.standard_normal(int(samples)) * noise_amplitude
159 results = audio_analysis.spectral_analysis(noise, rate)
160 self.assertEqual([(0, 0)], results)
161
162 def testEmptyData(self):
163 """Checks that sepectral analysis rejects empty data."""
164 with self.assertRaises(audio_analysis.EmptyDataError):
165 results = audio_analysis.spectral_analysis([], 100)
166
167
168class NormalizeTest(unittest.TestCase):
Mark De Ruytera629a162020-01-09 20:21:01 -0800169 def test_normalize(self):
tturney91191de2017-11-16 15:07:17 -0800170 y = [1, 2, 3, 4, 5]
171 normalized_y = audio_analysis.normalize_signal(y, 10)
172 expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
173 for i in range(len(y)):
174 self.assertEqual(expected[i], normalized_y[i])
175
176
177class AnomalyTest(unittest.TestCase):
178 def setUp(self):
179 """Creates a test signal of sine wave."""
180 # Use the same seed for each test case.
181 numpy.random.seed(0)
182
183 self.block_size = 120
184 self.rate = 48000
185 self.freq = 440
186 length_in_secs = 0.25
187 self.samples = length_in_secs * self.rate
188 x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate,
189 self.samples)
190 self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
191
192 def add_noise(self):
193 """Add noise to the test signal."""
194 noise_amplitude = 0.3
195 noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
196 self.y = self.y + noise
197
198 def insert_anomaly(self):
199 """Inserts an anomaly to the test signal.
200
201 The anomaly self.anomaly_samples should be created before calling this
202 method.
203
204 """
205 self.anomaly_start_secs = 0.1
206 self.y = numpy.insert(self.y,
207 int(self.anomaly_start_secs * self.rate),
208 self.anomaly_samples)
209
210 def generate_skip_anomaly(self):
211 """Skips a section of test signal."""
212 self.anomaly_start_secs = 0.1
213 self.anomaly_duration_secs = 0.005
214 anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
215 anomaly_start_index = self.anomaly_start_secs * self.rate
216 anomaly_append_index = anomaly_append_secs * self.rate
217 self.y = numpy.append(self.y[:int(anomaly_start_index)],
218 self.y[int(anomaly_append_index):])
219
220 def create_constant_anomaly(self, amplitude):
221 """Creates an anomaly of constant samples.
222
223 Args:
224 amplitude: The amplitude of the constant samples.
225
226 """
227 self.anomaly_duration_secs = 0.005
228 self.anomaly_samples = ([amplitude] *
229 int(self.anomaly_duration_secs * self.rate))
230
231 def run_analysis(self):
232 """Runs the anomaly detection."""
233 self.results = audio_analysis.anomaly_detection(
234 self.y, self.rate, self.freq, self.block_size)
235 logging.debug('Results: %s', self.results)
236
237 def check_no_anomaly(self):
238 """Verifies that there is no anomaly in detection result."""
239 self.run_analysis()
240 self.assertFalse(self.results)
241
242 def check_anomaly(self):
243 """Verifies that there is anomaly in detection result.
244
245 The detection result should contain anomaly time stamps that are
246 close to where anomaly was inserted. There can be multiple anomalies
247 since the detection depends on the block size.
248
249 """
250 self.run_analysis()
251 self.assertTrue(self.results)
252 # Anomaly can be detected as long as the detection window of block size
253 # overlaps with anomaly.
254 expected_detected_range_secs = (
255 self.anomaly_start_secs - float(self.block_size) / self.rate,
256 self.anomaly_start_secs + self.anomaly_duration_secs)
257 for detected_secs in self.results:
258 self.assertTrue(detected_secs <= expected_detected_range_secs[1])
259 self.assertTrue(detected_secs >= expected_detected_range_secs[0])
260
Mark De Ruytera629a162020-01-09 20:21:01 -0800261 def test_good_signal(self):
tturney91191de2017-11-16 15:07:17 -0800262 """Sine wave signal with no noise or anomaly."""
263 self.check_no_anomaly()
264
Mark De Ruytera629a162020-01-09 20:21:01 -0800265 def test_good_signal_noise(self):
tturney91191de2017-11-16 15:07:17 -0800266 """Sine wave signal with noise."""
267 self.add_noise()
268 self.check_no_anomaly()
269
Mark De Ruytera629a162020-01-09 20:21:01 -0800270 def test_zero_anomaly(self):
tturney91191de2017-11-16 15:07:17 -0800271 """Sine wave signal with no noise but with anomaly.
272
273 This test case simulates underrun in digital data where there will be
274 one block of samples with 0 amplitude.
275
276 """
277 self.create_constant_anomaly(0)
278 self.insert_anomaly()
279 self.check_anomaly()
280
Mark De Ruytera629a162020-01-09 20:21:01 -0800281 def test_zero_anomaly_noise(self):
tturney91191de2017-11-16 15:07:17 -0800282 """Sine wave signal with noise and anomaly.
283
284 This test case simulates underrun in analog data where there will be
285 one block of samples with amplitudes close to 0.
286
287 """
288 self.create_constant_anomaly(0)
289 self.insert_anomaly()
290 self.add_noise()
291 self.check_anomaly()
292
Mark De Ruytera629a162020-01-09 20:21:01 -0800293 def test_low_constant_anomaly(self):
tturney91191de2017-11-16 15:07:17 -0800294 """Sine wave signal with low constant anomaly.
295
296 The anomaly is one block of constant values.
297
298 """
299 self.create_constant_anomaly(0.05)
300 self.insert_anomaly()
301 self.check_anomaly()
302
Mark De Ruytera629a162020-01-09 20:21:01 -0800303 def test_low_constant_anomaly_noise(self):
tturney91191de2017-11-16 15:07:17 -0800304 """Sine wave signal with low constant anomaly and noise.
305
306 The anomaly is one block of constant values.
307
308 """
309 self.create_constant_anomaly(0.05)
310 self.insert_anomaly()
311 self.add_noise()
312 self.check_anomaly()
313
Mark De Ruytera629a162020-01-09 20:21:01 -0800314 def test_high_constant_anomaly(self):
tturney91191de2017-11-16 15:07:17 -0800315 """Sine wave signal with high constant anomaly.
316
317 The anomaly is one block of constant values.
318
319 """
320 self.create_constant_anomaly(2)
321 self.insert_anomaly()
322 self.check_anomaly()
323
Mark De Ruytera629a162020-01-09 20:21:01 -0800324 def test_high_constant_anomaly_noise(self):
tturney91191de2017-11-16 15:07:17 -0800325 """Sine wave signal with high constant anomaly and noise.
326
327 The anomaly is one block of constant values.
328
329 """
330 self.create_constant_anomaly(2)
331 self.insert_anomaly()
332 self.add_noise()
333 self.check_anomaly()
334
Mark De Ruytera629a162020-01-09 20:21:01 -0800335 def test_skipped_anomaly(self):
tturney91191de2017-11-16 15:07:17 -0800336 """Sine wave signal with skipped anomaly.
337
338 The anomaly simulates the symptom where a block is skipped.
339
340 """
341 self.generate_skip_anomaly()
342 self.check_anomaly()
343
Mark De Ruytera629a162020-01-09 20:21:01 -0800344 def test_skipped_anomaly_noise(self):
tturney91191de2017-11-16 15:07:17 -0800345 """Sine wave signal with skipped anomaly with noise.
346
347 The anomaly simulates the symptom where a block is skipped.
348
349 """
350 self.generate_skip_anomaly()
351 self.add_noise()
352 self.check_anomaly()
353
Mark De Ruytera629a162020-01-09 20:21:01 -0800354 def test_empty_data(self):
tturney91191de2017-11-16 15:07:17 -0800355 """Checks that anomaly detection rejects empty data."""
356 self.y = []
357 with self.assertRaises(audio_analysis.EmptyDataError):
358 self.check_anomaly()
359
360
361if __name__ == '__main__':
362 logging.basicConfig(
363 level=logging.DEBUG,
364 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
365 unittest.main()