blob: 08116223a3f871dfd0f699ccb2c83dc47b5a940d [file] [log] [blame]
Cheng-Yi Chiangf9427ff2015-11-02 21:01:45 +08001# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides utilities to do audio data analysis."""
6
7import logging
8import numpy
9import operator
10
11# Only peaks with coefficient greater than 0.01 of the first peak should be
12# considered. Note that this correspond to -40dB in the spectrum.
13DEFAULT_MIN_PEAK_RATIO = 0.01
14
15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
16
17# The minimum RMS value of meaningful audio data.
18MEANINGFUL_RMS_THRESHOLD = 0.001
19
20class RMSTooSmallError(Exception):
21 """Error when signal RMS is too small."""
22 pass
23
24
Cheng-Yi Chiang3f282552015-12-08 09:06:35 +080025class EmptyDataError(Exception):
26 """Error when signal is empty."""
27
28
Cheng-Yi Chiangf9427ff2015-11-02 21:01:45 +080029def normalize_signal(signal, saturate_value):
30 """Normalizes the signal with respect to the saturate value.
31
32 @param signal: A list for one-channel PCM data.
33 @param saturate_value: The maximum value that the PCM data might be.
34
35 @returns: A numpy array containing normalized signal. The normalized signal
36 has value -1 and 1 when it saturates.
37
38 """
39 signal = numpy.array(signal)
40 return signal / float(saturate_value)
41
42
43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
44 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
45 """Gets the dominant frequencies by spectral analysis.
46
47 @param signal: A list of numbers for one-channel PCM data. This should be
48 normalized to [-1, 1] so the function can check if signal RMS
49 is too small to be meaningful.
50 @param rate: Sampling rate.
51 @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
52 peaks other than the greatest one should be
53 considered.
54 This is to ignore peaks that are too small compared
55 to the first peak peak_0.
56 @param peak_window_size_hz: The window size in Hz to find the peaks.
57 The minimum differences between found peaks will
58 be half of this value.
59
60 @returns: A list of tuples:
61 [(peak_frequency_0, peak_coefficient_0),
62 (peak_frequency_1, peak_coefficient_1),
63 (peak_frequency_2, peak_coefficient_2), ...]
64 where the tuples are sorted by coefficients.
65 The last peak_coefficient will be no less than
66 peak_coefficient * min_peak_ratio.
67
68 """
69 # Checks the signal is meaningful.
Cheng-Yi Chiang3f282552015-12-08 09:06:35 +080070 if len(signal) == 0:
71 raise EmptyDataError('Signal data is empty')
72
Cheng-Yi Chiangf9427ff2015-11-02 21:01:45 +080073 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
74 logging.debug('signal RMS = %s', signal_rms)
75 if signal_rms < MEANINGFUL_RMS_THRESHOLD:
76 raise RMSTooSmallError(
77 'RMS %s is too small to be meaningful' % signal_rms)
78
79 # First, pass signal through a window function to mitigate spectral leakage.
80 y_conv_w = signal * numpy.hanning(len(signal))
81
82 length = len(y_conv_w)
83
84 # x_f is the frequency in Hz, y_f is the transformed coefficient.
85 x_f = _rfft_freq(length, rate)
86 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
87
88 # y_f is complex so consider its absolute value for magnitude.
89 abs_y_f = numpy.abs(y_f)
90 threshold = max(abs_y_f) * min_peak_ratio
91
92 # Suppresses all coefficients that are below threshold.
93 for i in xrange(len(abs_y_f)):
94 if abs_y_f[i] < threshold:
95 abs_y_f[i] = 0
96
97 # Gets the peak detection window size in indice.
98 # x_f[1] is the frequency difference per index.
99 peak_window_size = int(peak_window_size_hz / x_f[1])
100
101 # Detects peaks.
102 peaks = _peak_detection(abs_y_f, peak_window_size)
103
104 # Transform back the peak location from index to frequency.
105 results = []
106 for index, value in peaks:
107 results.append((x_f[index], value))
108 return results
109
110
111def _rfft_freq(length, rate):
112 """Gets the frequency at each index of real FFT.
113
114 @param length: The window length of FFT.
115 @param rate: Sampling rate.
116
117 @returns: A numpy array containing frequency corresponding to
118 numpy.fft.rfft result at each index.
119
120 """
121 # The difference in Hz between each index.
122 val = rate / float(length)
123 # Only care half of frequencies for FFT on real signal.
124 result_length = length // 2 + 1
125 return numpy.linspace(0, (result_length - 1) * val, result_length)
126
127
128def _peak_detection(array, window_size):
129 """Detects peaks in an array.
130
131 A point (i, array[i]) is a peak if array[i] is the maximum among
132 array[i - half_window_size] to array[i + half_window_size].
133 If array[i - half_window_size] to array[i + half_window_size] are all equal,
134 then there is no peak in this window.
135
136 @param window_size: The window to detect peaks.
137
138 @returns: A list of tuples:
139 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
140 where the tuples are sorted by peak values.
141
142 """
143 half_window_size = window_size / 2
144 length = len(array)
145
146 def find_max(numbers):
147 """Gets the index where maximum value happens.
148
149 @param numbers: A list of numbers.
150
151 @returns: (index, value) where value = numbers[index] is the maximum
152 among numbers.
153
154 """
155 index, value = max(enumerate(numbers), key=lambda x: x[1])
156 return index, value
157
158 results = []
159 for mid in xrange(length):
160 left = max(0, mid - half_window_size)
161 right = min(length - 1, mid + half_window_size)
162 numbers_in_window = array[left:right + 1]
163 max_index, max_value = find_max(numbers_in_window)
164
165 # Add the offset back.
166 max_index = max_index + left
167
168 # If all values are the same then there is no peak in this window.
169 if max_value != min(numbers_in_window) and max_index == mid:
170 results.append((mid, max_value))
171
172 # Sort the peaks by values.
173 return sorted(results, key=lambda x: x[1], reverse=True)
Cheng-Yi Chiang4085acc2015-11-16 15:31:35 -0800174
175
176# The default pattern mathing threshold. By experiment, this threshold
177# can tolerate normal noise of 0.3 amplitude when sine wave signal
178# amplitude is 1.
179PATTERN_MATCHING_THRESHOLD = 0.85
180
181def anomaly_detection(signal, rate, freq, block_size,
182 threshold=PATTERN_MATCHING_THRESHOLD):
183 """Detects anomaly in a sine wave signal.
184
185 This method detects anomaly in a sine wave signal by matching
186 patterns of each block.
187 For each moving window of block in the test signal, checks if there
188 is any block in golden signal that is similar to this block of test signal.
189 If there is such a block in golden signal, then this block of test
190 signal is matched and there is no anomaly in this block of test signal.
191 If there is any block in test signal that is not matched, then this block
192 covers an anomaly.
193 The block of test signal starts from index 0, and proceeds in steps of
194 half block size. The overlapping of test signal blocks makes sure there must
195 be at least one block covering the transition from sine wave to anomaly.
196
197 @param signal: A 1-D array-like object for 1-channel PCM data.
198 @param rate: The sampling rate.
199 @param freq: The expected frequency of signal.
200 @param block_size: The block size in samples to detect anomaly.
201 @param threshold: The threshold of correlation index to be judge as matched.
202
203 @returns: A list containing detected anomaly time in seconds.
204
205 """
Cheng-Yi Chiang3f282552015-12-08 09:06:35 +0800206 if len(signal) == 0:
207 raise EmptyDataError('Signal data is empty')
208
Cheng-Yi Chiang4085acc2015-11-16 15:31:35 -0800209 golden_y = _generate_golden_pattern(rate, freq, block_size)
210
211 results = []
212
213 for start in xrange(0, len(signal), block_size / 2):
214 end = start + block_size
215 test_signal = signal[start:end]
216 matched = _moving_pattern_matching(golden_y, test_signal, threshold)
217 if not matched:
218 results.append(start)
219
220 results = [float(x) / rate for x in results]
221
222 return results
223
224
225def _generate_golden_pattern(rate, freq, block_size):
226 """Generates a golden pattern of certain frequency.
227
228 The golden pattern must cover all the possibilities of waveforms in a
229 block. So, we need a golden pattern covering 1 period + 1 block size,
230 such that the test block can start anywhere in a period, and extends
231 a block size.
232
233 |period |1 bk|
234 | | |
235 . . . .
236 . . . .
237 . . .
238
239 @param rate: The sampling rate.
240 @param freq: The frequency of golden pattern.
241 @param block_size: The block size in samples to detect anomaly.
242
243 @returns: A 1-D array for golden pattern.
244
245 """
246 samples_in_a_period = int(rate / freq) + 1
247 samples_in_golden_pattern = samples_in_a_period + block_size
248 golden_x = numpy.linspace(
249 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
250 samples_in_golden_pattern)
251 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
252 return golden_y
253
254
255def _moving_pattern_matching(golden_signal, test_signal, threshold):
256 """Checks if test_signal is similar to any block of golden_signal.
257
258 Compares test signal with each block of golden signal by correlation
259 index. If there is any block of golden signal that is similar to
260 test signal, then it is matched.
261
262 @param golden_signal: A 1-D array for golden signal.
263 @param test_signal: A 1-D array for test signal.
264 @param threshold: The threshold of correlation index to be judge as matched.
265
266 @returns: True if there is a match. False otherwise.
267
268 @raises: ValueError: if test signal is longer than golden signal.
269
270 """
271 if len(golden_signal) < len(test_signal):
272 raise ValueError('Test signal is longer than golden signal')
273
274 block_length = len(test_signal)
275 number_of_movings = len(golden_signal) - block_length + 1
276 correlation_indices = []
277 for moving_index in xrange(number_of_movings):
278 # Cuts one block of golden signal from start index.
279 # The block length is the same as test signal.
280 start = moving_index
281 end = start + block_length
282 golden_signal_block = golden_signal[start:end]
283 try:
284 correlation_index = _get_correlation_index(
285 golden_signal_block, test_signal)
286 except TestSignalNormTooSmallError:
287 logging.info('Caught one block of test signal that has no meaningful norm')
288 return False
289 correlation_indices.append(correlation_index)
290
291 # Checks if the maximum correlation index is high enough.
292 max_corr = max(correlation_indices)
293 if max_corr < threshold:
294 logging.debug('Got one unmatched block with max_corr: %s', max_corr)
295 return False
296 return True
297
298
299class GoldenSignalNormTooSmallError(Exception):
300 """Exception when golden signal norm is too small."""
301 pass
302
303
304class TestSignalNormTooSmallError(Exception):
305 """Exception when test signal norm is too small."""
306 pass
307
308
309_MINIMUM_SIGNAL_NORM = 0.001
310
311def _get_correlation_index(golden_signal, test_signal):
312 """Computes correlation index of two signal of same length.
313
314 @param golden_signal: An 1-D array-like object.
315 @param test_signal: An 1-D array-like object.
316
317 @raises: ValueError: if two signal have different lengths.
318 @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
319 @raises: TestSignalNormTooSmallError: if test signal norm is too small.
320
321 @returns: The correlation index.
322 """
323 if len(golden_signal) != len(test_signal):
324 raise ValueError(
325 'Only accepts signal of same length: %s, %s' % (
326 len(golden_signal), len(test_signal)))
327
328 norm_golden = numpy.linalg.norm(golden_signal)
329 norm_test = numpy.linalg.norm(test_signal)
330 if norm_golden <= _MINIMUM_SIGNAL_NORM:
331 raise GoldenSignalNormTooSmallError(
332 'No meaningful data as norm is too small.')
333 if norm_test <= _MINIMUM_SIGNAL_NORM:
334 raise TestSignalNormTooSmallError(
335 'No meaningful data as norm is too small.')
336
337 # The 'valid' cross correlation result of two signals of same length will
338 # contain only one number.
339 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
340 return correlation / (norm_golden * norm_test)