Cheng-Yi Chiang | f9427ff | 2015-11-02 21:01:45 +0800 | [diff] [blame] | 1 | # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
| 5 | """This module provides utilities to do audio data analysis.""" |
| 6 | |
| 7 | import logging |
| 8 | import numpy |
| 9 | import operator |
| 10 | |
| 11 | # Only peaks with coefficient greater than 0.01 of the first peak should be |
| 12 | # considered. Note that this correspond to -40dB in the spectrum. |
| 13 | DEFAULT_MIN_PEAK_RATIO = 0.01 |
| 14 | |
| 15 | PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection. |
| 16 | |
| 17 | # The minimum RMS value of meaningful audio data. |
| 18 | MEANINGFUL_RMS_THRESHOLD = 0.001 |
| 19 | |
| 20 | class RMSTooSmallError(Exception): |
| 21 | """Error when signal RMS is too small.""" |
| 22 | pass |
| 23 | |
| 24 | |
Cheng-Yi Chiang | 3f28255 | 2015-12-08 09:06:35 +0800 | [diff] [blame^] | 25 | class EmptyDataError(Exception): |
| 26 | """Error when signal is empty.""" |
| 27 | |
| 28 | |
Cheng-Yi Chiang | f9427ff | 2015-11-02 21:01:45 +0800 | [diff] [blame] | 29 | def normalize_signal(signal, saturate_value): |
| 30 | """Normalizes the signal with respect to the saturate value. |
| 31 | |
| 32 | @param signal: A list for one-channel PCM data. |
| 33 | @param saturate_value: The maximum value that the PCM data might be. |
| 34 | |
| 35 | @returns: A numpy array containing normalized signal. The normalized signal |
| 36 | has value -1 and 1 when it saturates. |
| 37 | |
| 38 | """ |
| 39 | signal = numpy.array(signal) |
| 40 | return signal / float(saturate_value) |
| 41 | |
| 42 | |
| 43 | def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, |
| 44 | peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): |
| 45 | """Gets the dominant frequencies by spectral analysis. |
| 46 | |
| 47 | @param signal: A list of numbers for one-channel PCM data. This should be |
| 48 | normalized to [-1, 1] so the function can check if signal RMS |
| 49 | is too small to be meaningful. |
| 50 | @param rate: Sampling rate. |
| 51 | @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the |
| 52 | peaks other than the greatest one should be |
| 53 | considered. |
| 54 | This is to ignore peaks that are too small compared |
| 55 | to the first peak peak_0. |
| 56 | @param peak_window_size_hz: The window size in Hz to find the peaks. |
| 57 | The minimum differences between found peaks will |
| 58 | be half of this value. |
| 59 | |
| 60 | @returns: A list of tuples: |
| 61 | [(peak_frequency_0, peak_coefficient_0), |
| 62 | (peak_frequency_1, peak_coefficient_1), |
| 63 | (peak_frequency_2, peak_coefficient_2), ...] |
| 64 | where the tuples are sorted by coefficients. |
| 65 | The last peak_coefficient will be no less than |
| 66 | peak_coefficient * min_peak_ratio. |
| 67 | |
| 68 | """ |
| 69 | # Checks the signal is meaningful. |
Cheng-Yi Chiang | 3f28255 | 2015-12-08 09:06:35 +0800 | [diff] [blame^] | 70 | if len(signal) == 0: |
| 71 | raise EmptyDataError('Signal data is empty') |
| 72 | |
Cheng-Yi Chiang | f9427ff | 2015-11-02 21:01:45 +0800 | [diff] [blame] | 73 | signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) |
| 74 | logging.debug('signal RMS = %s', signal_rms) |
| 75 | if signal_rms < MEANINGFUL_RMS_THRESHOLD: |
| 76 | raise RMSTooSmallError( |
| 77 | 'RMS %s is too small to be meaningful' % signal_rms) |
| 78 | |
| 79 | # First, pass signal through a window function to mitigate spectral leakage. |
| 80 | y_conv_w = signal * numpy.hanning(len(signal)) |
| 81 | |
| 82 | length = len(y_conv_w) |
| 83 | |
| 84 | # x_f is the frequency in Hz, y_f is the transformed coefficient. |
| 85 | x_f = _rfft_freq(length, rate) |
| 86 | y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) |
| 87 | |
| 88 | # y_f is complex so consider its absolute value for magnitude. |
| 89 | abs_y_f = numpy.abs(y_f) |
| 90 | threshold = max(abs_y_f) * min_peak_ratio |
| 91 | |
| 92 | # Suppresses all coefficients that are below threshold. |
| 93 | for i in xrange(len(abs_y_f)): |
| 94 | if abs_y_f[i] < threshold: |
| 95 | abs_y_f[i] = 0 |
| 96 | |
| 97 | # Gets the peak detection window size in indice. |
| 98 | # x_f[1] is the frequency difference per index. |
| 99 | peak_window_size = int(peak_window_size_hz / x_f[1]) |
| 100 | |
| 101 | # Detects peaks. |
| 102 | peaks = _peak_detection(abs_y_f, peak_window_size) |
| 103 | |
| 104 | # Transform back the peak location from index to frequency. |
| 105 | results = [] |
| 106 | for index, value in peaks: |
| 107 | results.append((x_f[index], value)) |
| 108 | return results |
| 109 | |
| 110 | |
| 111 | def _rfft_freq(length, rate): |
| 112 | """Gets the frequency at each index of real FFT. |
| 113 | |
| 114 | @param length: The window length of FFT. |
| 115 | @param rate: Sampling rate. |
| 116 | |
| 117 | @returns: A numpy array containing frequency corresponding to |
| 118 | numpy.fft.rfft result at each index. |
| 119 | |
| 120 | """ |
| 121 | # The difference in Hz between each index. |
| 122 | val = rate / float(length) |
| 123 | # Only care half of frequencies for FFT on real signal. |
| 124 | result_length = length // 2 + 1 |
| 125 | return numpy.linspace(0, (result_length - 1) * val, result_length) |
| 126 | |
| 127 | |
| 128 | def _peak_detection(array, window_size): |
| 129 | """Detects peaks in an array. |
| 130 | |
| 131 | A point (i, array[i]) is a peak if array[i] is the maximum among |
| 132 | array[i - half_window_size] to array[i + half_window_size]. |
| 133 | If array[i - half_window_size] to array[i + half_window_size] are all equal, |
| 134 | then there is no peak in this window. |
| 135 | |
| 136 | @param window_size: The window to detect peaks. |
| 137 | |
| 138 | @returns: A list of tuples: |
| 139 | [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] |
| 140 | where the tuples are sorted by peak values. |
| 141 | |
| 142 | """ |
| 143 | half_window_size = window_size / 2 |
| 144 | length = len(array) |
| 145 | |
| 146 | def find_max(numbers): |
| 147 | """Gets the index where maximum value happens. |
| 148 | |
| 149 | @param numbers: A list of numbers. |
| 150 | |
| 151 | @returns: (index, value) where value = numbers[index] is the maximum |
| 152 | among numbers. |
| 153 | |
| 154 | """ |
| 155 | index, value = max(enumerate(numbers), key=lambda x: x[1]) |
| 156 | return index, value |
| 157 | |
| 158 | results = [] |
| 159 | for mid in xrange(length): |
| 160 | left = max(0, mid - half_window_size) |
| 161 | right = min(length - 1, mid + half_window_size) |
| 162 | numbers_in_window = array[left:right + 1] |
| 163 | max_index, max_value = find_max(numbers_in_window) |
| 164 | |
| 165 | # Add the offset back. |
| 166 | max_index = max_index + left |
| 167 | |
| 168 | # If all values are the same then there is no peak in this window. |
| 169 | if max_value != min(numbers_in_window) and max_index == mid: |
| 170 | results.append((mid, max_value)) |
| 171 | |
| 172 | # Sort the peaks by values. |
| 173 | return sorted(results, key=lambda x: x[1], reverse=True) |
Cheng-Yi Chiang | 4085acc | 2015-11-16 15:31:35 -0800 | [diff] [blame] | 174 | |
| 175 | |
| 176 | # The default pattern mathing threshold. By experiment, this threshold |
| 177 | # can tolerate normal noise of 0.3 amplitude when sine wave signal |
| 178 | # amplitude is 1. |
| 179 | PATTERN_MATCHING_THRESHOLD = 0.85 |
| 180 | |
| 181 | def anomaly_detection(signal, rate, freq, block_size, |
| 182 | threshold=PATTERN_MATCHING_THRESHOLD): |
| 183 | """Detects anomaly in a sine wave signal. |
| 184 | |
| 185 | This method detects anomaly in a sine wave signal by matching |
| 186 | patterns of each block. |
| 187 | For each moving window of block in the test signal, checks if there |
| 188 | is any block in golden signal that is similar to this block of test signal. |
| 189 | If there is such a block in golden signal, then this block of test |
| 190 | signal is matched and there is no anomaly in this block of test signal. |
| 191 | If there is any block in test signal that is not matched, then this block |
| 192 | covers an anomaly. |
| 193 | The block of test signal starts from index 0, and proceeds in steps of |
| 194 | half block size. The overlapping of test signal blocks makes sure there must |
| 195 | be at least one block covering the transition from sine wave to anomaly. |
| 196 | |
| 197 | @param signal: A 1-D array-like object for 1-channel PCM data. |
| 198 | @param rate: The sampling rate. |
| 199 | @param freq: The expected frequency of signal. |
| 200 | @param block_size: The block size in samples to detect anomaly. |
| 201 | @param threshold: The threshold of correlation index to be judge as matched. |
| 202 | |
| 203 | @returns: A list containing detected anomaly time in seconds. |
| 204 | |
| 205 | """ |
Cheng-Yi Chiang | 3f28255 | 2015-12-08 09:06:35 +0800 | [diff] [blame^] | 206 | if len(signal) == 0: |
| 207 | raise EmptyDataError('Signal data is empty') |
| 208 | |
Cheng-Yi Chiang | 4085acc | 2015-11-16 15:31:35 -0800 | [diff] [blame] | 209 | golden_y = _generate_golden_pattern(rate, freq, block_size) |
| 210 | |
| 211 | results = [] |
| 212 | |
| 213 | for start in xrange(0, len(signal), block_size / 2): |
| 214 | end = start + block_size |
| 215 | test_signal = signal[start:end] |
| 216 | matched = _moving_pattern_matching(golden_y, test_signal, threshold) |
| 217 | if not matched: |
| 218 | results.append(start) |
| 219 | |
| 220 | results = [float(x) / rate for x in results] |
| 221 | |
| 222 | return results |
| 223 | |
| 224 | |
| 225 | def _generate_golden_pattern(rate, freq, block_size): |
| 226 | """Generates a golden pattern of certain frequency. |
| 227 | |
| 228 | The golden pattern must cover all the possibilities of waveforms in a |
| 229 | block. So, we need a golden pattern covering 1 period + 1 block size, |
| 230 | such that the test block can start anywhere in a period, and extends |
| 231 | a block size. |
| 232 | |
| 233 | |period |1 bk| |
| 234 | | | | |
| 235 | . . . . |
| 236 | . . . . |
| 237 | . . . |
| 238 | |
| 239 | @param rate: The sampling rate. |
| 240 | @param freq: The frequency of golden pattern. |
| 241 | @param block_size: The block size in samples to detect anomaly. |
| 242 | |
| 243 | @returns: A 1-D array for golden pattern. |
| 244 | |
| 245 | """ |
| 246 | samples_in_a_period = int(rate / freq) + 1 |
| 247 | samples_in_golden_pattern = samples_in_a_period + block_size |
| 248 | golden_x = numpy.linspace( |
| 249 | 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate, |
| 250 | samples_in_golden_pattern) |
| 251 | golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) |
| 252 | return golden_y |
| 253 | |
| 254 | |
| 255 | def _moving_pattern_matching(golden_signal, test_signal, threshold): |
| 256 | """Checks if test_signal is similar to any block of golden_signal. |
| 257 | |
| 258 | Compares test signal with each block of golden signal by correlation |
| 259 | index. If there is any block of golden signal that is similar to |
| 260 | test signal, then it is matched. |
| 261 | |
| 262 | @param golden_signal: A 1-D array for golden signal. |
| 263 | @param test_signal: A 1-D array for test signal. |
| 264 | @param threshold: The threshold of correlation index to be judge as matched. |
| 265 | |
| 266 | @returns: True if there is a match. False otherwise. |
| 267 | |
| 268 | @raises: ValueError: if test signal is longer than golden signal. |
| 269 | |
| 270 | """ |
| 271 | if len(golden_signal) < len(test_signal): |
| 272 | raise ValueError('Test signal is longer than golden signal') |
| 273 | |
| 274 | block_length = len(test_signal) |
| 275 | number_of_movings = len(golden_signal) - block_length + 1 |
| 276 | correlation_indices = [] |
| 277 | for moving_index in xrange(number_of_movings): |
| 278 | # Cuts one block of golden signal from start index. |
| 279 | # The block length is the same as test signal. |
| 280 | start = moving_index |
| 281 | end = start + block_length |
| 282 | golden_signal_block = golden_signal[start:end] |
| 283 | try: |
| 284 | correlation_index = _get_correlation_index( |
| 285 | golden_signal_block, test_signal) |
| 286 | except TestSignalNormTooSmallError: |
| 287 | logging.info('Caught one block of test signal that has no meaningful norm') |
| 288 | return False |
| 289 | correlation_indices.append(correlation_index) |
| 290 | |
| 291 | # Checks if the maximum correlation index is high enough. |
| 292 | max_corr = max(correlation_indices) |
| 293 | if max_corr < threshold: |
| 294 | logging.debug('Got one unmatched block with max_corr: %s', max_corr) |
| 295 | return False |
| 296 | return True |
| 297 | |
| 298 | |
| 299 | class GoldenSignalNormTooSmallError(Exception): |
| 300 | """Exception when golden signal norm is too small.""" |
| 301 | pass |
| 302 | |
| 303 | |
| 304 | class TestSignalNormTooSmallError(Exception): |
| 305 | """Exception when test signal norm is too small.""" |
| 306 | pass |
| 307 | |
| 308 | |
| 309 | _MINIMUM_SIGNAL_NORM = 0.001 |
| 310 | |
| 311 | def _get_correlation_index(golden_signal, test_signal): |
| 312 | """Computes correlation index of two signal of same length. |
| 313 | |
| 314 | @param golden_signal: An 1-D array-like object. |
| 315 | @param test_signal: An 1-D array-like object. |
| 316 | |
| 317 | @raises: ValueError: if two signal have different lengths. |
| 318 | @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small |
| 319 | @raises: TestSignalNormTooSmallError: if test signal norm is too small. |
| 320 | |
| 321 | @returns: The correlation index. |
| 322 | """ |
| 323 | if len(golden_signal) != len(test_signal): |
| 324 | raise ValueError( |
| 325 | 'Only accepts signal of same length: %s, %s' % ( |
| 326 | len(golden_signal), len(test_signal))) |
| 327 | |
| 328 | norm_golden = numpy.linalg.norm(golden_signal) |
| 329 | norm_test = numpy.linalg.norm(test_signal) |
| 330 | if norm_golden <= _MINIMUM_SIGNAL_NORM: |
| 331 | raise GoldenSignalNormTooSmallError( |
| 332 | 'No meaningful data as norm is too small.') |
| 333 | if norm_test <= _MINIMUM_SIGNAL_NORM: |
| 334 | raise TestSignalNormTooSmallError( |
| 335 | 'No meaningful data as norm is too small.') |
| 336 | |
| 337 | # The 'valid' cross correlation result of two signals of same length will |
| 338 | # contain only one number. |
| 339 | correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] |
| 340 | return correlation / (norm_golden * norm_test) |