Remote audio submix: blocking writes and sleeping reads.

Change how the remote audio submix moves audio through its pipe:
 - use a MonoPipe as the audio sink, so that writes block,
 - use a MonoPipeReader as the audio source for non-blocking reads,
   and keep track of when recording started so that each in_read()
   call returns at the time projected from the recording duration
   so far.

Change-Id: I8b0f8c56a0486806101e272dfbf9c6d2d1c11112
diff --git a/modules/audio_remote_submix/audio_hw.cpp b/modules/audio_remote_submix/audio_hw.cpp
old mode 100644
new mode 100755
index 0e5589b..b17020a
--- a/modules/audio_remote_submix/audio_hw.cpp
+++ b/modules/audio_remote_submix/audio_hw.cpp
@@ -15,7 +15,7 @@
  */
 
 #define LOG_TAG "r_submix"
-//#define LOG_NDEBUG 0
+#define LOG_NDEBUG 0
 
 #include <errno.h>
 #include <pthread.h>
@@ -31,17 +31,23 @@
 #include <system/audio.h>
 #include <hardware/audio.h>
 
-#include <media/nbaio/Pipe.h>
-#include <media/nbaio/PipeReader.h>
+//#include <media/nbaio/Pipe.h>
+//#include <media/nbaio/PipeReader.h>
+#include <media/nbaio/MonoPipe.h>
+#include <media/nbaio/MonoPipeReader.h>
 #include <media/AudioBufferProvider.h>
 
 extern "C" {
 
 namespace android {
 
-#define MAX_PIPE_DEPTH_IN_FRAMES     (1024*4)
+#define MAX_PIPE_DEPTH_IN_FRAMES     (1024*8)
+// The duration of MAX_READ_ATTEMPTS * READ_ATTEMPT_SLEEP_MS must be strictly less than
+//   the duration of a record buffer at the current record sample rate (of the device, not of
+//   the recording itself). Here we have:
+//      3 * 5ms = 15ms < 1024 frames * 1000 / 48000 = 21.333ms
 #define MAX_READ_ATTEMPTS            3
-#define READ_ATTEMPT_SLEEP_MS        10 // 10ms between two read attempts when pipe is empty
+#define READ_ATTEMPT_SLEEP_MS        5 // 5ms between two read attempts when pipe is empty
 #define DEFAULT_RATE_HZ              48000 // default sample rate
 
 struct submix_config {
@@ -54,18 +60,21 @@
 
 struct submix_audio_device {
     struct audio_hw_device device;
+    bool output_standby; // set by out_standby(), cleared by out_write()
+    bool input_standby; // set by in_standby(), cleared by the next in_read()
     submix_config config;
     // Pipe variables: they handle the ring buffer that "pipes" audio:
-    //  - from the submix virtual audio output == what needs to be played by
-    //    the remotely, seen as an output for AudioFlinger
+    //  - from the submix virtual audio output == what needs to be played
+    //    remotely, seen as an output for AudioFlinger
     //  - to the virtual audio source == what is captured by the component
     //    which "records" the submix / virtual audio source, and handles it as needed.
-    // An usecase example is one where the component capturing the audio is then sending it over
+    // A usecase example is one where the component capturing the audio is then sending it over
     // Wifi for presentation on a remote Wifi Display device (e.g. a dongle attached to a TV, or a
     // TV with Wifi Display capabilities), or to a wireless audio player.
-    sp<Pipe>       rsxSink;
-    sp<PipeReader> rsxSource;
+    sp<MonoPipe>       rsxSink; // written to by out_write()
+    sp<MonoPipeReader> rsxSource; // read from by in_read()
 
+    // device lock, also used to protect access to the audio pipe
     pthread_mutex_t lock;
 };
 
@@ -77,9 +86,13 @@
 struct submix_stream_in {
     struct audio_stream_in stream;
     struct submix_audio_device *dev;
-};
+    bool output_standby; // device output_standby value as last observed by in_read()
 
-static struct timespec currentTs;
+    // CLOCK_MONOTONIC time captured by in_read() when it (re)starts its timing reference
+    struct timespec record_start_time;
+    // frames requested through in_read() since this counter was last reset to 0
+    int64_t read_counter_frames;
+};
 
 
 /* audio HAL functions */
@@ -142,7 +155,16 @@
 
 static int out_standby(struct audio_stream *stream)
 {
-    // REMOTE_SUBMIX is a proxy / virtual audio device, so the notion of standby doesn't apply here
+    ALOGI("out_standby()");
+
+    const struct submix_stream_out *out = reinterpret_cast<const struct submix_stream_out *>(stream);
+
+    pthread_mutex_lock(&out->dev->lock); // the flag below is also accessed by in_read()
+
+    out->dev->output_standby = true; // cleared again by the next out_write()
+
+    pthread_mutex_unlock(&out->dev->lock);
+
     return 0;
 }
 
@@ -181,12 +203,14 @@
                          size_t bytes)
 {
     //ALOGV("out_write(bytes=%d)", bytes);
-    ssize_t written = 0;
+    ssize_t written_frames = 0;
     struct submix_stream_out *out = reinterpret_cast<struct submix_stream_out *>(stream);
 
     pthread_mutex_lock(&out->dev->lock);
 
-    Pipe* sink = out->dev->rsxSink.get();
+    out->dev->output_standby = false;
+
+    MonoPipe* sink = out->dev->rsxSink.get();
     if (sink != NULL) {
         out->dev->rsxSink->incStrong(buffer);
     } else {
@@ -198,17 +222,23 @@
 
     pthread_mutex_unlock(&out->dev->lock);
 
-    const size_t frames = bytes / audio_stream_frame_size(&stream->common);
-    written = sink->write(buffer, frames);
-    if (written < 0) {
-        if (written == (ssize_t)NEGOTIATE) {
+    const size_t frame_size = audio_stream_frame_size(&stream->common);
+    const size_t frames = bytes / frame_size;
+    written_frames = sink->write(buffer, frames);
+    if (written_frames < 0) {
+        if (written_frames == (ssize_t)NEGOTIATE) {
             ALOGE("out_write() write to pipe returned NEGOTIATE");
-            written = 0;
+
+            pthread_mutex_lock(&out->dev->lock);
+            out->dev->rsxSink->decStrong(buffer);
+            pthread_mutex_unlock(&out->dev->lock);
+
+            written_frames = 0;
             return 0;
         } else {
             // write() returned UNDERRUN or WOULD_BLOCK, retry
-            ALOGE("out_write() write to pipe returned unexpected %16lx", written);
-            written = sink->write(buffer, frames);
+            ALOGE("out_write() write to pipe returned unexpected %16lx", written_frames);
+            written_frames = sink->write(buffer, frames);
         }
     }
 
@@ -218,25 +248,13 @@
 
     pthread_mutex_unlock(&out->dev->lock);
 
-    struct timespec newTs;
-    int toSleepUs = 0;
-    int rc = clock_gettime(CLOCK_MONOTONIC, &newTs);
-    if (rc == 0) {
-        time_t sec = newTs.tv_sec - currentTs.tv_sec;
-        long nsec = newTs.tv_nsec - currentTs.tv_nsec;
-        if (nsec < 0) {
-            --sec;
-            nsec += 1000000000;
-        }
-        if ((nsec / 1000) < (frames * 1000000 / out_get_sample_rate(&stream->common))) {
-            toSleepUs = (frames * 1000000 / out_get_sample_rate(&stream->common)) - (nsec/1000);
-            ALOGI("sleeping %dus", toSleepUs);
-            usleep(toSleepUs);
-        }
+    if (written_frames < 0) {
+        ALOGE("out_write() failed writing to pipe with %16lx", written_frames);
+        return 0;
+    } else {
+        ALOGV("out_write() wrote %lu bytes)", written_frames * frame_size);
+        return written_frames * frame_size;
     }
-    clock_gettime(CLOCK_MONOTONIC, &currentTs);
-    //ALOGV("out_write(bytes=%d) written=%d", bytes, written);
-    return written * audio_stream_frame_size(&stream->common);
 }
 
 static int out_get_render_position(const struct audio_stream_out *stream,
@@ -265,7 +283,7 @@
 static uint32_t in_get_sample_rate(const struct audio_stream *stream)
 {
     const struct submix_stream_in *in = reinterpret_cast<const struct submix_stream_in *>(stream);
-    ALOGV("in_get_sample_rate() returns %u", in->dev->config.rate);
+    //ALOGV("in_get_sample_rate() returns %u", in->dev->config.rate);
     return in->dev->config.rate;
 }
 
@@ -303,7 +321,15 @@
 
 static int in_standby(struct audio_stream *stream)
 {
-    // REMOTE_SUBMIX is a proxy / virtual audio device, so the notion of standby doesn't apply here
+    ALOGI("in_standby()");
+    const struct submix_stream_in *in = reinterpret_cast<const struct submix_stream_in *>(stream);
+
+    pthread_mutex_lock(&in->dev->lock); // the flag below is also accessed by in_read()
+
+    in->dev->input_standby = true; // the next in_read() clears it and re-captures its start time
+
+    pthread_mutex_unlock(&in->dev->lock);
+
     return 0;
 }
 
@@ -333,15 +359,32 @@
 {
     //ALOGV("in_read bytes=%u", bytes);
     ssize_t frames_read = -1977;
-    const struct submix_stream_in *in = reinterpret_cast<const struct submix_stream_in *>(stream);
+    struct submix_stream_in *in = reinterpret_cast<struct submix_stream_in *>(stream);
     const size_t frame_size = audio_stream_frame_size(&stream->common);
+    const size_t frames_to_read = bytes / frame_size;
 
     pthread_mutex_lock(&in->dev->lock);
 
-    PipeReader* source = in->dev->rsxSource.get();
+    const bool output_standby_transition = (in->output_standby != in->dev->output_standby);
+    in->output_standby = in->dev->output_standby;
+
+    if (in->dev->input_standby || output_standby_transition) {
+        in->dev->input_standby = false;
+        // keep track of when we exit input standby (== first read == start "real recording")
+        // or when we start recording silence, and reset projected time
+        int rc = clock_gettime(CLOCK_MONOTONIC, &in->record_start_time);
+        if (rc == 0) {
+            in->read_counter_frames = 0;
+        }
+    }
+
+    in->read_counter_frames += frames_to_read;
+
+    MonoPipeReader* source = in->dev->rsxSource.get();
     if (source != NULL) {
         in->dev->rsxSource->incStrong(in);
     } else {
+        ALOGE("no audio pipe yet we're trying to read!");
         pthread_mutex_unlock(&in->dev->lock);
         usleep((bytes / frame_size) * 1000000 / in_get_sample_rate(&stream->common));
         memset(buffer, 0, bytes);
@@ -350,40 +393,25 @@
 
     pthread_mutex_unlock(&in->dev->lock);
 
-
-    int attempts = MAX_READ_ATTEMPTS;
-    size_t remaining_frames = bytes / frame_size;
+    // read the data from the pipe (it's non blocking)
+    size_t remaining_frames = frames_to_read;
+    int attempts = 0;
     char* buff = (char*)buffer;
-    while (attempts > 0) {
+    while ((remaining_frames > 0) && (attempts < MAX_READ_ATTEMPTS)) {
+        attempts++;
         frames_read = source->read(buff, remaining_frames, AudioBufferProvider::kInvalidPTS);
         if (frames_read > 0) {
-            //ALOGV("  in_read got frames=%u size=%u attempts=%d", remaining_frames, frame_size, attempts);
             remaining_frames -= frames_read;
             buff += frames_read * frame_size;
-            if (remaining_frames == 0) {
-                // TODO simplify code by breaking out of loop
-
-                pthread_mutex_lock(&in->dev->lock);
-
-                in->dev->rsxSource->decStrong(in);
-
-                pthread_mutex_unlock(&in->dev->lock);
-
-                return bytes;
-            }
-        } else if (frames_read == 0) {
-            // TODO sleep should be tied to how much data is expected
-            //ALOGW("sleeping %dms", READ_ATTEMPT_SLEEP_MS);
-            usleep(READ_ATTEMPT_SLEEP_MS*1000);
-            attempts--;
-        } else { // frames_read is an error code
-            if (frames_read != (ssize_t)OVERRUN) {
-                attempts--;
-            }
-            // else OVERRUN: error has been signaled, ok to read, do not decrement counter
+            //ALOGV("  in_read (att=%d) got %ld frames, remaining=%u",
+            //      attempts, frames_read, remaining_frames);
+        } else {
+            //ALOGE("  in_read read returned %ld", frames_read);
+            usleep(READ_ATTEMPT_SLEEP_MS * 1000);
         }
     }
 
+    // done using the source
     pthread_mutex_lock(&in->dev->lock);
 
     in->dev->rsxSource->decStrong(in);
@@ -391,17 +419,48 @@
     pthread_mutex_unlock(&in->dev->lock);
 
     if (remaining_frames > 0) {
-        ALOGW("remaining_frames = %d", remaining_frames);
+        ALOGV("  remaining_frames = %d", remaining_frames);
         memset(((char*)buffer)+ bytes - (remaining_frames * frame_size), 0,
                 remaining_frames * frame_size);
-        return bytes;
     }
 
-    if (frames_read < 0) {
-        ALOGE("in_read error=%16lx", frames_read);
+    // compute how much we need to sleep after reading the data by comparing the wall clock with
+    //   the projected time at which we should return.
+    struct timespec time_after_read;// wall clock after reading from the pipe
+    struct timespec record_duration;// observed record duration
+    int rc = clock_gettime(CLOCK_MONOTONIC, &time_after_read);
+    const uint32_t sample_rate = in_get_sample_rate(&stream->common);
+    if (rc == 0) {
+        // for how long have we been recording?
+        record_duration.tv_sec  = time_after_read.tv_sec - in->record_start_time.tv_sec;
+        record_duration.tv_nsec = time_after_read.tv_nsec - in->record_start_time.tv_nsec;
+        if (record_duration.tv_nsec < 0) {
+            record_duration.tv_sec--;
+            record_duration.tv_nsec += 1000000000;
+        }
+
+        // read_counter_frames contains the number of frames that have been read since the beginning
+        // of recording (including this call): it's converted to usec and compared to how long we've
+        // been recording for, which gives us how long we must wait to sync the projected recording
+        // time, and the observed recording time
+        long projected_vs_observed_offset_us =
+                ((int64_t)(in->read_counter_frames
+                            - (record_duration.tv_sec*sample_rate)))
+                        * 1000000 / sample_rate
+                - (record_duration.tv_nsec / 1000);
+
+        ALOGV("  record duration %5lds %3ldms, will wait: %7ldus",
+                record_duration.tv_sec, record_duration.tv_nsec/1000000,
+                projected_vs_observed_offset_us);
+        if (projected_vs_observed_offset_us > 0) {
+            usleep(projected_vs_observed_offset_us);
+        }
     }
-    ALOGE_IF(attempts == 0, "attempts == 0 ");
-    return 0;
+
+
+    ALOGV("in_read returns %d", bytes);
+    return bytes;
+
 }
 
 static uint32_t in_get_input_frames_lost(struct audio_stream_in *stream)
@@ -481,12 +540,11 @@
                 config->sample_rate == 48000 ? Format_SR48_C2_I16 : Format_SR44_1_C2_I16;
         const NBAIO_Format offers[1] = {format};
         size_t numCounterOffers = 0;
-        // creating a Pipe, not a MonoPipe with optional blocking set to true, so audio frames
-        //  entering a full sink will overwrite the contents of the pipe.
-        Pipe* sink = new Pipe(MAX_PIPE_DEPTH_IN_FRAMES, format);
+        // creating a MonoPipe with optional blocking set to true.
+        MonoPipe* sink = new MonoPipe(MAX_PIPE_DEPTH_IN_FRAMES, format, true/*writeCanBlock*/);
         ssize_t index = sink->negotiate(offers, 1, NULL, numCounterOffers);
         ALOG_ASSERT(index == 0);
-        PipeReader* source = new PipeReader(*sink);
+        MonoPipeReader* source = new MonoPipeReader(sink);
         numCounterOffers = 0;
         index = source->negotiate(offers, 1, NULL, numCounterOffers);
         ALOG_ASSERT(index == 0);
@@ -636,6 +694,9 @@
 
     in->dev = rsxadev;
 
+    in->read_counter_frames = 0;
+    in->output_standby = rsxadev->output_standby;
+
     pthread_mutex_unlock(&rsxadev->lock);
 
     return 0;
@@ -706,6 +767,9 @@
     rsxadev->device.close_input_stream = adev_close_input_stream;
     rsxadev->device.dump = adev_dump;
 
+    rsxadev->input_standby = true;
+    rsxadev->output_standby = true;
+
     *device = &rsxadev->device.common;
 
     return 0;