Refactor some utility code to audio_helper.
BUG=None
TEST=Run audiovideo_LineOutToMicInLoopback, desktopui_AudioFeedback,
desktopui_MediaAudioFeedback test work good.
Change-Id: I65b9d52f43869333f1041a91c5200ff2d5fa225d
Reviewed-on: https://gerrit.chromium.org/gerrit/19809
Commit-Ready: Hsinyu Chao <hychao@chromium.org>
Reviewed-by: Hsinyu Chao <hychao@chromium.org>
Tested-by: Hsinyu Chao <hychao@chromium.org>
diff --git a/client/cros/audio/audio_helper.py b/client/cros/audio/audio_helper.py
index 79a8a02..b5b5f54 100644
--- a/client/cros/audio/audio_helper.py
+++ b/client/cros/audio/audio_helper.py
@@ -5,18 +5,29 @@
import logging
import os
+import re
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'
+_DEFAULT_NUM_CHANNELS = 2
+_DEFAULT_INPUT_DEVICE = 'default'
+_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
+
+_SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)')
+_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
+
class AudioHelper(object):
'''
A helper class contains audio related utility functions.
'''
- def __init__(self, test):
+ def __init__(self, test, sox_format=_DEFAULT_SOX_FORMAT,
+ input_device=_DEFAULT_INPUT_DEVICE):
self._test = test
+ self._sox_format = sox_format
+ self._input_device = input_device
def setup_deps(self, deps):
'''
@@ -73,3 +84,60 @@
# A card is allowed not to support all the controls, so don't
# fail the test here if we get an error.
logging.info('amixer command failed: %s' % cmd)
+
+ def get_audio_rms(self, infile, channel):
+ sox_mixer_cmd = self.get_sox_mixer_cmd(infile, channel)
+ stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (self.sox_path,
+ self._sox_format)
+ sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
+ sox_output = utils.system_output(sox_cmd, retain_output=True)
+ for rms_line in sox_output.split('\n'):
+ m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
+ if m is not None:
+ return float(m.group(1))
+
+ def get_sox_mixer_cmd(self, infile, channel):
+ # Build up a pan value string for the sox command.
+ if channel == 0:
+ pan_values = '1'
+ else:
+ pan_values = '0'
+ for pan_index in range(1, self._num_channels):
+ if channel == pan_index:
+ pan_values = '%s%s' % (pan_values, ',1')
+ else:
+ pan_values = '%s%s' % (pan_values, ',0')
+
+ return '%s -c 2 %s %s -c 1 %s - mixer %s' % (self.sox_path,
+ self._sox_format, infile, self._sox_format, pan_values)
+
+ def noise_reduce_file(self, in_file, noise_file, out_file):
+ """Runs the sox command to noise-reduce in_file using
+ the noise profile from noise_file.
+
+ Args:
+ in_file: The file to noise reduce.
+ noise_file: The file containing the noise profile.
+ This can be created by recording silence.
+ out_file: The file contains the noise reduced sound.
+
+ Returns:
+ The name of the file containing the noise-reduced data.
+ """
+ prof_cmd = '%s -c 2 %s %s -n noiseprof' % (self.sox_path,
+ _SOX_FORMAT, noise_file)
+ reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
+ (self.sox_path, _SOX_FORMAT, in_file, _SOX_FORMAT, out_file))
+ utils.system('%s | %s' % (prof_cmd, reduce_cmd))
+
+ def record_sample(self, duration, tmpfile):
+ """Records a sample from the default input device.
+
+ Args:
+ duration: How long to record in seconds.
+ tmpfile: The file to record to.
+ """
+ cmd_rec = 'arecord -D %s -d %f -f dat %s' % (self._input_device,
+ duration, tmpfile)
+ logging.info('Command %s recording now (%fs)' % (cmd_rec, duration))
+ utils.system(cmd_rec)
diff --git a/client/site_tests/audiovideo_LineOutToMicInLoopback/audiovideo_LineOutToMicInLoopback.py b/client/site_tests/audiovideo_LineOutToMicInLoopback/audiovideo_LineOutToMicInLoopback.py
index 56d12dd..b981b9d 100755
--- a/client/site_tests/audiovideo_LineOutToMicInLoopback/audiovideo_LineOutToMicInLoopback.py
+++ b/client/site_tests/audiovideo_LineOutToMicInLoopback/audiovideo_LineOutToMicInLoopback.py
@@ -2,10 +2,11 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import logging, os, re, threading, utils
+import logging, threading, utils, tempfile
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
+from autotest_lib.client.cros.audio import audio_helper
# Names of mixer controls
_CONTROL_MASTER = "'Master Playback Volume'"
@@ -41,34 +42,24 @@
# know how much or our recording will be silence waiting for the tone to start.
_DEFAULT_SOX_RMS_THRESHOLD = 0.5
-# Regexp parsing sox output.
-_SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)')
-# Format used in sox commands.
-_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
-
_DEFAULT_INPUT = 'default'
_DEFAULT_OUTPUT = 'default'
class RecordSampleThread(threading.Thread):
"""Wraps the running of arecord in a thread."""
- def __init__(self, audio, duration, recordfile):
+ def __init__(self, ah, duration, recordfile):
threading.Thread.__init__(self)
- self.audio = audio
+ self.ah = ah
self.duration = duration
self.recordfile = recordfile
def run(self):
- self.audio.record_sample(self.duration, self.recordfile)
+ self.ah.record_sample(self.duration, self.recordfile)
class audiovideo_LineOutToMicInLoopback(test.test):
version = 1
- def setup(self):
- self.job.setup_dep(['test_tones'])
- self.job.setup_dep(['sox'])
-
-
def initialize(self,
card=_DEFAULT_CARD,
frequency=_DEFAULT_FREQUENCY,
@@ -95,54 +86,33 @@
"""
self._card = card
self._frequency = frequency
- self._input = input
self._mixer_settings = mixer_settings
self._num_channels = num_channels
self._output = output
self._record_duration = record_duration
self._sox_min_rms = sox_min_rms
- dep = 'test_tones'
- dep_dir = os.path.join(self.autodir, 'deps', dep)
- self.job.install_pkg(dep, 'dep', dep_dir)
- self._test_tones_path = os.path.join(dep_dir, 'src', dep)
- if not (os.path.exists(self._test_tones_path) and
- os.access(self._test_tones_path, os.X_OK)):
- raise error.TestError(
- '%s is not an executable' % self._test_tones_path)
- dep = 'sox'
- dep_dir = os.path.join(self.autodir, 'deps', dep)
- self.job.install_pkg(dep, 'dep', dep_dir)
- self._sox_path = os.path.join(dep_dir, 'bin', dep)
- self._sox_lib_path = os.path.join(dep_dir, 'lib')
- if not (os.path.exists(self._sox_path) and
- os.access(self._sox_path, os.X_OK)):
- raise error.TestError(
- '%s is not an executable' % self._sox_path)
+ self._ah = audio_helper.AudioHelper(self, input_device=input)
+ self._ah.setup_deps(['sox', 'test_tones'])
super(audiovideo_LineOutToMicInLoopback, self).initialize()
-
def run_once(self):
self.do_loopback_test()
-
def do_loopback_test(self):
"""Runs the loopback test.
"""
- self.set_mixer_controls()
- # Record a sample of "silence" to use as a noise profile.
- noise_file = os.path.join(self.tmpdir, os.tmpnam())
- logging.info('Noise file: %s' % noise_file)
- self.record_sample(1, noise_file)
+ self._ah.set_mixer_controls(self._mixer_settings, self._card)
- try:
+ # Record a sample of "silence" to use as a noise profile.
+ with tempfile.NamedTemporaryFile(mode='w+t') as noise_file:
+ logging.info('Noise file: %s' % noise_file.name)
+ self._ah.record_sample(1, noise_file.name)
+
# Test each channel separately. Assume two channels.
for channel in xrange(0, self._num_channels):
- self.loopback_test_one_channel(channel, noise_file)
- finally:
- if os.path.isfile(noise_file):
- os.unlink(noise_file)
+ self.loopback_test_one_channel(channel, noise_file.name)
def loopback_test_one_channel(self, channel, noise_file):
@@ -159,53 +129,20 @@
config['frequency'] = self._frequency
config['alsa_device'] = self._output
- tmpfile = os.path.join(self.tmpdir, os.tmpnam())
- record_thread = RecordSampleThread(self, self._record_duration, tmpfile)
- record_thread.start()
- self.run_test_tones(config)
- record_thread.join()
+ # Temp file for the final noise-reduced file.
+ with tempfile.NamedTemporaryFile(mode='w+t') as reduced_file:
+ # Temp file that records before noise reduction.
+ with tempfile.NamedTemporaryFile(mode='w+t') as tmpfile:
+ record_thread = RecordSampleThread(self._ah,
+ self._record_duration, tmpfile.name)
+ record_thread.start()
+ self.run_test_tones(config)
+ record_thread.join()
- if noise_file is not None:
- test_file = self.noise_reduce_file(tmpfile, noise_file)
- os.unlink(tmpfile)
- else:
- test_file = tmpfile
+ self._ah.noise_reduce_file(tmpfile.name, noise_file,
+ reduced_file.name)
- try:
- self.check_recorded_audio(test_file, channel)
- finally:
- if os.path.isfile(test_file):
- os.unlink(test_file)
-
-
- def record_sample(self, duration, tmpfile):
- """Records a sample from the default input device.
-
- Args:
- duration: How long to record in seconds.
- tmpfile: The file to record to.
- """
- cmd_rec = 'arecord -D %s -d %f -f dat %s' % (self._input,
- duration, tmpfile)
- logging.info('Command %s recording now (%fs)' % (cmd_rec, duration))
- utils.system(cmd_rec)
-
-
- def set_mixer_controls(self):
- """Sets all mixer controls listed in the mixer settings on card.
- """
- logging.info('Setting mixer control values on %s' % self._card)
- for item in self._mixer_settings:
- logging.info('Setting %s to %s on card %s' %
- (item['name'], item['value'], self._card))
- cmd = 'amixer -c %s cset name=%s %s'
- cmd = cmd % (self._card, item['name'], item['value'])
- try:
- utils.system(cmd)
- except error.CmdError:
- # A card is allowed not to support all the controls, so don't
- # fail the test here if we get an error.
- logging.info('amixer command failed: %s' % cmd)
+ self.check_recorded_audio(reduced_file.name, channel)
def run_test_tones(self, args):
"""Runs the tone generator executable.
@@ -224,7 +161,7 @@
active_channel: integer to select channel for playback.
None means playback on all channels.
"""
- args['exec'] = self._test_tones_path
+ args['exec'] = self._ah.test_tones_path
if not 'tone_end_volume' in args:
args['tone_end_volume'] = args['tone_volume']
@@ -256,54 +193,8 @@
error.TestFail if the RMS amplitude of the recording isn't above
the threshold.
"""
- # Build up a pan value string for the sox command.
- if channel == 0:
- pan_values = '1'
- else:
- pan_values = '0'
- for pan_index in range(1, self._num_channels):
- if channel == pan_index:
- pan_values = '%s%s' % (pan_values, ',1')
- else:
- pan_values = '%s%s' % (pan_values, ',0')
- # Set up the sox commands.
- os.environ["LD_LIBRARY_PATH"] = self._sox_lib_path
- sox_mixer_cmd = '%s -c 2 %s %s -c 1 %s - mixer %s'
- sox_mixer_cmd = sox_mixer_cmd % (self._sox_path, _SOX_FORMAT, infile,
- _SOX_FORMAT, pan_values)
- stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (self._sox_path, _SOX_FORMAT)
- sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
- logging.info('running %s' % sox_cmd)
- sox_output = utils.system_output(sox_cmd, retain_output=True)
- # Find the RMS value line and check that it is above threshold.
- for rms_line in sox_output.split('\n'):
- m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
- if m is not None:
- rms_val = float(m.group(1))
- logging.info('Got RMS value of %f' % rms_val)
- if rms_val < self._sox_min_rms:
- raise error.TestError( 'RMS value %f too low.' % rms_val)
+ rms_val = self._ah.get_audio_rms(infile, channel)
+ logging.info('Got RMS value of %f' % rms_val)
+ if rms_val < self._sox_min_rms:
+ raise error.TestError( 'RMS value %f too low.' % rms_val)
-
- def noise_reduce_file(self, test_file, noise_file):
- """ Runs the sox command to noise-reduce test_file using
- the noise profile from noise_file.
-
- Args:
- test_file: The file to noise reduce.
- noise_file: The file containing the noise profile.
- This can be created by recording silence.
-
- Returns:
- The name of the file containing the noise-reduced data.
- """
- out_file = os.path.join(self.tmpdir, os.tmpnam())
- os.environ["LD_LIBRARY_PATH"] = self._sox_lib_path
- prof_cmd = '%s -c 2 %s %s -n noiseprof' % (self._sox_path,
- _SOX_FORMAT,
- noise_file)
- reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
- (self._sox_path, _SOX_FORMAT, test_file, _SOX_FORMAT,
- out_file))
- utils.system('%s | %s' % (prof_cmd, reduce_cmd))
- return out_file
diff --git a/client/site_tests/desktopui_AudioFeedback/desktopui_AudioFeedback.py b/client/site_tests/desktopui_AudioFeedback/desktopui_AudioFeedback.py
index ec8b5bd..db03ee6 100644
--- a/client/site_tests/desktopui_AudioFeedback/desktopui_AudioFeedback.py
+++ b/client/site_tests/desktopui_AudioFeedback/desktopui_AudioFeedback.py
@@ -2,11 +2,12 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import logging, os, re, threading, utils
+import logging, threading, tempfile
-from autotest_lib.client.bin import test, utils
+from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ui_test, httpd
+from autotest_lib.client.cros.audio import audio_helper
# Names of mixer controls.
_CONTROL_MASTER = "'Master Playback Volume'"
@@ -21,7 +22,6 @@
# Default test configuration.
_DEFAULT_CARD = '0'
-_DEFAULT_FREQUENCY = 1000
_DEFAULT_MIXER_SETTINGS = [{'name': _CONTROL_MASTER, 'value': "100%"},
{'name': _CONTROL_HEADPHONE, 'value': "100%"},
{'name': _CONTROL_MIC_BOOST, 'value': "50%"},
@@ -38,11 +38,6 @@
# Minimum RMS value to consider a "pass".
_DEFAULT_SOX_RMS_THRESHOLD = 0.30
-# Regexp parsing sox output.
-_SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)')
-# Format used in sox commands.
-_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
-
class RecordSampleThread(threading.Thread):
"""Wraps the execution of arecord in a thread."""
@@ -59,13 +54,8 @@
class desktopui_AudioFeedback(cros_ui_test.UITest):
version = 1
- def setup(self):
- self.job.setup_dep(['test_tones'])
- self.job.setup_dep(['sox'])
-
def initialize(self,
card=_DEFAULT_CARD,
- frequency=_DEFAULT_FREQUENCY,
mixer_settings=_DEFAULT_MIXER_SETTINGS,
num_channels=_DEFAULT_NUM_CHANNELS,
record_duration=_DEFAULT_RECORD_DURATION,
@@ -74,7 +64,6 @@
Args:
card: The index of the sound card to use.
- frequency: The frequency of the test tone that is looped back.
mixer_settings: Alsa control settings to apply to the mixer before
starting the test.
num_channels: The number of channels on the device to test.
@@ -85,21 +74,13 @@
error.TestError if the deps can't be run.
"""
self._card = card
- self._frequency = frequency
self._mixer_settings = mixer_settings
self._num_channels = num_channels
self._record_duration = record_duration
self._sox_min_rms = sox_min_rms
- dep = 'sox'
- dep_dir = os.path.join(self.autodir, 'deps', dep)
- self.job.install_pkg(dep, 'dep', dep_dir)
- self._sox_path = os.path.join(dep_dir, 'bin', dep)
- self._sox_lib_path = os.path.join(dep_dir, 'lib')
- if not (os.path.exists(self._sox_path) and
- os.access(self._sox_path, os.X_OK)):
- raise error.TestError(
- '%s is not an executable' % self._sox_path)
+ self._ah = audio_helper.AudioHelper(self)
+ self._ah.setup_deps(['sox'])
super(desktopui_AudioFeedback, self).initialize()
self._test_url = 'http://localhost:8000/youtube.html'
@@ -107,16 +88,23 @@
self._testServer.run()
def run_once(self):
- self.set_mixer_controls()
- noise_file = os.path.join(self.tmpdir, os.tmpnam())
- logging.info('Noise file: %s' % noise_file)
- self.record_sample(_DEFAULT_RECORD_DURATION, noise_file)
- try:
+ # Speaker control settings may differ from device to device.
+ if self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE:
+ self._mixer_settings.append({'name': _CONTROL_SPEAKER,
+ 'value': "0%"})
+ elif self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE_HP:
+ self._mixer_settings.append({'name': _CONTROL_SPEAKER_HP,
+ 'value': "0%"})
+ self._ah.set_mixer_controls(self._mixer_settings, self._card)
+
+ # Record a sample of "silence" to use as a noise profile.
+ with tempfile.NamedTemporaryFile(mode='w+t') as noise_file:
+ logging.info('Noise file: %s' % noise_file.name)
+ self._ah.record_sample(1, noise_file.name)
+
+ # Test each channel separately. Assume two channels.
for channel in xrange(0, self._num_channels):
- self.loopback_test_one_channel(channel, noise_file)
- finally:
- if os.path.isfile(noise_file):
- os.unlink(noise_file)
+ self.loopback_test_one_channel(channel, noise_file.name)
def play_video(self):
"""Plays a Youtube video to record audio samples.
@@ -146,57 +134,17 @@
noise_file: Noise profile to use for filtering, None to skip noise
filtering.
"""
- tmpfile = os.path.join(self.tmpdir, os.tmpnam())
- record_thread = RecordSampleThread(self, self._record_duration, tmpfile)
- self.play_video()
- record_thread.start()
- record_thread.join()
+ with tempfile.NamedTemporaryFile(mode='w+t') as reduced_file:
+ with tempfile.NamedTemporaryFile(mode='w+t') as tmpfile:
+ record_thread = RecordSampleThread(self._ah,
+ self._record_duration, tmpfile.name)
+ self.play_video()
+ record_thread.start()
+ record_thread.join()
- if noise_file is not None:
- test_file = self.noise_reduce_file(tmpfile, noise_file)
- os.unlink(tmpfile)
- else:
- test_file = tmpfile
- try:
- self.check_recorded_audio(test_file, channel)
- finally:
- if os.path.isfile(test_file):
- os.unlink(test_file)
-
- def record_sample(self, duration, tmpfile):
- """Records a sample from the default input device.
-
- Args:
- duration: How long to record in seconds.
- tmpfile: The file to record to.
- """
- cmd_rec = 'arecord -d %f -f dat %s' % (duration, tmpfile)
- logging.info('Recording audio now for %f seconds.' % duration)
- utils.system(cmd_rec)
-
- def set_mixer_controls(self):
- """Sets all mixer controls listed in the mixer settings on card."""
- logging.info('Setting mixer control values on %s' % self._card)
-
- # Speaker control settings may differ from device to device.
- if self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE:
- self._mixer_settings.append({'name': _CONTROL_SPEAKER,
- 'value': "0%"})
- elif self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE_HP:
- self._mixer_settings.append({'name': _CONTROL_SPEAKER_HP,
- 'value': "0%"})
-
- for item in self._mixer_settings:
- logging.info('Setting %s to %s on card %s' %
- (item['name'], item['value'], self._card))
- cmd = 'amixer -c %s cset name=%s %s'
- cmd = cmd % (self._card, item['name'], item['value'])
- try:
- utils.system(cmd)
- except error.CmdError:
- # A card is allowed to not support all the controls, so don't
- # fail the test here if we get an error.
- logging.info('amixer command failed: %s' % cmd)
+ self._ah.noise_reduce_file(tmpfile.name, noise_file,
+ reduced_file.name)
+ self.check_recorded_audio(reduced_file.name, channel)
def check_recorded_audio(self, infile, channel):
"""Runs the sox command to check if we captured audio.
@@ -215,61 +163,15 @@
error.TestFail if the RMS amplitude of the recording isn't above
the threshold.
"""
- # Build up a pan value string for the sox command.
- pan_values = '1' if channel == 0 else '0'
- for pan_index in range(1, self._num_channels):
- if channel == pan_index:
- pan_values += ',1'
- else:
- pan_values += ',0'
- # Set up the sox commands.
- os.environ['LD_LIBRARY_PATH'] = self._sox_lib_path
- sox_mixer_cmd = '%s -c 2 %s %s -c 1 %s - mixer %s'
- sox_mixer_cmd = sox_mixer_cmd % (self._sox_path, _SOX_FORMAT, infile,
- _SOX_FORMAT, pan_values)
- stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (self._sox_path, _SOX_FORMAT)
- sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
- logging.info('running %s' % sox_cmd)
- sox_output = utils.system_output(sox_cmd, retain_output=True)
- # Find the RMS value line and check that it is above threshold.
- sox_rms_status = False
- for rms_line in sox_output.split('\n'):
- m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
- if m is not None:
- sox_rms_status = True
- rms_val = float(m.group(1))
- logging.info('Got audio RMS value of %f. Minimum pass is %f.' %
- (rms_val, self._sox_min_rms))
- if rms_val < self._sox_min_rms:
- raise error.TestError(
- 'Audio RMS value %f too low. Minimum pass is %f.' %
- (rms_val, self._sox_min_rms))
+ rms_val = self._ah.get_audio_rms(infile, channel)
# In case sox didn't return an RMS value.
- if not sox_rms_status:
+ if rms_val is None:
raise error.TestError(
'Failed to generate an audio RMS value from playback.')
- def noise_reduce_file(self, test_file, noise_file):
- """Runs the sox command to reduce the noise.
-
- Performs noise reduction on test_file using the noise profile from
- noise_file.
-
- Args:
- test_file: The file to noise reduce.
- noise_file: The file containing the noise profile.
- This can be created by recording silence.
-
- Returns:
- The name of the file containing the noise-reduced data.
- """
- out_file = os.path.join(self.tmpdir, os.tmpnam())
- os.environ['LD_LIBRARY_PATH'] = self._sox_lib_path
- prof_cmd = '%s -c 2 %s %s -n noiseprof' % (self._sox_path,
- _SOX_FORMAT,
- noise_file)
- reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
- (self._sox_path, _SOX_FORMAT, test_file, _SOX_FORMAT,
- out_file))
- utils.system('%s | %s' % (prof_cmd, reduce_cmd))
- return out_file
+ logging.info('Got audio RMS value of %f. Minimum pass is %f.' %
+ (rms_val, self._sox_min_rms))
+ if rms_val < self._sox_min_rms:
+ raise error.TestError(
+ 'Audio RMS value %f too low. Minimum pass is %f.' %
+ (rms_val, self._sox_min_rms))
diff --git a/client/site_tests/desktopui_MediaAudioFeedback/desktopui_MediaAudioFeedback.py b/client/site_tests/desktopui_MediaAudioFeedback/desktopui_MediaAudioFeedback.py
index 9fe1718..86ca150 100644
--- a/client/site_tests/desktopui_MediaAudioFeedback/desktopui_MediaAudioFeedback.py
+++ b/client/site_tests/desktopui_MediaAudioFeedback/desktopui_MediaAudioFeedback.py
@@ -2,11 +2,12 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-import logging, os, re, threading, utils
+import logging, tempfile, threading
-from autotest_lib.client.bin import test, utils
+from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ui_test, httpd
+from autotest_lib.client.cros.audio import audio_helper
# Names of mixer controls.
_CONTROL_MASTER = "'Master Playback Volume'"
@@ -21,7 +22,6 @@
# Default test configuration.
_DEFAULT_CARD = '0'
-_DEFAULT_FREQUENCY = 1000
_DEFAULT_MIXER_SETTINGS = [{'name': _CONTROL_MASTER, 'value': "100%"},
{'name': _CONTROL_HEADPHONE, 'value': "100%"},
{'name': _CONTROL_MIC_BOOST, 'value': "50%"},
@@ -41,11 +41,6 @@
# Media formats to test.
_MEDIA_FORMATS = ['BBB.mp3', 'BBB.mp4', 'BBB_mulaw.wav', 'BBB.ogv', 'BBB.webm']
-# Regexp parsing sox output.
-_SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)')
-# Format used in sox commands.
-_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
-
class RecordSampleThread(threading.Thread):
"""Wraps the execution of arecord in a thread."""
@@ -62,13 +57,8 @@
class desktopui_MediaAudioFeedback(cros_ui_test.UITest):
version = 1
- def setup(self):
- self.job.setup_dep(['test_tones'])
- self.job.setup_dep(['sox'])
-
def initialize(self,
card=_DEFAULT_CARD,
- frequency=_DEFAULT_FREQUENCY,
mixer_settings=_DEFAULT_MIXER_SETTINGS,
num_channels=_DEFAULT_NUM_CHANNELS,
record_duration=_DEFAULT_RECORD_DURATION,
@@ -77,7 +67,6 @@
Args:
card: The index of the sound card to use.
- frequency: The frequency of the test tone that is looped back.
mixer_settings: Alsa control settings to apply to the mixer before
starting the test.
num_channels: The number of channels on the device to test.
@@ -88,21 +77,13 @@
error.TestError if the deps can't be run.
"""
self._card = card
- self._frequency = frequency
self._mixer_settings = mixer_settings
self._num_channels = num_channels
self._record_duration = record_duration
self._sox_min_rms = sox_min_rms
- dep = 'sox'
- dep_dir = os.path.join(self.autodir, 'deps', dep)
- self.job.install_pkg(dep, 'dep', dep_dir)
- self._sox_path = os.path.join(dep_dir, 'bin', dep)
- self._sox_lib_path = os.path.join(dep_dir, 'lib')
- if not (os.path.exists(self._sox_path) and
- os.access(self._sox_path, os.X_OK)):
- raise error.TestError(
- '%s is not an executable' % self._sox_path)
+ self._ah = audio_helper.AudioHelper(self)
+ self._ah.setup_deps(['sox'])
super(desktopui_MediaAudioFeedback, self).initialize()
# _test_url must end with '/'.
@@ -111,18 +92,23 @@
self._testServer.run()
def run_once(self):
- self.set_mixer_controls()
- noise_file = os.path.join(self.tmpdir, os.tmpnam())
- logging.info('Noise file: %s' % noise_file)
- self.record_sample(_DEFAULT_RECORD_DURATION, noise_file)
- try:
+ # Speaker control settings may differ from device to device.
+ if self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE:
+ self._mixer_settings.append({'name': _CONTROL_SPEAKER,
+ 'value': "0%"})
+ elif self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE_HP:
+ self._mixer_settings.append({'name': _CONTROL_SPEAKER_HP,
+ 'value': "0%"})
+ self._ah.set_mixer_controls(self._mixer_settings, self._card)
+
+ # Record a sample of "silence" to use as a noise profile.
+ with tempfile.NamedTemporaryFile(mode='w+t') as noise_file:
+ logging.info('Noise file: %s' % noise_file.name)
+ self._ah.record_sample(self._record_duration, noise_file.name)
for media_file in _MEDIA_FORMATS:
for channel in xrange(0, self._num_channels):
- self.loopback_test_one_channel(channel, noise_file,
+ self.loopback_test_one_channel(channel, noise_file.name,
media_file)
- finally:
- if os.path.isfile(noise_file):
- os.unlink(noise_file)
def play_media(self, media_file):
"""Plays a media file in Chromium.
@@ -142,57 +128,20 @@
filtering.
media_file: Media file to test.
"""
- tmpfile = os.path.join(self.tmpdir, os.tmpnam())
- record_thread = RecordSampleThread(self, self._record_duration, tmpfile)
- self.play_media(media_file)
- record_thread.start()
- record_thread.join()
+ # Temp file for the final noise-reduced file.
+ with tempfile.NamedTemporaryFile(mode='w+t') as reduced_file:
+ # Temp file that records before noise reduction.
+ with tempfile.NamedTemporaryFile(mode='w+t') as tmpfile:
+ record_thread = RecordSampleThread(self._ah,
+ self._record_duration, tmpfile.name)
+ record_thread.start()
+ self.play_media(media_file)
+ record_thread.join()
- if noise_file is not None:
- test_file = self.noise_reduce_file(tmpfile, noise_file)
- os.unlink(tmpfile)
- else:
- test_file = tmpfile
- try:
- self.check_recorded_audio(test_file, channel)
- finally:
- if os.path.isfile(test_file):
- os.unlink(test_file)
+ self._ah.noise_reduce_file(tmpfile.name, noise_file,
+ reduced_file.name)
- def record_sample(self, duration, tmpfile):
- """Records a sample from the default input device.
-
- Args:
- duration: How long to record in seconds.
- tmpfile: The file to record to.
- """
- cmd_rec = 'arecord -d %f -f dat %s' % (duration, tmpfile)
- logging.info('Recording audio now for %f seconds.' % duration)
- utils.system(cmd_rec)
-
- def set_mixer_controls(self):
- """Sets all mixer controls listed in the mixer settings on card."""
- logging.info('Setting mixer control values on %s' % self._card)
-
- # Speaker control settings may differ from device to device.
- if self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE:
- self._mixer_settings.append({'name': _CONTROL_SPEAKER,
- 'value': "0%"})
- elif self.pyauto.ChromeOSBoard() in _CONTROL_SPEAKER_DEVICE_HP:
- self._mixer_settings.append({'name': _CONTROL_SPEAKER_HP,
- 'value': "0%"})
-
- for item in self._mixer_settings:
- logging.info('Setting %s to %s on card %s' %
- (item['name'], item['value'], self._card))
- cmd = 'amixer -c %s cset name=%s %s'
- cmd = cmd % (self._card, item['name'], item['value'])
- try:
- utils.system(cmd)
- except error.CmdError:
- # A card is allowed to not support all the controls, so don't
- # fail the test here if we get an error.
- logging.info('amixer command failed: %s' % cmd)
+ self.check_recorded_audio(reduced_file.name, channel)
def check_recorded_audio(self, infile, channel):
"""Runs the sox command to check if we captured audio.
@@ -211,61 +160,15 @@
error.TestFail if the RMS amplitude of the recording isn't above
the threshold.
"""
- # Build up a pan value string for the sox command.
- pan_values = '1' if channel == 0 else '0'
- for pan_index in range(1, self._num_channels):
- if channel == pan_index:
- pan_values += ',1'
- else:
- pan_values += ',0'
- # Set up the sox commands.
- os.environ['LD_LIBRARY_PATH'] = self._sox_lib_path
- sox_mixer_cmd = '%s -c 2 %s %s -c 1 %s - mixer %s'
- sox_mixer_cmd = sox_mixer_cmd % (self._sox_path, _SOX_FORMAT, infile,
- _SOX_FORMAT, pan_values)
- stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (self._sox_path, _SOX_FORMAT)
- sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
- logging.info('running %s' % sox_cmd)
- sox_output = utils.system_output(sox_cmd, retain_output=True)
- # Find the RMS value line and check that it is above threshold.
- sox_rms_status = False
- for rms_line in sox_output.split('\n'):
- m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
- if m is not None:
- sox_rms_status = True
- rms_val = float(m.group(1))
- logging.info('Got audio RMS value of %f. Minimum pass is %f.' %
- (rms_val, self._sox_min_rms))
- if rms_val < self._sox_min_rms:
- raise error.TestError(
- 'Audio RMS value %f too low. Minimum pass is %f.' %
- (rms_val, self._sox_min_rms))
+ rms_val = self._ah.get_audio_rms(infile, channel)
# In case sox didn't return an RMS value.
- if not sox_rms_status:
+ if rms_val is None:
raise error.TestError(
'Failed to generate an audio RMS value from playback.')
- def noise_reduce_file(self, test_file, noise_file):
- """Runs the sox command to reduce the noise.
-
- Performs noise reduction on test_file using the noise profile from
- noise_file.
-
- Args:
- test_file: The file to noise reduce.
- noise_file: The file containing the noise profile.
- This can be created by recording silence.
-
- Returns:
- The name of the file containing the noise-reduced data.
- """
- out_file = os.path.join(self.tmpdir, os.tmpnam())
- os.environ['LD_LIBRARY_PATH'] = self._sox_lib_path
- prof_cmd = '%s -c 2 %s %s -n noiseprof' % (self._sox_path,
- _SOX_FORMAT,
- noise_file)
- reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
- (self._sox_path, _SOX_FORMAT, test_file, _SOX_FORMAT,
- out_file))
- utils.system('%s | %s' % (prof_cmd, reduce_cmd))
- return out_file
+ logging.info('Got audio RMS value of %f. Minimum pass is %f.' %
+ (rms_val, self._sox_min_rms))
+ if rms_val < self._sox_min_rms:
+ raise error.TestError(
+ 'Audio RMS value %f too low. Minimum pass is %f.' %
+ (rms_val, self._sox_min_rms))