Add audio quality factory test, and audio_helper as a common audio
utility code.

BUG=chrome-os-partner:6849
TEST=Run factory_AudioQulity test, send 'LOOP_1' command to port 8888 on
device. Device should respond by start audio loop.

Change-Id: I677c41eeef9e7a9fad9a01b35867c1ab71c7329d
Reviewed-on: https://gerrit.chromium.org/gerrit/11724
Reviewed-by: Hung-Te Lin <hungte@chromium.org>
Commit-Ready: Hsinyu Chao <hychao@chromium.org>
Tested-by: Hsinyu Chao <hychao@chromium.org>
diff --git a/client/cros/audio/__init__.py b/client/cros/audio/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/client/cros/audio/__init__.py
diff --git a/client/cros/audio/audio_helper.py b/client/cros/audio/audio_helper.py
new file mode 100644
index 0000000..79a8a02
--- /dev/null
+++ b/client/cros/audio/audio_helper.py
@@ -0,0 +1,75 @@
+#!/usr/bin/python
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import os
+
+from autotest_lib.client.bin import utils
+from autotest_lib.client.common_lib import error
+
+LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'
+
+class AudioHelper(object):
+    '''
+    A helper class contains audio related utility functions.
+    '''
+    def __init__(self, test):
+        self._test = test
+
+    def setup_deps(self, deps):
+        '''
+        Sets up audio related dependencies.
+        '''
+        for dep in deps:
+            if dep == 'test_tones':
+                dep_dir = os.path.join(self._test.autodir, 'deps', dep)
+                self._test.job.install_pkg(dep, 'dep', dep_dir)
+                self.test_tones_path = os.path.join(dep_dir, 'src', dep)
+            elif dep == 'audioloop':
+                dep_dir = os.path.join(self._test.autodir, 'deps', dep)
+                self._test.job.install_pkg(dep, 'dep', dep_dir)
+                self.audioloop_path = os.path.join(dep_dir, 'src',
+                        'looptest')
+            elif dep == 'sox':
+                dep_dir = os.path.join(self._test.autodir, 'deps', dep)
+                self._test.job.install_pkg(dep, 'dep', dep_dir)
+                self.sox_path = os.path.join(dep_dir, 'bin', dep)
+                self.sox_lib_path = os.path.join(dep_dir, 'lib')
+                if os.environ.has_key(LD_LIBRARY_PATH):
+                    paths = os.environ[LD_LIBRARY_PATH].split(':')
+                    if not self.sox_lib_path in paths:
+                        paths.append(self.sox_lib_path)
+                        os.environ[LD_LIBRARY_PATH] = ':'.join(paths)
+                else:
+                    os.environ[LD_LIBRARY_PATH] = self.sox_lib_path
+
+    def cleanup_deps(self, deps):
+        '''
+        Cleans up environments which has been setup for dependencies.
+        '''
+        for dep in deps:
+            if dep == 'sox':
+                if (os.environ.has_key(LD_LIBRARY_PATH)
+                        and hasattr(self, 'sox_lib_path')):
+                    paths = filter(lambda x: x != self.sox_lib_path,
+                            os.environ[LD_LIBRARY_PATH].split(':'))
+                    os.environ[LD_LIBRARY_PATH] = ':'.join(paths)
+
+    def set_mixer_controls(self, mixer_settings={}, card='0'):
+        '''
+        Sets all mixer controls listed in the mixer settings on card.
+        '''
+        logging.info('Setting mixer control values on %s' % card)
+        for item in mixer_settings:
+            logging.info('Setting %s to %s on card %s' %
+                         (item['name'], item['value'], card))
+            cmd = 'amixer -c %s cset name=%s %s'
+            cmd = cmd % (card, item['name'], item['value'])
+            try:
+                utils.system(cmd)
+            except error.CmdError:
+                # A card is allowed not to support all the controls, so don't
+                # fail the test here if we get an error.
+                logging.info('amixer command failed: %s' % cmd)
diff --git a/client/site_tests/factory_AudioQuality/factory_AudioQuality.py b/client/site_tests/factory_AudioQuality/factory_AudioQuality.py
new file mode 100644
index 0000000..4ef08f4
--- /dev/null
+++ b/client/site_tests/factory_AudioQuality/factory_AudioQuality.py
@@ -0,0 +1,280 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+# DESCRIPTION :
+#
+# This is a factory test for audio quality. External equipment will send
+# command through ethernet to configure the audio loop path. Note that it's
+# External equipment's responsibility to capture and analyze the audio signal.
+
+import gobject
+import gtk
+import os
+import re
+import socket
+import subprocess
+import tempfile
+
+from autotest_lib.client.bin import factory
+from autotest_lib.client.bin import factory_ui_lib as ful
+from autotest_lib.client.bin import test, utils
+from autotest_lib.client.common_lib import error
+from autotest_lib.client.cros.audio import audio_helper
+
+# Host test machine crossover connected to DUT, fix local ip and port for
+# communication in between.
+_HOST = ''
+_PORT = 8888
+_LOCAL_IP = '192.168.1.2'
+
+# Label strings.
+_LABEL_CONNECTED = 'Connected\n已連線\n'
+_LABEL_WAITING = 'Waiting for command\n等待指令中\n'
+_LABEL_AUDIOLOOP = 'Audio looping\n音源回放中\n'
+_LABEL_SPEAKER_MUTE_OFF = 'Speaker on\n喇叭開啟\n'
+_LABEL_PLAYTONE_LEFT = ('Playing tone to left channel\n'
+                        '播音至左聲道\n')
+_LABEL_PLAYTONE_RIGHT = ('Playing tone to right channel\n'
+                         '播音至右聲道\n')
+
+# Regular expression to match external commands.
+_LOOP_0_RE = re.compile("(?i)loop_0")
+_LOOP_1_RE = re.compile("(?i)loop_1")
+_LOOP_2_RE = re.compile("(?i)loop_2")
+_LOOP_3_RE = re.compile("(?i)loop_3")
+_XTALK_L_RE = re.compile("(?i)xtalk_l")
+_XTALK_R_RE = re.compile("(?i)xtalk_r")
+_SEND_FILE_RE = re.compile("(?i)send_file\,\s*[^\,]+\,\s*(\d)+$")
+_TEST_COMPLETE_RE = re.compile("(?i)test_complete")
+_RESULT_PASS_RE = re.compile("(?i)result_pass")
+_RESULT_FAIL_RE = re.compile("(?i)result_fail")
+
+class factory_AudioQuality(test.test):
+    version = 1
+
+    def handle_connection(self, conn, *args):
+        '''
+        Asynchronous handler for socket connection.
+        '''
+        line = conn.recv(1024)
+        if line:
+            factory.log("Received command %s" % line)
+        else:
+            return False
+
+        for key in self._handlers.iterkeys():
+            if key.match(line):
+                self._handlers[key](conn, line)
+                break
+
+        # Respond by the received command with '_OK' postfix.
+        conn.send(line + '_OK')
+        return False
+
+    def start_loop(self):
+        '''
+        Starts the internal audio loopback.
+        '''
+        self._loop_process = subprocess.Popen(
+                [self._ah.sox_path, '-d', '-d'])
+
+    def play_tone(self):
+        '''
+        Plays a single tone.
+        '''
+        cmdargs  = [self._ah.sox_path, '-t', 'null', '/dev/null', '-d', 'synth',
+                    '20.0', 'sine', '1000.0']
+        self._play_tone_process = subprocess.Popen(cmdargs)
+
+    def restore_configuration(self):
+        '''
+        Stops all the running process and restore the mute settings.
+        '''
+        if hasattr(self, '_play_tone_process') and self._play_tone_process:
+            self._play_tone_process.kill()
+        if hasattr(self, '_loop_process') and self._loop_process:
+            self._loop_process.kill()
+            factory.log("Stopped audio loop process")
+        self.mute_headphone_left_right(False, False)
+        self.set_auto_mute(True)
+
+    def handle_send_file(self, *args):
+        conn = args[0]
+        conn.send('OK')
+
+        params = args[1].split(',')
+        file_name = params[1]
+        size = int(params[2])
+
+        with tempfile.NamedTemporaryFile(mode='w+t') as tmp_file:
+            factory.log("created tmp_file: %s\n" % tmp_file.name)
+            tmp_file.write('File name: %s\n' % file_name)
+
+            # A message DONE will be concatenated to the end of detailed log.
+            left = size + 4
+            while left > 0:
+                data = conn.recv(1024)
+                left -= len(data)
+                tmp_file.write(data)
+            tmp_file.seek(0)
+            for line in tmp_file:
+                self._detail_log += line
+
+        factory.log("Received file %s with size %d" % (file_name, size))
+
+    def handle_result_pass(self, *args):
+        self._test_passed = True
+
+    def handle_result_fail(self, *args):
+        self._test_passed = False
+
+    def handle_test_complete(self, *args):
+        gtk.main_quit()
+
+    def handle_loop_none(self, *args):
+        self.restore_configuration()
+        self._loop_status_label.set_text(_LABEL_WAITING)
+
+    def handle_loop(self, *args):
+        self.restore_configuration()
+        self._loop_status_label.set_text(_LABEL_AUDIOLOOP)
+        self.start_loop()
+
+    def handle_loop_speaker_unmute(self, *args):
+        self.handle_loop()
+        self._loop_status_label.set_text(_LABEL_AUDIOLOOP +
+                _LABEL_SPEAKER_MUTE_OFF)
+        self.set_auto_mute(False)
+
+    def handle_xtalk_left(self, *args):
+        self.restore_configuration()
+        self._loop_status_label.set_text(_LABEL_PLAYTONE_LEFT)
+        self.mute_headphone_left_right(False, True)
+        self.play_tone()
+
+    def handle_xtalk_right(self, *args):
+        self.restore_configuration()
+        self._loop_status_label.set_text(_LABEL_PLAYTONE_RIGHT)
+        self.mute_headphone_left_right(True, False)
+        self.play_tone()
+
+    def listen(self, sock, *args):
+        '''
+        Listens for connection and start handler for it.
+        '''
+        conn, addr = sock.accept()
+        self._loop_status_label.set_text("Connected")
+        gobject.io_add_watch(conn, gobject.IO_IN, self.handle_connection)
+        return True
+
+    def start_server(self):
+        '''
+        Initialize server and start listening for external commands.
+        '''
+        sock = socket.socket()
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.bind((_HOST, _PORT))
+        sock.listen(1)
+        factory.log("Listening at port %d" % _PORT)
+        gobject.io_add_watch(sock, gobject.IO_IN, self.listen)
+
+    def mute_headphone_left_right(self, left=False, right=False):
+        '''
+        Mutes specified headphone channels.
+        '''
+        left_vol = 100 if left else 0
+        right_vol = 100 if right else 0
+        mixer_settings = [{'name': "'Headphone Playback Volume'",
+                          'value': ('%d%%,%d%%' % (left_vol, right_vol))}]
+        self._ah.set_mixer_controls(mixer_settings)
+
+    def set_auto_mute(self, enable=True):
+        '''
+        Sets the auto-mute mode. When auto-mute is enabled, the speaker will be
+        muted automatically when an external mic detected.
+        '''
+        mixer_settings = [{'name': "'Auto-Mute Mode'",
+                           'value': ('Enabled' if enable else 'Disabled')}]
+        self._ah.set_mixer_controls(mixer_settings)
+
+    def on_test_complete(self):
+        '''
+        Restores the original state before exiting the test.
+        '''
+        os.system('iptables -D INPUT -p tcp --dport %s -j ACCEPT' % _PORT)
+        os.system('ifconfig eth0 down')
+        os.system('ifconfig eth0 up')
+        self.restore_configuration()
+        self._ah.cleanup_deps(['sox'])
+
+    def key_release_callback(self, widget, event):
+        # Hit Q to force quit this test.
+        if event.keyval == ord('Q'):
+            self._test_passed = False
+            gtk.main_quit()
+
+    def register_callbacks(self, window):
+        window.connect('key-release-event', self.key_release_callback)
+
+    def check_eth_state(self):
+        path = '/sys/class/net/eth0/carrier'
+        output = None
+        try:
+            if os.path.exists(path):
+                output = open(path).read()
+        finally:
+            if output:
+                return output == '1\n'
+            else:
+                return False
+
+    def run_once(self, audio_sample_path=None, audio_init_volume=None):
+        factory.log('%s run_once' % self.__class__)
+
+        self._ah = audio_helper.AudioHelper(self)
+        self._ah.setup_deps(['sox'])
+        self._detail_log = ''
+
+        factory.log('Checking eth0 state....')
+        utils.poll_for_condition(self.check_eth_state,
+                                 timeout=30, desc='Checking eth0')
+        factory.log('Checking eth0 done!')
+
+        # Configure local network environment to accept command from test host.
+        os.system('ifconfig eth0 %s netmask 255.255.255.0 up' % _LOCAL_IP)
+        os.system('iptables -A INPUT -p tcp --dport %s -j ACCEPT' % _PORT)
+
+        # Register commands to corresponding handlers.
+        self._handlers = {}
+        self._handlers[_SEND_FILE_RE] = self.handle_send_file
+        self._handlers[_RESULT_PASS_RE] = self.handle_result_pass
+        self._handlers[_RESULT_FAIL_RE] = self.handle_result_fail
+        self._handlers[_TEST_COMPLETE_RE] = self.handle_test_complete
+        self._handlers[_LOOP_0_RE] = self.handle_loop_none
+        self._handlers[_LOOP_1_RE] = self.handle_loop
+        self._handlers[_LOOP_2_RE] = self.handle_loop_speaker_unmute
+        self._handlers[_LOOP_3_RE] = self.handle_loop
+        self._handlers[_XTALK_L_RE] = self.handle_xtalk_left
+        self._handlers[_XTALK_R_RE] = self.handle_xtalk_right
+
+        self.start_server()
+
+        self._main_widget = gtk.EventBox()
+        self._main_widget.modify_bg(gtk.STATE_NORMAL, ful.BLACK)
+        self._loop_status_label = ful.make_label('No audio loop', fg=ful.WHITE)
+        self._main_widget.add(self._loop_status_label)
+
+        try:
+            ful.run_test_widget(self.job, self._main_widget,
+                    window_registration_callback=self.register_callbacks)
+
+            if not self._test_passed:
+                factory.log(self._detail_log)
+                raise error.TestError('Test failed.')
+        finally:
+            self.on_test_complete()
+
+        factory.log('%s run_once finished' % self.__class__)