Add function to trim wave file
Add function to trim wave file by defining the start and end point.
Bug: None
Test: Done
Change-Id: I0334a4caf114f7a0541f08569b5f96f35aa2821d
Signed-off-by: Qi <qijiang@google.com>
diff --git a/acts/framework/acts/test_utils/coex/audio_test_utils.py b/acts/framework/acts/test_utils/coex/audio_test_utils.py
index 02b99ca..f4dc403 100644
--- a/acts/framework/acts/test_utils/coex/audio_test_utils.py
+++ b/acts/framework/acts/test_utils/coex/audio_test_utils.py
@@ -15,7 +15,9 @@
# the License.
import logging
+import numpy
import os
+import scipy.io.wavfile as sciwav
from acts.test_utils.coex.audio_capture_device import AudioCaptureBase
from acts.test_utils.coex.audio_capture_device import CaptureAudioOverAdb
@@ -65,8 +67,7 @@
class AudioCaptureResult(AudioCaptureBase):
-
- def __init__(self, path):
+ def __init__(self, path, audio_params=None):
"""Initializes Audio Capture Result class.
Args:
@@ -74,7 +75,11 @@
"""
super().__init__()
self.path = path
- self.analysis_path = os.path.join(self.log_path, ANALYSIS_FILE_TEMPLATE)
+ self.audio_params = audio_params
+ self.analysis_path = os.path.join(self.log_path,
+ ANALYSIS_FILE_TEMPLATE)
+ if self.audio_params:
+ self._trim_wave_file()
def THDN(self, win_size=None, step_size=None, q=1, freq=None):
"""Calculate THD+N value for most recently recorded file.
@@ -104,7 +109,8 @@
q=q,
freq=freq)
- def detect_anomalies(self, freq=None,
+ def detect_anomalies(self,
+ freq=None,
block_size=ANOMALY_DETECTION_BLOCK_SIZE,
threshold=PATTERN_MATCHING_THRESHOLD,
tolerance=ANOMALY_GROUPING_TOLERANCE):
@@ -127,12 +133,11 @@
channel_results (list): anomaly durations for each channel's signal.
List index corresponds to channel index.
"""
- return audio_analysis.get_file_anomaly_durations(
- filename=self.path,
- freq=freq,
- block_size=block_size,
- threshold=threshold,
- tolerance=tolerance)
+ return audio_analysis.get_file_anomaly_durations(filename=self.path,
+ freq=freq,
+ block_size=block_size,
+ threshold=threshold,
+ tolerance=tolerance)
@property
def analysis_fileno(self):
@@ -142,12 +147,9 @@
counter += 1
return counter
- def audio_quality_analysis(self, audio_params):
+ def audio_quality_analysis(self):
"""Measures audio quality based on the audio file given as input.
- Args:
- path: Log path
-
Returns:
analysis_path on success.
"""
@@ -155,14 +157,35 @@
if not os.path.exists(self.path):
raise FileNotFound("Recorded file not found")
try:
- quality_analysis(
- filename=self.path,
- output_file=analysis_path,
- bit_width=bits_per_sample,
- rate=audio_params["sample_rate"],
- channel=audio_params["channel"],
- spectral_only=False)
+ quality_analysis(filename=self.path,
+ output_file=analysis_path,
+ bit_width=bits_per_sample,
+ rate=self.audio_params["sample_rate"],
+ channel=self.audio_params["channel"],
+ spectral_only=False)
except Exception as err:
logging.exception("Failed to analyze raw audio: %s" % err)
return analysis_path
+ def _trim_wave_file(self):
+ """Trim wave files.
+
+ """
+ original_record_file_name = 'original_' + os.path.basename(self.path)
+ original_record_file_path = os.path.join(os.path.dirname(self.path),
+ original_record_file_name)
+ os.rename(self.path, original_record_file_path)
+ fs, data = sciwav.read(original_record_file_path)
+ trim_start = self.audio_params['trim_start']
+ trim_end = self.audio_params['trim_end']
+ trim = numpy.array([[trim_start, trim_end]])
+ trim = trim * fs
+ new_wave_file_list = []
+ for elem in trim:
+ # To check start and end doesn't exceed raw data dimension
+ start_read = min(elem[0], data.shape[0] - 1)
+ end_read = min(elem[1], data.shape[0] - 1)
+ new_wave_file_list.extend(data[start_read:end_read])
+ new_wave_file = numpy.array(new_wave_file_list)
+
+ sciwav.write(self.path, fs, new_wave_file)