Merge "Modify submix HAL to allow either input or output streams create the pipe."
diff --git a/modules/audio_remote_submix/audio_hw.cpp b/modules/audio_remote_submix/audio_hw.cpp
index 82aadb5..8014d18 100644
--- a/modules/audio_remote_submix/audio_hw.cpp
+++ b/modules/audio_remote_submix/audio_hw.cpp
@@ -53,10 +53,12 @@
 #define SUBMIX_ALOGE(...)
 #endif // SUBMIX_VERBOSE_LOGGING
 
-// Size of the pipe's buffer in frames.
-#define MAX_PIPE_DEPTH_IN_FRAMES     (1024*8)
-// Maximum number of frames users of the input and output stream should buffer.
-#define PERIOD_SIZE_IN_FRAMES         1024
+// NOTE: This value will be rounded up to the nearest power of 2 by MonoPipe().
+#define DEFAULT_PIPE_SIZE_IN_FRAMES  (1024*8)
+// Value used to divide the MonoPipe() buffer into segments that are written to the sink and
+// read from the source.  The maximum latency of the device is the size of the MonoPipe's buffer;
+// the minimum latency is the MonoPipe buffer size divided by this value.
+#define DEFAULT_PIPE_PERIOD_COUNT    4
 // The duration of MAX_READ_ATTEMPTS * READ_ATTEMPT_SLEEP_MS must be stricly inferior to
 //   the duration of a record buffer at the current record sample rate (of the device, not of
 //   the recording itself). Here we have:
@@ -66,6 +68,17 @@
 #define DEFAULT_SAMPLE_RATE_HZ       48000 // default sample rate
 // See NBAIO_Format frameworks/av/include/media/nbaio/NBAIO.h.
 #define DEFAULT_FORMAT               AUDIO_FORMAT_PCM_16_BIT
+// A legacy user of this device does not close the input stream when it shuts down, which
+// results in the application opening a new input stream before closing the old input stream
+// handle it was previously using.  Setting this value to 1 allows multiple clients to open
+// multiple input streams from this device.  If this option is enabled, each input stream returned
+// is *the same stream* which means that readers will race to read data from these streams.
+#define ENABLE_LEGACY_INPUT_OPEN     1
+
+// Common limits macros.
+#ifndef min
+#define min(a, b) ((a) < (b) ? (a) : (b))
+#endif // min
 
 // Set *result_variable_ptr to true if value_to_find is present in the array array_to_search,
 // otherwise set *result_variable_ptr to false.
@@ -90,7 +103,9 @@
     // channel bitfields are not equivalent.
     audio_channel_mask_t input_channel_mask;
     audio_channel_mask_t output_channel_mask;
-    size_t period_size; // Size of the audio pipe is period_size * period_count in frames.
+    size_t buffer_size_frames; // Size of the audio pipe in frames.
+    // Maximum number of frames buffered by the input and output streams.
+    size_t buffer_period_size_frames;
 };
 
 struct submix_audio_device {
@@ -109,6 +124,11 @@
     sp<MonoPipe> rsxSink;
     sp<MonoPipeReader> rsxSource;
 
+    // Pointers to the current input and output stream instances.  rsxSink and rsxSource are
+    // destroyed if both the input and output streams are destroyed.
+    struct submix_stream_out *output;
+    struct submix_stream_in *input;
+
     // Device lock, also used to protect access to submix_audio_device from the input and output
     // streams.
     pthread_mutex_t lock;
@@ -128,6 +148,11 @@
     struct timespec record_start_time;
     // how many frames have been requested to be read
     int64_t read_counter_frames;
+
+#if ENABLE_LEGACY_INPUT_OPEN
+    // Number of references to this input stream.
+    volatile int32_t ref_count;
+#endif // ENABLE_LEGACY_INPUT_OPEN
 };
 
 // Determine whether the specified sample rate is supported by the submix module.
@@ -253,30 +278,16 @@
     return 0;
 }
 
-// Convert a input channel mask to output channel mask where a mapping is available, returns 0
-// otherwise.
-static audio_channel_mask_t audio_channel_in_mask_to_out(
-        const audio_channel_mask_t channel_in_mask)
-{
-  switch (channel_in_mask) {
-      case AUDIO_CHANNEL_IN_MONO:
-          return AUDIO_CHANNEL_OUT_MONO;
-      case AUDIO_CHANNEL_IN_STEREO:
-          return AUDIO_CHANNEL_OUT_STEREO;
-      default:
-          return 0;
-  }
-}
-
 // Compare an audio_config with input channel mask and an audio_config with output channel mask
 // returning false if they do *not* match, true otherwise.
 static bool audio_config_compare(const audio_config * const input_config,
         const audio_config * const output_config)
 {
-    audio_channel_mask_t channel_mask = audio_channel_in_mask_to_out(input_config->channel_mask);
-    if (channel_mask != output_config->channel_mask) {
-        ALOGE("audio_config_compare() channel mask mismatch %x (%x) vs. %x",
-              channel_mask, input_config->channel_mask, output_config->channel_mask);
+    const uint32_t input_channels = get_channel_count_from_mask(input_config->channel_mask);
+    const uint32_t output_channels = get_channel_count_from_mask(output_config->channel_mask);
+    if (input_channels != output_channels) {
+        ALOGE("audio_config_compare() channel count mismatch input=%d vs. output=%d",
+              input_channels, output_channels);
         return false;
     }
     if (input_config->sample_rate != output_config->sample_rate) {
@@ -293,6 +304,102 @@
     return true;
 }
 
+// Associate "in" and/or "out" with rsxadev and, if the device has no pipe yet, create one using
+// config, buffer_size_frames and buffer_period_count.  Holds rsxadev->lock for the duration.
+static void submix_audio_device_create_pipe(struct submix_audio_device * const rsxadev,
+                                            const struct audio_config * const config,
+                                            const size_t buffer_size_frames,
+                                            const uint32_t buffer_period_count,
+                                            struct submix_stream_in * const in,
+                                            struct submix_stream_out * const out)
+{
+    ALOG_ASSERT(in || out);
+    ALOGV("submix_audio_device_create_pipe()");
+    pthread_mutex_lock(&rsxadev->lock);
+    // Save a reference to each stream that was supplied, together with the channel mask it
+    // was opened with.
+    if (in) {
+        rsxadev->input = in;
+        rsxadev->config.input_channel_mask = config->channel_mask;
+    }
+    if (out) {
+        rsxadev->output = out;
+        rsxadev->config.output_channel_mask = config->channel_mask;
+    }
+    // If either end of the pipe is missing, create a new sink / source pair.
+    if (rsxadev->rsxSink == NULL || rsxadev->rsxSource == NULL) {
+        struct submix_config * const device_config = &rsxadev->config;
+        const NBAIO_Format format = Format_from_SR_C(config->sample_rate,
+                 get_channel_count_from_mask(config->channel_mask), config->format);
+        const NBAIO_Format offers[1] = {format};
+        size_t numCounterOffers = 0;
+        // Create a MonoPipe whose writes are allowed to block (writeCanBlock = true).
+        MonoPipe* sink = new MonoPipe(buffer_size_frames, format, true /*writeCanBlock*/);
+        // Negotiation between the source and sink cannot fail as the device open operation
+        // creates both ends of the pipe using the same audio format.
+        ssize_t index = sink->negotiate(offers, 1, NULL, numCounterOffers);
+        ALOG_ASSERT(index == 0);
+        MonoPipeReader* source = new MonoPipeReader(sink);
+        numCounterOffers = 0;
+        index = source->negotiate(offers, 1, NULL, numCounterOffers);
+        ALOG_ASSERT(index == 0);
+        ALOGV("submix_audio_device_create_pipe(): created pipe");
+
+        // Hand the new sink and source over to the device's strong pointers.
+        ALOG_ASSERT(rsxadev->rsxSink == NULL);
+        ALOG_ASSERT(rsxadev->rsxSource == NULL);
+        rsxadev->rsxSink = sink;
+        rsxadev->rsxSource = source;
+        // Store the audio config in the device so it can be compared against when the other
+        // stream is opened, then record the pipe's size (sink->maxFrames()) and period size.
+        memcpy(&device_config->common, config, sizeof(device_config->common));
+        device_config->buffer_size_frames = sink->maxFrames();
+        device_config->buffer_period_size_frames = device_config->buffer_size_frames /
+                buffer_period_count;
+    }
+    pthread_mutex_unlock(&rsxadev->lock);
+}
+
+// Release the device's references to the sink and source.  This function does not lock; every
+// caller visible in this change holds rsxadev->lock.  Input and output threads may still hold
+// their own sp<MonoPipe> / sp<MonoPipeReader> references, usable until they shut down.
+static void submix_audio_device_release_pipe(struct submix_audio_device * const rsxadev)
+{
+    ALOGV("submix_audio_device_release_pipe()");
+    rsxadev->rsxSink.clear();
+    rsxadev->rsxSource.clear();
+}
+
+// Drop the device's reference to the specified input and/or output stream (either may be
+// NULL).  Once the device references neither stream, release the associated pipe.
+static void submix_audio_device_destroy_pipe(struct submix_audio_device * const rsxadev,
+                                             const struct submix_stream_in * const in,
+                                             const struct submix_stream_out * const out)
+{
+    // The device lock guards rsxadev->input, rsxadev->output and the pipe end points.
+    pthread_mutex_lock(&rsxadev->lock);
+    ALOGV("submix_audio_device_destroy_pipe()");
+    ALOG_ASSERT(in == NULL || rsxadev->input == in);
+    ALOG_ASSERT(out == NULL || rsxadev->output == out);
+    if (in != NULL) {
+#if ENABLE_LEGACY_INPUT_OPEN
+        const_cast<struct submix_stream_in*>(in)->ref_count--;
+        if (in->ref_count == 0) {
+            rsxadev->input = NULL;
+        }
+        ALOGV("submix_audio_device_destroy_pipe(): input ref_count %d", in->ref_count);
+#else
+        rsxadev->input = NULL;
+#endif // ENABLE_LEGACY_INPUT_OPEN
+    }
+    if (out != NULL) rsxadev->output = NULL;
+    if (rsxadev->input == NULL && rsxadev->output == NULL) {
+        submix_audio_device_release_pipe(rsxadev);
+        ALOGV("submix_audio_device_destroy_pipe(): pipe destroyed");
+    }
+    pthread_mutex_unlock(&rsxadev->lock);
+}
+
 // Sanitize the user specified audio config for a submix input / output stream.
 static void submix_sanitize_config(struct audio_config * const config, const bool is_input_format)
 {
@@ -308,18 +415,31 @@
                                  const struct audio_config * const config,
                                  const bool opening_input)
 {
-    bool pipe_open;
+    bool input_open;
+    bool output_open;
     audio_config pipe_config;
 
     // Query the device for the current audio config and whether input and output streams are open.
     pthread_mutex_lock(lock);
-    pipe_open = rsxadev->rsxSink.get() != NULL || rsxadev->rsxSource.get() != NULL;
+    output_open = rsxadev->output != NULL;
+    input_open = rsxadev->input != NULL;
     memcpy(&pipe_config, &rsxadev->config.common, sizeof(pipe_config));
     pthread_mutex_unlock(lock);
 
-    // If the pipe is open, verify the existing audio config the pipe matches the user
+    // If the stream is already open, don't open it again.
+    if (opening_input ? !ENABLE_LEGACY_INPUT_OPEN && input_open : output_open) {
+        ALOGE("submix_open_validate(): %s stream already open.", opening_input ? "Input" :
+                "Output");
+        return false;
+    }
+
+    SUBMIX_ALOGV("submix_open_validate(): sample rate=%d format=%x "
+                 "%s_channel_mask=%x", config->sample_rate, config->format,
+                 opening_input ? "in" : "out", config->channel_mask);
+
+    // If either stream is open, verify the existing audio config the pipe matches the user
     // specified config.
-    if (pipe_open) {
+    if (input_open || output_open) {
         const audio_config * const input_config = opening_input ? config : &pipe_config;
         const audio_config * const output_config = opening_input ? &pipe_config : config;
         // Get the channel mask of the open device.
@@ -328,7 +448,7 @@
                 rsxadev->config.input_channel_mask;
         if (!audio_config_compare(input_config, output_config)) {
             ALOGE("submix_open_validate(): Unsupported format.");
-            return -EINVAL;
+            return false;
         }
     }
     return true;
@@ -362,9 +482,9 @@
     const struct submix_stream_out * const out = audio_stream_get_submix_stream_out(
             const_cast<struct audio_stream *>(stream));
     const struct submix_config * const config = &out->dev->config;
-    const size_t buffer_size = config->period_size * audio_stream_frame_size(stream);
+    const size_t buffer_size = config->buffer_period_size_frames * audio_stream_frame_size(stream);
     SUBMIX_ALOGV("out_get_buffer_size() returns %zu bytes, %zu frames",
-                 buffer_size, config->period_size);
+                 buffer_size, config->buffer_period_size_frames);
     return buffer_size;
 }
 
@@ -423,6 +543,7 @@
     int exiting = -1;
     AudioParameter parms = AudioParameter(String8(kvpairs));
     SUBMIX_ALOGV("out_set_parameters() kvpairs='%s'", kvpairs);
+
     // FIXME this is using hard-coded strings but in the future, this functionality will be
     //       converted to use audio HAL extensions required to support tunneling
     if ((parms.getInt(String8("exiting"), exiting) == NO_ERROR) && (exiting > 0)) {
@@ -430,7 +551,7 @@
                 audio_stream_get_submix_stream_out(stream)->dev;
         pthread_mutex_lock(&rsxadev->lock);
         { // using the sink
-            sp<MonoPipe> sink = rsxadev->rsxSink.get();
+            sp<MonoPipe> sink = rsxadev->rsxSink;
             if (sink == NULL) {
                 pthread_mutex_unlock(&rsxadev->lock);
                 return 0;
@@ -456,9 +577,9 @@
     const struct submix_stream_out * const out = audio_stream_out_get_submix_stream_out(
             const_cast<struct audio_stream_out *>(stream));
     const struct submix_config * const config = &out->dev->config;
-    const uint32_t latency_ms = (MAX_PIPE_DEPTH_IN_FRAMES * 1000) / config->common.sample_rate;
+    const uint32_t latency_ms = (config->buffer_size_frames * 1000) / config->common.sample_rate;
     SUBMIX_ALOGV("out_get_latency() returns %u ms, size in frames %zu, sample rate %u", latency_ms,
-          MAX_PIPE_DEPTH_IN_FRAMES, config->common.sample_rate);
+          config->buffer_size_frames, config->common.sample_rate);
     return latency_ms;
 }
 
@@ -477,15 +598,15 @@
     SUBMIX_ALOGV("out_write(bytes=%zd)", bytes);
     ssize_t written_frames = 0;
     const size_t frame_size = audio_stream_frame_size(&stream->common);
-    struct submix_audio_device * const rsxadev =
-            audio_stream_out_get_submix_stream_out(stream)->dev;
+    struct submix_stream_out * const out = audio_stream_out_get_submix_stream_out(stream);
+    struct submix_audio_device * const rsxadev = out->dev;
     const size_t frames = bytes / frame_size;
 
     pthread_mutex_lock(&rsxadev->lock);
 
     rsxadev->output_standby = false;
 
-    sp<MonoPipe> sink = rsxadev->rsxSink.get();
+    sp<MonoPipe> sink = rsxadev->rsxSink;
     if (sink != NULL) {
         if (sink->isShutdown()) {
             sink.clear();
@@ -592,7 +713,8 @@
 {
     const struct submix_stream_in * const in = audio_stream_get_submix_stream_in(
             const_cast<struct audio_stream*>(stream));
-    const size_t buffer_size = in->dev->config.period_size * audio_stream_frame_size(stream);
+    const size_t buffer_size = in->dev->config.buffer_period_size_frames *
+            audio_stream_frame_size(stream);
     SUBMIX_ALOGV("in_get_buffer_size() returns %zu", buffer_size);
     return buffer_size;
 }
@@ -703,7 +825,7 @@
         if (source == NULL) {
             ALOGE("no audio pipe yet we're trying to read!");
             pthread_mutex_unlock(&rsxadev->lock);
-            usleep((bytes / frame_size) * 1000000 / in_get_sample_rate(&stream->common));
+            usleep(frames_to_read * 1000000 / in_get_sample_rate(&stream->common));
             memset(buffer, 0, bytes);
             return bytes;
         }
@@ -714,14 +836,15 @@
         int attempts = 0;
         char* buff = (char*)buffer;
         while ((remaining_frames > 0) && (attempts < MAX_READ_ATTEMPTS)) {
-            attempts++;
             frames_read = source->read(buff, remaining_frames, AudioBufferProvider::kInvalidPTS);
+
             if (frames_read > 0) {
                 remaining_frames -= frames_read;
                 buff += frames_read * frame_size;
                 SUBMIX_ALOGV("  in_read (att=%d) got %zd frames, remaining=%zu",
                              attempts, frames_read, remaining_frames);
             } else {
+                attempts++;
                 SUBMIX_ALOGE("  in_read read returned %zd", frames_read);
                 usleep(READ_ATTEMPT_SLEEP_MS * 1000);
             }
@@ -733,9 +856,9 @@
     }
 
     if (remaining_frames > 0) {
+        const size_t remaining_bytes = remaining_frames * frame_size;
         SUBMIX_ALOGV("  remaining_frames = %zu", remaining_frames);
-        memset(((char*)buffer)+ bytes - (remaining_frames * frame_size), 0,
-                remaining_frames * frame_size);
+        memset(((char*)buffer)+ bytes - remaining_bytes, 0, remaining_bytes);
     }
 
     // compute how much we need to sleep after reading the data by comparing the wall clock with
@@ -806,11 +929,12 @@
     struct submix_audio_device * const rsxadev = audio_hw_device_get_submix_audio_device(dev);
     ALOGV("adev_open_output_stream()");
     struct submix_stream_out *out;
-    int ret;
     (void)handle;
     (void)devices;
     (void)flags;
 
+    *stream_out = NULL;
+
     // Make sure it's possible to open the device given the current audio config.
     submix_sanitize_config(config, false);
     if (!submix_open_validate(rsxadev, &rsxadev->lock, config, false)) {
@@ -819,12 +943,7 @@
     }
 
     out = (struct submix_stream_out *)calloc(1, sizeof(struct submix_stream_out));
-    if (!out) {
-        ret = -ENOMEM;
-        goto err_open;
-    }
-
-    pthread_mutex_lock(&rsxadev->lock);
+    if (!out) return -ENOMEM;
 
     // Initialize the function pointer tables (v-tables).
     out->stream.common.get_sample_rate = out_get_sample_rate;
@@ -845,59 +964,32 @@
     out->stream.get_render_position = out_get_render_position;
     out->stream.get_next_write_timestamp = out_get_next_write_timestamp;
 
-    memcpy(&rsxadev->config.common, config, sizeof(rsxadev->config.common));
-    rsxadev->config.output_channel_mask = config->channel_mask;
-
-    rsxadev->config.period_size = PERIOD_SIZE_IN_FRAMES;
+    // If the sink has been shutdown, delete the pipe so that it's recreated.
+    pthread_mutex_lock(&rsxadev->lock);
+    if (rsxadev->rsxSink != NULL && rsxadev->rsxSink->isShutdown()) {
+        submix_audio_device_release_pipe(rsxadev);
+    }
+    pthread_mutex_unlock(&rsxadev->lock);
 
     // Store a pointer to the device from the output stream.
     out->dev = rsxadev;
+    // Initialize the pipe.
+    ALOGV("adev_open_output_stream(): Initializing pipe");
+    submix_audio_device_create_pipe(rsxadev, config, DEFAULT_PIPE_SIZE_IN_FRAMES,
+                                    DEFAULT_PIPE_PERIOD_COUNT, NULL, out);
     // Return the output stream.
     *stream_out = &out->stream;
 
-
-    // initialize pipe
-    {
-        ALOGV("  initializing pipe");
-        const NBAIO_Format format = Format_from_SR_C(rsxadev->config.common.sample_rate,
-                get_channel_count_from_mask(rsxadev->config.common.channel_mask),
-                rsxadev->config.common.format);
-        const NBAIO_Format offers[1] = {format};
-        size_t numCounterOffers = 0;
-        // creating a MonoPipe with optional blocking set to true.
-        MonoPipe* sink = new MonoPipe(MAX_PIPE_DEPTH_IN_FRAMES, format, true/*writeCanBlock*/);
-        ssize_t index = sink->negotiate(offers, 1, NULL, numCounterOffers);
-        ALOG_ASSERT(index == 0);
-        MonoPipeReader* source = new MonoPipeReader(sink);
-        numCounterOffers = 0;
-        index = source->negotiate(offers, 1, NULL, numCounterOffers);
-        ALOG_ASSERT(index == 0);
-        rsxadev->rsxSink = sink;
-        rsxadev->rsxSource = source;
-    }
-
-    pthread_mutex_unlock(&rsxadev->lock);
-
     return 0;
-
-err_open:
-    *stream_out = NULL;
-    return ret;
 }
 
 static void adev_close_output_stream(struct audio_hw_device *dev,
                                      struct audio_stream_out *stream)
 {
-    struct submix_audio_device *rsxadev = audio_hw_device_get_submix_audio_device(dev);
+    struct submix_stream_out * const out = audio_stream_out_get_submix_stream_out(stream);
     ALOGV("adev_close_output_stream()");
-
-    pthread_mutex_lock(&rsxadev->lock);
-
-    rsxadev->rsxSink.clear();
-    rsxadev->rsxSource.clear();
-    free(stream);
-
-    pthread_mutex_unlock(&rsxadev->lock);
+    submix_audio_device_destroy_pipe(audio_hw_device_get_submix_audio_device(dev), NULL, out);
+    free(out);
 }
 
 static int adev_set_parameters(struct audio_hw_device *dev, const char *kvpairs)
@@ -984,7 +1076,7 @@
     if (audio_is_linear_pcm(config->format)) {
         const size_t buffer_period_size_frames =
             audio_hw_device_get_submix_audio_device(const_cast<struct audio_hw_device*>(dev))->
-                config.period_size;
+                config.buffer_period_size_frames;
         const size_t frame_size_in_bytes = get_channel_count_from_mask(config->channel_mask) *
                 audio_bytes_per_sample(config->format);
         const size_t buffer_size = buffer_period_size_frames * frame_size_in_bytes;
@@ -1003,11 +1095,12 @@
 {
     struct submix_audio_device *rsxadev = audio_hw_device_get_submix_audio_device(dev);
     struct submix_stream_in *in;
-    int ret;
     ALOGI("adev_open_input_stream()");
     (void)handle;
     (void)devices;
 
+    *stream_in = NULL;
+
     // Make sure it's possible to open the device given the current audio config.
     submix_sanitize_config(config, true);
     if (!submix_open_validate(rsxadev, &rsxadev->lock, config, true)) {
@@ -1015,70 +1108,68 @@
         return -EINVAL;
     }
 
-    in = (struct submix_stream_in *)calloc(1, sizeof(struct submix_stream_in));
-    if (!in) {
-        ret = -ENOMEM;
-        goto err_open;
-    }
-
+#if ENABLE_LEGACY_INPUT_OPEN
     pthread_mutex_lock(&rsxadev->lock);
+    in = rsxadev->input;
+    if (in) {
+        in->ref_count++;
+        sp<MonoPipe> sink = rsxadev->rsxSink;
+        ALOG_ASSERT(sink != NULL);
+        // If the sink has been shutdown, delete the pipe.
+        if (sink->isShutdown()) submix_audio_device_release_pipe(rsxadev);
+    }
+    pthread_mutex_unlock(&rsxadev->lock);
+#else
+    in = NULL;
+#endif // ENABLE_LEGACY_INPUT_OPEN
 
-    // Initialize the function pointer tables (v-tables).
-    in->stream.common.get_sample_rate = in_get_sample_rate;
-    in->stream.common.set_sample_rate = in_set_sample_rate;
-    in->stream.common.get_buffer_size = in_get_buffer_size;
-    in->stream.common.get_channels = in_get_channels;
-    in->stream.common.get_format = in_get_format;
-    in->stream.common.set_format = in_set_format;
-    in->stream.common.standby = in_standby;
-    in->stream.common.dump = in_dump;
-    in->stream.common.set_parameters = in_set_parameters;
-    in->stream.common.get_parameters = in_get_parameters;
-    in->stream.common.add_audio_effect = in_add_audio_effect;
-    in->stream.common.remove_audio_effect = in_remove_audio_effect;
-    in->stream.set_gain = in_set_gain;
-    in->stream.read = in_read;
-    in->stream.get_input_frames_lost = in_get_input_frames_lost;
+    if (!in) {
+        in = (struct submix_stream_in *)calloc(1, sizeof(struct submix_stream_in));
+        if (!in) return -ENOMEM;
+        in->ref_count = 1;
 
-    memcpy(&rsxadev->config.common, config, sizeof(rsxadev->config.common));
-    rsxadev->config.input_channel_mask = config->channel_mask;
-
-    rsxadev->config.period_size = PERIOD_SIZE_IN_FRAMES;
-
-    *stream_in = &in->stream;
-
-    in->dev = rsxadev;
+        // Initialize the function pointer tables (v-tables).
+        in->stream.common.get_sample_rate = in_get_sample_rate;
+        in->stream.common.set_sample_rate = in_set_sample_rate;
+        in->stream.common.get_buffer_size = in_get_buffer_size;
+        in->stream.common.get_channels = in_get_channels;
+        in->stream.common.get_format = in_get_format;
+        in->stream.common.set_format = in_set_format;
+        in->stream.common.standby = in_standby;
+        in->stream.common.dump = in_dump;
+        in->stream.common.set_parameters = in_set_parameters;
+        in->stream.common.get_parameters = in_get_parameters;
+        in->stream.common.add_audio_effect = in_add_audio_effect;
+        in->stream.common.remove_audio_effect = in_remove_audio_effect;
+        in->stream.set_gain = in_set_gain;
+        in->stream.read = in_read;
+        in->stream.get_input_frames_lost = in_get_input_frames_lost;
+    }
 
     // Initialize the input stream.
     in->read_counter_frames = 0;
     in->output_standby = rsxadev->output_standby;
-
-    pthread_mutex_unlock(&rsxadev->lock);
+    in->dev = rsxadev;
+    // Initialize the pipe.
+    submix_audio_device_create_pipe(rsxadev, config, DEFAULT_PIPE_SIZE_IN_FRAMES,
+                                    DEFAULT_PIPE_PERIOD_COUNT, in, NULL);
+    // Return the input stream.
+    *stream_in = &in->stream;
 
     return 0;
-
-err_open:
-    *stream_in = NULL;
-    return ret;
 }
 
 static void adev_close_input_stream(struct audio_hw_device *dev,
                                     struct audio_stream_in *stream)
 {
-    struct submix_audio_device *rsxadev = audio_hw_device_get_submix_audio_device(dev);
+    struct submix_stream_in * const in = audio_stream_in_get_submix_stream_in(stream);
     ALOGV("adev_close_input_stream()");
-
-    pthread_mutex_lock(&rsxadev->lock);
-
-    MonoPipe* sink = rsxadev->rsxSink.get();
-    if (sink != NULL) {
-        ALOGI("shutdown");
-        sink->shutdown(true);
-    }
-
-    free(stream);
-
-    pthread_mutex_unlock(&rsxadev->lock);
+    submix_audio_device_destroy_pipe(audio_hw_device_get_submix_audio_device(dev), in, NULL);
+#if ENABLE_LEGACY_INPUT_OPEN
+    if (in->ref_count == 0) free(in);
+#else
+    free(in);
+#endif // ENABLE_LEGACY_INPUT_OPEN
 }
 
 static int adev_dump(const audio_hw_device_t *device, int fd)