blob: d6c2555eca5ae9dae975860e98201606bad4af79 [file] [log] [blame]
Cheng-Yi Chiangf9427ff2015-11-02 21:01:45 +08001# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides utilities to do audio data analysis."""
6
7import logging
8import numpy
9import operator
10
11# Only peaks with coefficient greater than 0.01 of the first peak should be
12# considered. Note that this correspond to -40dB in the spectrum.
13DEFAULT_MIN_PEAK_RATIO = 0.01
14
15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
16
17# The minimum RMS value of meaningful audio data.
18MEANINGFUL_RMS_THRESHOLD = 0.001
19
20class RMSTooSmallError(Exception):
21 """Error when signal RMS is too small."""
22 pass
23
24
25def normalize_signal(signal, saturate_value):
26 """Normalizes the signal with respect to the saturate value.
27
28 @param signal: A list for one-channel PCM data.
29 @param saturate_value: The maximum value that the PCM data might be.
30
31 @returns: A numpy array containing normalized signal. The normalized signal
32 has value -1 and 1 when it saturates.
33
34 """
35 signal = numpy.array(signal)
36 return signal / float(saturate_value)
37
38
39def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
40 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
41 """Gets the dominant frequencies by spectral analysis.
42
43 @param signal: A list of numbers for one-channel PCM data. This should be
44 normalized to [-1, 1] so the function can check if signal RMS
45 is too small to be meaningful.
46 @param rate: Sampling rate.
47 @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
48 peaks other than the greatest one should be
49 considered.
50 This is to ignore peaks that are too small compared
51 to the first peak peak_0.
52 @param peak_window_size_hz: The window size in Hz to find the peaks.
53 The minimum differences between found peaks will
54 be half of this value.
55
56 @returns: A list of tuples:
57 [(peak_frequency_0, peak_coefficient_0),
58 (peak_frequency_1, peak_coefficient_1),
59 (peak_frequency_2, peak_coefficient_2), ...]
60 where the tuples are sorted by coefficients.
61 The last peak_coefficient will be no less than
62 peak_coefficient * min_peak_ratio.
63
64 """
65 # Checks the signal is meaningful.
66 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
67 logging.debug('signal RMS = %s', signal_rms)
68 if signal_rms < MEANINGFUL_RMS_THRESHOLD:
69 raise RMSTooSmallError(
70 'RMS %s is too small to be meaningful' % signal_rms)
71
72 # First, pass signal through a window function to mitigate spectral leakage.
73 y_conv_w = signal * numpy.hanning(len(signal))
74
75 length = len(y_conv_w)
76
77 # x_f is the frequency in Hz, y_f is the transformed coefficient.
78 x_f = _rfft_freq(length, rate)
79 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
80
81 # y_f is complex so consider its absolute value for magnitude.
82 abs_y_f = numpy.abs(y_f)
83 threshold = max(abs_y_f) * min_peak_ratio
84
85 # Suppresses all coefficients that are below threshold.
86 for i in xrange(len(abs_y_f)):
87 if abs_y_f[i] < threshold:
88 abs_y_f[i] = 0
89
90 # Gets the peak detection window size in indice.
91 # x_f[1] is the frequency difference per index.
92 peak_window_size = int(peak_window_size_hz / x_f[1])
93
94 # Detects peaks.
95 peaks = _peak_detection(abs_y_f, peak_window_size)
96
97 # Transform back the peak location from index to frequency.
98 results = []
99 for index, value in peaks:
100 results.append((x_f[index], value))
101 return results
102
103
104def _rfft_freq(length, rate):
105 """Gets the frequency at each index of real FFT.
106
107 @param length: The window length of FFT.
108 @param rate: Sampling rate.
109
110 @returns: A numpy array containing frequency corresponding to
111 numpy.fft.rfft result at each index.
112
113 """
114 # The difference in Hz between each index.
115 val = rate / float(length)
116 # Only care half of frequencies for FFT on real signal.
117 result_length = length // 2 + 1
118 return numpy.linspace(0, (result_length - 1) * val, result_length)
119
120
121def _peak_detection(array, window_size):
122 """Detects peaks in an array.
123
124 A point (i, array[i]) is a peak if array[i] is the maximum among
125 array[i - half_window_size] to array[i + half_window_size].
126 If array[i - half_window_size] to array[i + half_window_size] are all equal,
127 then there is no peak in this window.
128
129 @param window_size: The window to detect peaks.
130
131 @returns: A list of tuples:
132 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
133 where the tuples are sorted by peak values.
134
135 """
136 half_window_size = window_size / 2
137 length = len(array)
138
139 def find_max(numbers):
140 """Gets the index where maximum value happens.
141
142 @param numbers: A list of numbers.
143
144 @returns: (index, value) where value = numbers[index] is the maximum
145 among numbers.
146
147 """
148 index, value = max(enumerate(numbers), key=lambda x: x[1])
149 return index, value
150
151 results = []
152 for mid in xrange(length):
153 left = max(0, mid - half_window_size)
154 right = min(length - 1, mid + half_window_size)
155 numbers_in_window = array[left:right + 1]
156 max_index, max_value = find_max(numbers_in_window)
157
158 # Add the offset back.
159 max_index = max_index + left
160
161 # If all values are the same then there is no peak in this window.
162 if max_value != min(numbers_in_window) and max_index == mid:
163 results.append((mid, max_value))
164
165 # Sort the peaks by values.
166 return sorted(results, key=lambda x: x[1], reverse=True)
Cheng-Yi Chiang4085acc2015-11-16 15:31:35 -0800167
168
169# The default pattern mathing threshold. By experiment, this threshold
170# can tolerate normal noise of 0.3 amplitude when sine wave signal
171# amplitude is 1.
172PATTERN_MATCHING_THRESHOLD = 0.85
173
174def anomaly_detection(signal, rate, freq, block_size,
175 threshold=PATTERN_MATCHING_THRESHOLD):
176 """Detects anomaly in a sine wave signal.
177
178 This method detects anomaly in a sine wave signal by matching
179 patterns of each block.
180 For each moving window of block in the test signal, checks if there
181 is any block in golden signal that is similar to this block of test signal.
182 If there is such a block in golden signal, then this block of test
183 signal is matched and there is no anomaly in this block of test signal.
184 If there is any block in test signal that is not matched, then this block
185 covers an anomaly.
186 The block of test signal starts from index 0, and proceeds in steps of
187 half block size. The overlapping of test signal blocks makes sure there must
188 be at least one block covering the transition from sine wave to anomaly.
189
190 @param signal: A 1-D array-like object for 1-channel PCM data.
191 @param rate: The sampling rate.
192 @param freq: The expected frequency of signal.
193 @param block_size: The block size in samples to detect anomaly.
194 @param threshold: The threshold of correlation index to be judge as matched.
195
196 @returns: A list containing detected anomaly time in seconds.
197
198 """
199 golden_y = _generate_golden_pattern(rate, freq, block_size)
200
201 results = []
202
203 for start in xrange(0, len(signal), block_size / 2):
204 end = start + block_size
205 test_signal = signal[start:end]
206 matched = _moving_pattern_matching(golden_y, test_signal, threshold)
207 if not matched:
208 results.append(start)
209
210 results = [float(x) / rate for x in results]
211
212 return results
213
214
215def _generate_golden_pattern(rate, freq, block_size):
216 """Generates a golden pattern of certain frequency.
217
218 The golden pattern must cover all the possibilities of waveforms in a
219 block. So, we need a golden pattern covering 1 period + 1 block size,
220 such that the test block can start anywhere in a period, and extends
221 a block size.
222
223 |period |1 bk|
224 | | |
225 . . . .
226 . . . .
227 . . .
228
229 @param rate: The sampling rate.
230 @param freq: The frequency of golden pattern.
231 @param block_size: The block size in samples to detect anomaly.
232
233 @returns: A 1-D array for golden pattern.
234
235 """
236 samples_in_a_period = int(rate / freq) + 1
237 samples_in_golden_pattern = samples_in_a_period + block_size
238 golden_x = numpy.linspace(
239 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
240 samples_in_golden_pattern)
241 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
242 return golden_y
243
244
245def _moving_pattern_matching(golden_signal, test_signal, threshold):
246 """Checks if test_signal is similar to any block of golden_signal.
247
248 Compares test signal with each block of golden signal by correlation
249 index. If there is any block of golden signal that is similar to
250 test signal, then it is matched.
251
252 @param golden_signal: A 1-D array for golden signal.
253 @param test_signal: A 1-D array for test signal.
254 @param threshold: The threshold of correlation index to be judge as matched.
255
256 @returns: True if there is a match. False otherwise.
257
258 @raises: ValueError: if test signal is longer than golden signal.
259
260 """
261 if len(golden_signal) < len(test_signal):
262 raise ValueError('Test signal is longer than golden signal')
263
264 block_length = len(test_signal)
265 number_of_movings = len(golden_signal) - block_length + 1
266 correlation_indices = []
267 for moving_index in xrange(number_of_movings):
268 # Cuts one block of golden signal from start index.
269 # The block length is the same as test signal.
270 start = moving_index
271 end = start + block_length
272 golden_signal_block = golden_signal[start:end]
273 try:
274 correlation_index = _get_correlation_index(
275 golden_signal_block, test_signal)
276 except TestSignalNormTooSmallError:
277 logging.info('Caught one block of test signal that has no meaningful norm')
278 return False
279 correlation_indices.append(correlation_index)
280
281 # Checks if the maximum correlation index is high enough.
282 max_corr = max(correlation_indices)
283 if max_corr < threshold:
284 logging.debug('Got one unmatched block with max_corr: %s', max_corr)
285 return False
286 return True
287
288
289class GoldenSignalNormTooSmallError(Exception):
290 """Exception when golden signal norm is too small."""
291 pass
292
293
294class TestSignalNormTooSmallError(Exception):
295 """Exception when test signal norm is too small."""
296 pass
297
298
299_MINIMUM_SIGNAL_NORM = 0.001
300
301def _get_correlation_index(golden_signal, test_signal):
302 """Computes correlation index of two signal of same length.
303
304 @param golden_signal: An 1-D array-like object.
305 @param test_signal: An 1-D array-like object.
306
307 @raises: ValueError: if two signal have different lengths.
308 @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
309 @raises: TestSignalNormTooSmallError: if test signal norm is too small.
310
311 @returns: The correlation index.
312 """
313 if len(golden_signal) != len(test_signal):
314 raise ValueError(
315 'Only accepts signal of same length: %s, %s' % (
316 len(golden_signal), len(test_signal)))
317
318 norm_golden = numpy.linalg.norm(golden_signal)
319 norm_test = numpy.linalg.norm(test_signal)
320 if norm_golden <= _MINIMUM_SIGNAL_NORM:
321 raise GoldenSignalNormTooSmallError(
322 'No meaningful data as norm is too small.')
323 if norm_test <= _MINIMUM_SIGNAL_NORM:
324 raise TestSignalNormTooSmallError(
325 'No meaningful data as norm is too small.')
326
327 # The 'valid' cross correlation result of two signals of same length will
328 # contain only one number.
329 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
330 return correlation / (norm_golden * norm_test)