markdr | dd1893d | 2018-02-05 17:13:47 -0800 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
tturney | 91191de | 2017-11-16 15:07:17 -0800 | [diff] [blame] | 2 | # |
| 3 | # Copyright 2017 - The Android Open Source Project |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
Mark De Ruyter | a629a16 | 2020-01-09 20:21:01 -0800 | [diff] [blame] | 17 | # Note: This test has been labelled as an integration test due to its use of |
| 18 | # real data, and the five to six second execution time. |
tturney | 91191de | 2017-11-16 15:07:17 -0800 | [diff] [blame] | 19 | import logging |
| 20 | import numpy |
| 21 | import os |
| 22 | import unittest |
| 23 | |
Mark De Ruyter | a629a16 | 2020-01-09 20:21:01 -0800 | [diff] [blame] | 24 | # TODO(markdr): Remove this after soundfile is added to setup.py |
| 25 | import sys |
| 26 | import mock |
| 27 | sys.modules['soundfile'] = mock.Mock() |
| 28 | |
tturney | 91191de | 2017-11-16 15:07:17 -0800 | [diff] [blame] | 29 | import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis |
| 30 | import acts.test_utils.audio_analysis_lib.audio_data as audio_data |
| 31 | |
| 32 | |
class SpectralAnalysisTest(unittest.TestCase):
    """Tests audio_analysis.peak_detection and audio_analysis.spectral_analysis."""

    def setUp(self):
        """Uses the same seed to generate noise for each test."""
        numpy.random.seed(0)

    def dummy_peak_detection(self, array, window_size):
        """Detects peaks in an array in simple way.

        A point (i, array[i]) is a peak if array[i] is the maximum among
        array[i - half_window_size] to array[i + half_window_size].
        If array[i - half_window_size] to array[i + half_window_size] are all
        equal, then there is no peak in this window.

        Args:
            array: The input array to detect peaks in. Array is a list of
                   absolute values of the magnitude of transformed coefficient.
            window_size: The window to detect peaks.

        Returns:
            A list of tuples:
                [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
                 ...]
            where the tuples are sorted by peak values, largest first.

        """
        # Integer division keeps every index an int, so the window stays
        # symmetric around the midpoint even for an odd window_size.
        half_window_size = window_size // 2
        length = len(array)

        def mid_is_peak(mid, left, right):
            """Checks if value at mid is the largest among left to right.

            Args:
                mid: The mid index.
                left: The left index.
                right: The right index.

            Returns:
                True if array[mid] is strictly greater than every other
                number in array between index [left, right] inclusively.

            """
            value_mid = array[mid]
            # any() stops at the first neighbour that ties or beats the
            # midpoint, which keeps the scan cheap on large random input.
            return not any(array[index] >= value_mid
                           for index in range(left, right + 1)
                           if index != mid)

        results = []
        for mid in range(length):
            left = max(0, mid - half_window_size)
            right = min(length - 1, mid + half_window_size)
            if mid_is_peak(mid, left, right):
                results.append((mid, array[mid]))

        # Sort the peaks by values.
        return sorted(results, key=lambda peak: peak[1], reverse=True)

    def test_peak_detection(self):
        """Checks peak detection against a small hand-computed answer."""
        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
        result = audio_analysis.peak_detection(array, 4)
        golden_answer = [(12, 5), (4, 4)]
        self.assertEqual(result, golden_answer)

    def test_peak_detection_large(self):
        """Checks peak detection against the dummy detector on random data."""
        array = numpy.random.uniform(0, 1, 1000000)
        window_size = 100
        logging.debug('Test large array using dummy peak detection')
        dummy_answer = self.dummy_peak_detection(array, window_size)
        logging.debug('Test large array using improved peak detection')
        improved_answer = audio_analysis.peak_detection(array, window_size)
        logging.debug('Compare the result')
        self.assertEqual(dummy_answer, improved_answer)

    def test_spectral_analysis(self):
        """Checks the two dominant frequencies of a noisy two-tone signal."""
        rate = 48000
        length_in_secs = 0.5
        freq_1 = 490.0
        freq_2 = 60.0
        coeff_1 = 1
        coeff_2 = 0.3
        # numpy.linspace requires an integer sample count; a float count is
        # rejected by current NumPy releases.
        samples = int(length_in_secs * rate)
        noise = numpy.random.standard_normal(samples) * 0.005
        x = numpy.linspace(0.0, (samples - 1) / rate, samples)
        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
        results = audio_analysis.spectral_analysis(y, rate)
        # Results should contain
        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
        # frequency with coefficient 1, 60Hz is the second dominant frequency
        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
        # around 0.1. The k constant is resulted from window function.
        logging.debug('Results: %s', results)
        self.assertLess(abs(results[0][0] - freq_1), 1)
        self.assertLess(abs(results[1][0] - freq_2), 1)
        self.assertLess(
            abs(results[0][1] / results[1][1] - coeff_1 / coeff_2), 0.01)

    def test_spectral_snalysis_real_data(self):
        """This unittest checks the spectral analysis works on real data."""
        file_path = os.path.join(
            os.path.dirname(__file__), 'test_data', '1k_2k.raw')
        # Close the file deterministically instead of leaking the handle.
        with open(file_path, 'rb') as raw_file:
            binary = raw_file.read()
        # NOTE(review): presumably 2 channels of 'S32_LE' samples; confirm
        # the argument meaning against audio_data.AudioRawData.
        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
        saturate_value = audio_data.get_maximum_value_from_sample_format(
            'S32_LE')
        golden_frequency = [1000, 2000]
        for channel, expected_frequency in enumerate(golden_frequency):
            normalized_signal = audio_analysis.normalize_signal(
                data.channel_data[channel], saturate_value)
            spectral = audio_analysis.spectral_analysis(normalized_signal,
                                                        48000, 0.02)
            logging.debug('channel %s: %s', channel, spectral)
            self.assertLess(
                abs(spectral[0][0] - expected_frequency), 5,
                'Dominant frequency is not correct')

    def test_not_meaningful_data(self):
        """Checks that spectral analysis handles un-meaningful data."""
        rate = 48000
        length_in_secs = 0.5
        samples = int(length_in_secs * rate)
        # Noise scaled to half of the meaningful-RMS threshold.
        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
        noise = numpy.random.standard_normal(samples) * noise_amplitude
        results = audio_analysis.spectral_analysis(noise, rate)
        self.assertEqual([(0, 0)], results)

    def testEmptyData(self):
        """Checks that spectral analysis rejects empty data."""
        with self.assertRaises(audio_analysis.EmptyDataError):
            audio_analysis.spectral_analysis([], 100)
| 166 | |
| 167 | |
class NormalizeTest(unittest.TestCase):
    """Tests audio_analysis.normalize_signal."""

    def test_normalize(self):
        """Checks that [1..5] normalized against 10 gives [0.1..0.5]."""
        y = [1, 2, 3, 4, 5]
        normalized_y = audio_analysis.normalize_signal(y, 10)
        expected = [0.1, 0.2, 0.3, 0.4, 0.5]
        # Compare lengths first: zip() below would silently ignore any extra
        # or missing samples in the result.
        self.assertEqual(len(expected), len(normalized_y))
        for expected_value, actual_value in zip(expected, normalized_y):
            # Floating-point results are compared with a tolerance rather
            # than for exact equality.
            self.assertAlmostEqual(expected_value, actual_value)
| 175 | |
| 176 | |
class AnomalyTest(unittest.TestCase):
    """Tests audio_analysis.anomaly_detection on a synthetic sine wave."""

    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)

        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        # numpy.linspace requires an integer sample count; a float count is
        # rejected by current NumPy releases.
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(0.0, (self.samples - 1) / self.rate, self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)

    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise

    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.

        """
        self.anomaly_start_secs = 0.1
        insert_index = int(self.anomaly_start_secs * self.rate)
        self.y = numpy.insert(self.y, insert_index, self.anomaly_samples)

    def generate_skip_anomaly(self):
        """Skips a section of test signal."""
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = (
            self.anomaly_start_secs + self.anomaly_duration_secs)
        anomaly_start_index = int(self.anomaly_start_secs * self.rate)
        anomaly_append_index = int(anomaly_append_secs * self.rate)
        # Drop the samples in [anomaly_start_index, anomaly_append_index).
        self.y = numpy.append(self.y[:anomaly_start_index],
                              self.y[anomaly_append_index:])

    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        Args:
            amplitude: The amplitude of the constant samples.

        """
        self.anomaly_duration_secs = 0.005
        anomaly_length = int(self.anomaly_duration_secs * self.rate)
        self.anomaly_samples = [amplitude] * anomaly_length

    def run_analysis(self):
        """Runs the anomaly detection."""
        self.results = audio_analysis.anomaly_detection(
            self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)

    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)

    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.

        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        earliest_secs = (
            self.anomaly_start_secs - float(self.block_size) / self.rate)
        latest_secs = self.anomaly_start_secs + self.anomaly_duration_secs
        for detected_secs in self.results:
            self.assertLessEqual(detected_secs, latest_secs)
            self.assertGreaterEqual(detected_secs, earliest_secs)

    def test_good_signal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()

    def test_good_signal_noise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()

    def test_zero_anomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()

    def test_zero_anomaly_noise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_low_constant_anomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()

    def test_low_constant_anomaly_noise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_high_constant_anomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()

    def test_high_constant_anomaly_noise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_skipped_anomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.check_anomaly()

    def test_skipped_anomaly_noise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_empty_data(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        # Only the detection call is expected to raise, so invoke it directly
        # rather than through check_anomaly and its follow-up assertions.
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.run_analysis()
| 359 | |
| 360 | |
if __name__ == '__main__':
    # When run as a script, emit debug-level log records with timestamp,
    # logger name and level, then hand control to the unittest runner.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=log_format)
    unittest.main()