Support streaming data across binder boundaries.

Change-Id: Ifbac61406dcb81343765f99ccba08bd90f9274cc
diff --git a/media/libstagefright/AwesomePlayer.cpp b/media/libstagefright/AwesomePlayer.cpp
index ec58919..a804866 100644
--- a/media/libstagefright/AwesomePlayer.cpp
+++ b/media/libstagefright/AwesomePlayer.cpp
@@ -34,13 +34,16 @@
 #include "UDPPusher.h"
 
 #include <binder/IPCThreadState.h>
+#include <binder/MemoryDealer.h>
+#include <media/IStreamSource.h>
+#include <media/stagefright/foundation/hexdump.h>
+#include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/AudioPlayer.h>
 #include <media/stagefright/DataSource.h>
 #include <media/stagefright/FileSource.h>
 #include <media/stagefright/MediaBuffer.h>
 #include <media/stagefright/MediaDefs.h>
 #include <media/stagefright/MediaExtractor.h>
-#include <media/stagefright/MediaDebug.h>
 #include <media/stagefright/MediaSource.h>
 #include <media/stagefright/MetaData.h>
 #include <media/stagefright/OMXCodec.h>
@@ -155,6 +158,201 @@
             const AwesomeNativeWindowRenderer &);
 };
 
+////////////////////////////////////////////////////////////////////////////////
+
+struct QueueDataSource;
+
+struct QueueListener : public BnStreamListener {
+    QueueListener(QueueDataSource *owner)
+        : mOwner(owner) {
+    }
+
+    void clearOwner();
+
+    virtual void queueBuffer(size_t index, size_t size);
+    virtual void queueCommand(Command cmd);
+
+private:
+    Mutex mLock;
+
+    QueueDataSource *mOwner;
+
+    DISALLOW_EVIL_CONSTRUCTORS(QueueListener);
+};
+
+struct QueueDataSource : public DataSource {
+    QueueDataSource(const sp<IStreamSource> &source);
+
+    virtual status_t initCheck() const;
+
+    virtual ssize_t readAt(off64_t offset, void *data, size_t size);
+
+    virtual void queueBuffer(size_t index, size_t size);
+    virtual void queueCommand(IStreamListener::Command cmd);
+
+protected:
+    virtual ~QueueDataSource();
+
+private:
+    enum {
+        kNumBuffers = 16
+    };
+
+    struct BufferInfo {
+        size_t mIndex;
+        size_t mOffset;
+        size_t mSize;
+    };
+
+    Mutex mLock;
+    Condition mCondition;
+
+    sp<IStreamSource> mSource;
+    sp<QueueListener> mListener;
+    sp<MemoryDealer> mDealer;
+    Vector<sp<IMemory> > mBuffers;
+
+    List<BufferInfo> mFilledBuffers;
+
+    off64_t mPosition;
+    bool mEOS;
+
+    DISALLOW_EVIL_CONSTRUCTORS(QueueDataSource);
+};
+
+QueueDataSource::QueueDataSource(const sp<IStreamSource> &source)
+    : mSource(source),
+      mPosition(0),
+      mEOS(false) {
+    mListener = new QueueListener(this);
+    mSource->setListener(mListener);
+
+    static const size_t kBufferSize = 8192;
+
+    mDealer = new MemoryDealer(kNumBuffers * kBufferSize);
+    for (size_t i = 0; i < kNumBuffers; ++i) {
+        sp<IMemory> mem = mDealer->allocate(kBufferSize);
+        CHECK(mem != NULL);
+
+        mBuffers.push(mem);
+    }
+    mSource->setBuffers(mBuffers);
+
+    for (size_t i = 0; i < kNumBuffers; ++i) {
+        mSource->onBufferAvailable(i);
+    }
+}
+
+QueueDataSource::~QueueDataSource() {
+    Mutex::Autolock autoLock(mLock);
+
+    while (mFilledBuffers.size() < kNumBuffers && !mEOS) {
+        mCondition.wait(mLock);
+    }
+
+    mListener->clearOwner();
+}
+
+status_t QueueDataSource::initCheck() const {
+    return OK;
+}
+
+ssize_t QueueDataSource::readAt(off64_t offset, void *data, size_t size) {
+    if (offset != mPosition) {
+        return -EPIPE;
+    }
+
+    Mutex::Autolock autoLock(mLock);
+
+    size_t sizeDone = 0;
+
+    while (sizeDone < size) {
+        while (mFilledBuffers.empty() && !mEOS) {
+            mCondition.wait(mLock);
+        }
+
+        if (mFilledBuffers.empty()) {
+            if (sizeDone > 0) {
+                mPosition += sizeDone;
+                return sizeDone;
+            }
+            return ERROR_END_OF_STREAM;
+        }
+
+        BufferInfo &info = *mFilledBuffers.begin();
+
+        size_t copy = size - sizeDone;
+        if (copy > info.mSize) {
+            copy = info.mSize;
+        }
+
+        memcpy((uint8_t *)data + sizeDone,
+               (const uint8_t *)mBuffers.itemAt(info.mIndex)->pointer()
+                    + info.mOffset,
+               copy);
+
+        info.mSize -= copy;
+        info.mOffset += copy;
+        sizeDone += copy;
+
+        if (info.mSize == 0) {
+            mSource->onBufferAvailable(info.mIndex);
+            mFilledBuffers.erase(mFilledBuffers.begin());
+        }
+    }
+
+    mPosition += sizeDone;
+
+    return sizeDone;
+}
+
+void QueueDataSource::queueBuffer(size_t index, size_t size) {
+    Mutex::Autolock autoLock(mLock);
+
+    CHECK_LT(index, mBuffers.size());
+    CHECK_LE(size, mBuffers.itemAt(index)->size());
+
+    BufferInfo info;
+    info.mIndex = index;
+    info.mSize = size;
+    info.mOffset = 0;
+
+    mFilledBuffers.push_back(info);
+    mCondition.signal();
+}
+
+void QueueDataSource::queueCommand(IStreamListener::Command cmd) {
+    Mutex::Autolock autoLock(mLock);
+
+    if (cmd == IStreamListener::EOS) {
+        mEOS = true;
+        mCondition.signal();
+    }
+}
+
+void QueueListener::clearOwner() {
+    Mutex::Autolock autoLock(mLock);
+    mOwner = NULL;
+}
+
+void QueueListener::queueBuffer(size_t index, size_t size) {
+    Mutex::Autolock autoLock(mLock);
+    if (mOwner == NULL) {
+        return;
+    }
+    mOwner->queueBuffer(index, size);
+}
+
+void QueueListener::queueCommand(Command cmd) {
+    Mutex::Autolock autoLock(mLock);
+    if (mOwner == NULL) {
+        return;
+    }
+    mOwner->queueCommand(cmd);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 AwesomePlayer::AwesomePlayer()
     : mQueueStarted(false),
       mTimeSource(NULL),
@@ -164,7 +362,7 @@
       mExtractorFlags(0),
       mVideoBuffer(NULL),
       mDecryptHandle(NULL) {
-    CHECK_EQ(mClient.connect(), OK);
+    CHECK_EQ(mClient.connect(), (status_t)OK);
 
     DataSource::RegisterDefaultSniffers();
 
@@ -264,6 +462,26 @@
     return setDataSource_l(dataSource);
 }
 
+status_t AwesomePlayer::setDataSource(const sp<IStreamSource> &source) {
+    Mutex::Autolock autoLock(mLock);
+
+    reset_l();
+
+    sp<DataSource> dataSource = new QueueDataSource(source);
+
+#if 0
+    sp<MediaExtractor> extractor =
+        MediaExtractor::Create(dataSource, MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
+
+    return setDataSource_l(extractor);
+#else
+    sp<NuCachedSource2> cached = new NuCachedSource2(dataSource);
+    dataSource = cached;
+
+    return setDataSource_l(dataSource);
+#endif
+}
+
 status_t AwesomePlayer::setDataSource_l(
         const sp<DataSource> &dataSource) {
     sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);
@@ -619,7 +837,8 @@
         IPCThreadState::self()->flushCommands();
     }
 
-    CHECK_EQ(OK, initVideoDecoder(OMXCodec::kIgnoreCodecSpecificData));
+    CHECK_EQ((status_t)OK,
+             initVideoDecoder(OMXCodec::kIgnoreCodecSpecificData));
 }
 
 void AwesomePlayer::onStreamDone() {
@@ -1171,7 +1390,7 @@
             options.clearSeekTo();
 
             if (err != OK) {
-                CHECK_EQ(mVideoBuffer, NULL);
+                CHECK(mVideoBuffer == NULL);
 
                 if (err == INFO_FORMAT_CHANGED) {
                     LOGV("VideoSource signalled format change.");