r_submix: Use intermediate pipe in non-blocking mode

Switch the pipe into non-blocking mode to prevent stalling
when attempting to close it.

Simulate circular buffer behavior by flushing old data if
there is no more space in the pipe.

This removes the need for "exiting=1" parameter.

Bug: 73175392
Test: r_submix_tests
Change-Id: Iff89980af71112892ff262030e471ae736b1f62a
diff --git a/modules/audio_remote_submix/audio_hw.cpp b/modules/audio_remote_submix/audio_hw.cpp
index 8c0c097..d78e723 100644
--- a/modules/audio_remote_submix/audio_hw.cpp
+++ b/modules/audio_remote_submix/audio_hw.cpp
@@ -418,8 +418,8 @@
             config->format);
         const NBAIO_Format offers[1] = {format};
         size_t numCounterOffers = 0;
-        // Create a MonoPipe with optional blocking set to true.
-        MonoPipe* sink = new MonoPipe(buffer_size_frames, format, true /*writeCanBlock*/);
+        // Create a MonoPipe with optional blocking set to false.
+        MonoPipe* sink = new MonoPipe(buffer_size_frames, format, false /*writeCanBlock*/);
         // Negotiation between the source and sink cannot fail as the device open operation
         // creates both ends of the pipe using the same audio format.
         ssize_t index = sink->negotiate(offers, 1, NULL, numCounterOffers);
@@ -714,30 +714,8 @@
 
 static int out_set_parameters(struct audio_stream *stream, const char *kvpairs)
 {
-    int exiting = -1;
-    AudioParameter parms = AudioParameter(String8(kvpairs));
+    (void)stream;
     SUBMIX_ALOGV("out_set_parameters() kvpairs='%s'", kvpairs);
-
-    // FIXME this is using hard-coded strings but in the future, this functionality will be
-    //       converted to use audio HAL extensions required to support tunneling
-    if ((parms.getInt(String8("exiting"), exiting) == NO_ERROR) && (exiting > 0)) {
-        struct submix_audio_device * const rsxadev =
-                audio_stream_get_submix_stream_out(stream)->dev;
-        pthread_mutex_lock(&rsxadev->lock);
-        { // using the sink
-            sp<MonoPipe> sink =
-                    rsxadev->routes[audio_stream_get_submix_stream_out(stream)->route_handle]
-                                    .rsxSink;
-            if (sink == NULL) {
-                pthread_mutex_unlock(&rsxadev->lock);
-                return 0;
-            }
-
-            ALOGD("out_set_parameters(): shutting down MonoPipe sink");
-            sink->shutdown(true);
-        } // done using the sink
-        pthread_mutex_unlock(&rsxadev->lock);
-    }
     return 0;
 }
 
@@ -805,12 +783,12 @@
         return 0;
     }
 
-    // If the write to the sink would block when no input stream is present, flush enough frames
+    // If the write to the sink would block, flush enough frames
     // from the pipe to make space to write the most recent data.
     {
         const size_t availableToWrite = sink->availableToWrite();
         sp<MonoPipeReader> source = rsxadev->routes[out->route_handle].rsxSource;
-        if (rsxadev->routes[out->route_handle].input == NULL && availableToWrite < frames) {
+        if (availableToWrite < frames) {
             static uint8_t flush_buffer[64];
             const size_t flushBufferSizeFrames = sizeof(flush_buffer) / frame_size;
             size_t frames_to_flush_from_source = frames - availableToWrite;
diff --git a/modules/audio_remote_submix/tests/remote_submix_tests.cpp b/modules/audio_remote_submix/tests/remote_submix_tests.cpp
index 2c01c92..36650e7 100644
--- a/modules/audio_remote_submix/tests/remote_submix_tests.cpp
+++ b/modules/audio_remote_submix/tests/remote_submix_tests.cpp
@@ -198,17 +198,16 @@
 }
 
 // Verifies that when input is opened but not reading, writing into an output stream does not block.
-// !!! Currently does not finish because requires setting a parameter from another thread !!!
-// TEST_F(RemoteSubmixTest, OutputDoesNotBlockWhenInputStuck) {
-//     const char* address = "1";
-//     audio_stream_out_t* streamOut;
-//     OpenOutputStream(address, true /*mono*/, 48000, &streamOut);
-//     audio_stream_in_t* streamIn;
-//     OpenInputStream(address, true /*mono*/, 48000, &streamIn);
-//     WriteSomethingIntoStream(streamOut, 1024, 16);
-//     mDev->close_input_stream(mDev, streamIn);
-//     mDev->close_output_stream(mDev, streamOut);
-// }
+TEST_F(RemoteSubmixTest, OutputDoesNotBlockWhenInputStuck) {
+    const char* address = "1";
+    audio_stream_out_t* streamOut;
+    OpenOutputStream(address, true /*mono*/, 48000, &streamOut);
+    audio_stream_in_t* streamIn;
+    OpenInputStream(address, true /*mono*/, 48000, &streamIn);
+    WriteSomethingIntoStream(streamOut, 1024, 16);
+    mDev->close_input_stream(mDev, streamIn);
+    mDev->close_output_stream(mDev, streamOut);
+}
 
 TEST_F(RemoteSubmixTest, OutputAndInput) {
     const char* address = "1";