API Support for both synchronous and queued commands, optionally associated metadata.

Change-Id: Idb90d64cb638942210c5822b3cba2f05b087d601
diff --git a/cmds/stagefright/stream.cpp b/cmds/stagefright/stream.cpp
index f2b5638..a3d7a6e 100644
--- a/cmds/stagefright/stream.cpp
+++ b/cmds/stagefright/stream.cpp
@@ -3,6 +3,7 @@
 #include <media/IStreamSource.h>
 #include <media/mediaplayer.h>
 #include <media/stagefright/foundation/ADebug.h>
+#include <media/stagefright/foundation/AMessage.h>
 
 #include <binder/IServiceManager.h>
 #include <media/IMediaPlayerService.h>
@@ -56,7 +57,7 @@
 
     ssize_t n = read(mFd, mem->pointer(), mem->size());
     if (n <= 0) {
-        mListener->queueCommand(IStreamListener::EOS);
+        mListener->issueCommand(IStreamListener::EOS, false /* synchronous */);
     } else {
         mListener->queueBuffer(index, n);
     }
diff --git a/include/media/IStreamSource.h b/include/media/IStreamSource.h
index 6291124..4b698e6 100644
--- a/include/media/IStreamSource.h
+++ b/include/media/IStreamSource.h
@@ -22,6 +22,7 @@
 
 namespace android {
 
+struct AMessage;
 struct IMemory;
 struct IStreamListener;
 
@@ -38,13 +39,14 @@
     DECLARE_META_INTERFACE(StreamListener);
 
     enum Command {
-        FLUSH,
+        EOS,
         DISCONTINUITY,
-        EOS
     };
 
     virtual void queueBuffer(size_t index, size_t size) = 0;
-    virtual void queueCommand(Command cmd) = 0;
+
+    virtual void issueCommand(
+            Command cmd, bool synchronous, const sp<AMessage> &msg = NULL) = 0;
 };
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/include/media/stagefright/foundation/AMessage.h b/include/media/stagefright/foundation/AMessage.h
index c674cba..2fbdddc 100644
--- a/include/media/stagefright/foundation/AMessage.h
+++ b/include/media/stagefright/foundation/AMessage.h
@@ -26,10 +26,14 @@
 namespace android {
 
 struct AString;
+struct Parcel;
 
 struct AMessage : public RefBase {
     AMessage(uint32_t what = 0, ALooper::handler_id target = 0);
 
+    static sp<AMessage> FromParcel(const Parcel &parcel);
+    void writeToParcel(Parcel *parcel) const;
+
     void setWhat(uint32_t what);
     uint32_t what() const;
 
diff --git a/media/libmedia/Android.mk b/media/libmedia/Android.mk
index 731c09d..74fb531 100644
--- a/media/libmedia/Android.mk
+++ b/media/libmedia/Android.mk
@@ -36,7 +36,8 @@
     fixedfft.cpp.arm
 
 LOCAL_SHARED_LIBRARIES := \
-	libui libcutils libutils libbinder libsonivox libicuuc libexpat libsurfaceflinger_client libcamera_client
+	libui libcutils libutils libbinder libsonivox libicuuc libexpat \
+        libsurfaceflinger_client libcamera_client libstagefright_foundation
 
 LOCAL_MODULE:= libmedia
 
diff --git a/media/libmedia/IStreamSource.cpp b/media/libmedia/IStreamSource.cpp
index 89f2b44..5069002 100644
--- a/media/libmedia/IStreamSource.cpp
+++ b/media/libmedia/IStreamSource.cpp
@@ -19,6 +19,7 @@
 #include <utils/Log.h>
 
 #include <media/IStreamSource.h>
+#include <media/stagefright/foundation/AMessage.h>
 
 #include <binder/IMemory.h>
 #include <binder/Parcel.h>
@@ -33,7 +34,7 @@
 
     // IStreamListener
     QUEUE_BUFFER,
-    QUEUE_COMMAND,
+    ISSUE_COMMAND,
 };
 
 struct BpStreamSource : public BpInterface<IStreamSource> {
@@ -125,12 +126,21 @@
         remote()->transact(QUEUE_BUFFER, data, &reply, IBinder::FLAG_ONEWAY);
     }
 
-    virtual void queueCommand(Command cmd) {
+    virtual void issueCommand(
+            Command cmd, bool synchronous, const sp<AMessage> &msg) {
         Parcel data, reply;
         data.writeInterfaceToken(IStreamListener::getInterfaceDescriptor());
         data.writeInt32(static_cast<int32_t>(cmd));
+        data.writeInt32(static_cast<int32_t>(synchronous));
 
-        remote()->transact(QUEUE_COMMAND, data, &reply, IBinder::FLAG_ONEWAY);
+        if (msg != NULL) {
+            data.writeInt32(1);
+            msg->writeToParcel(&data);
+        } else {
+            data.writeInt32(0);
+        }
+
+        remote()->transact(ISSUE_COMMAND, data, &reply, IBinder::FLAG_ONEWAY);
     }
 };
 
@@ -149,12 +159,20 @@
             break;
         }
 
-        case QUEUE_COMMAND:
+        case ISSUE_COMMAND:
         {
             CHECK_INTERFACE(IStreamListener, data, reply);
             Command cmd = static_cast<Command>(data.readInt32());
 
-            queueCommand(cmd);
+            bool synchronous = static_cast<bool>(data.readInt32());
+
+            sp<AMessage> msg;
+
+            if (data.readInt32()) {
+                msg = AMessage::FromParcel(data);
+            }
+
+            issueCommand(cmd, synchronous, msg);
             break;
         }
 
diff --git a/media/libstagefright/AwesomePlayer.cpp b/media/libstagefright/AwesomePlayer.cpp
index 2743a3a..7613d04 100644
--- a/media/libstagefright/AwesomePlayer.cpp
+++ b/media/libstagefright/AwesomePlayer.cpp
@@ -50,6 +50,7 @@
 #include <surfaceflinger/Surface.h>
 
 #include <media/stagefright/foundation/ALooper.h>
+#include <media/stagefright/foundation/AMessage.h>
 #include "include/LiveSession.h"
 
 #define USE_SURFACE_ALLOC 1
@@ -170,7 +171,9 @@
     void clearOwner();
 
     virtual void queueBuffer(size_t index, size_t size);
-    virtual void queueCommand(Command cmd);
+
+    virtual void issueCommand(
+            Command cmd, bool synchronous, const sp<AMessage> &msg);
 
 private:
     Mutex mLock;
@@ -188,7 +191,11 @@
     virtual ssize_t readAt(off64_t offset, void *data, size_t size);
 
     virtual void queueBuffer(size_t index, size_t size);
-    virtual void queueCommand(IStreamListener::Command cmd);
+
+    virtual void issueCommand(
+            IStreamListener::Command cmd,
+            bool synchronous,
+            const sp<AMessage> &msg);
 
 protected:
     virtual ~QueueDataSource();
@@ -198,7 +205,12 @@
         kNumBuffers = 16
     };
 
-    struct BufferInfo {
+    struct QueueEntry {
+        bool mIsCommand;
+
+        IStreamListener::Command mCommand;
+        sp<AMessage> mCommandMessage;
+
         size_t mIndex;
         size_t mOffset;
         size_t mSize;
@@ -212,7 +224,7 @@
     sp<MemoryDealer> mDealer;
     Vector<sp<IMemory> > mBuffers;
 
-    List<BufferInfo> mFilledBuffers;
+    List<QueueEntry> mQueue;
 
     off64_t mPosition;
     bool mEOS;
@@ -227,7 +239,7 @@
     mListener = new QueueListener(this);
     mSource->setListener(mListener);
 
-    static const size_t kBufferSize = 8192;
+    static const size_t kBufferSize = (8192 / 188) * 188;
 
     mDealer = new MemoryDealer(kNumBuffers * kBufferSize);
     for (size_t i = 0; i < kNumBuffers; ++i) {
@@ -246,10 +258,6 @@
 QueueDataSource::~QueueDataSource() {
     Mutex::Autolock autoLock(mLock);
 
-    while (mFilledBuffers.size() < kNumBuffers && !mEOS) {
-        mCondition.wait(mLock);
-    }
-
     mListener->clearOwner();
 }
 
@@ -264,40 +272,69 @@
 
     Mutex::Autolock autoLock(mLock);
 
+    if (mEOS) {
+        return ERROR_END_OF_STREAM;
+    }
+
     size_t sizeDone = 0;
 
     while (sizeDone < size) {
-        while (mFilledBuffers.empty() && !mEOS) {
+        while (mQueue.empty()) {
             mCondition.wait(mLock);
         }
 
-        if (mFilledBuffers.empty()) {
-            if (sizeDone > 0) {
-                mPosition += sizeDone;
-                return sizeDone;
+        QueueEntry &entry = *mQueue.begin();
+
+        if (entry.mIsCommand) {
+            switch (entry.mCommand) {
+                case IStreamListener::EOS:
+                {
+                    mEOS = true;
+
+                    if (sizeDone > 0) {
+                        offset += sizeDone;
+                        return sizeDone;
+                    } else {
+                        return ERROR_END_OF_STREAM;
+                    }
+                    break;
+                }
+
+                case IStreamListener::DISCONTINUITY:
+                {
+                    CHECK_EQ(size, 188u);
+                    CHECK_EQ(sizeDone, 0u);
+
+                    memset(data, 0, size);
+                    sizeDone = size;
+                    break;
+                }
+
+                default:
+                    break;
             }
-            return ERROR_END_OF_STREAM;
+
+            mQueue.erase(mQueue.begin());
+            continue;
         }
 
-        BufferInfo &info = *mFilledBuffers.begin();
-
         size_t copy = size - sizeDone;
-        if (copy > info.mSize) {
-            copy = info.mSize;
+        if (copy > entry.mSize) {
+            copy = entry.mSize;
         }
 
         memcpy((uint8_t *)data + sizeDone,
-               (const uint8_t *)mBuffers.itemAt(info.mIndex)->pointer()
-                    + info.mOffset,
+               (const uint8_t *)mBuffers.itemAt(entry.mIndex)->pointer()
+                    + entry.mOffset,
                copy);
 
-        info.mSize -= copy;
-        info.mOffset += copy;
+        entry.mSize -= copy;
+        entry.mOffset += copy;
         sizeDone += copy;
 
-        if (info.mSize == 0) {
-            mSource->onBufferAvailable(info.mIndex);
-            mFilledBuffers.erase(mFilledBuffers.begin());
+        if (entry.mSize == 0) {
+            mSource->onBufferAvailable(entry.mIndex);
+            mQueue.erase(mQueue.begin());
         }
     }
 
@@ -312,22 +349,31 @@
     CHECK_LT(index, mBuffers.size());
     CHECK_LE(size, mBuffers.itemAt(index)->size());
 
-    BufferInfo info;
-    info.mIndex = index;
-    info.mSize = size;
-    info.mOffset = 0;
+    QueueEntry entry;
+    entry.mIsCommand = false;
+    entry.mIndex = index;
+    entry.mSize = size;
+    entry.mOffset = 0;
 
-    mFilledBuffers.push_back(info);
+    mQueue.push_back(entry);
     mCondition.signal();
 }
 
-void QueueDataSource::queueCommand(IStreamListener::Command cmd) {
+void QueueDataSource::issueCommand(
+        IStreamListener::Command cmd,
+        bool synchronous,
+        const sp<AMessage> &msg) {
     Mutex::Autolock autoLock(mLock);
 
-    if (cmd == IStreamListener::EOS) {
-        mEOS = true;
-        mCondition.signal();
-    }
+    CHECK(!synchronous);
+
+    QueueEntry entry;
+    entry.mIsCommand = true;
+    entry.mCommand = cmd;
+    entry.mCommandMessage = msg;
+    mQueue.push_back(entry);
+
+    mCondition.signal();
 }
 
 void QueueListener::clearOwner() {
@@ -343,12 +389,13 @@
     mOwner->queueBuffer(index, size);
 }
 
-void QueueListener::queueCommand(Command cmd) {
+void QueueListener::issueCommand(
+        Command cmd, bool synchronous, const sp<AMessage> &msg) {
     Mutex::Autolock autoLock(mLock);
     if (mOwner == NULL) {
         return;
     }
-    mOwner->queueCommand(cmd);
+    mOwner->issueCommand(cmd, synchronous, msg);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -469,18 +516,9 @@
     reset_l();
 
     sp<DataSource> dataSource = new QueueDataSource(source);
-
-#if 0
-    sp<MediaExtractor> extractor =
-        MediaExtractor::Create(dataSource, MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
+    sp<MediaExtractor> extractor = new MPEG2TSExtractor(dataSource);
 
     return setDataSource_l(extractor);
-#else
-    sp<NuCachedSource2> cached = new NuCachedSource2(dataSource);
-    dataSource = cached;
-
-    return setDataSource_l(dataSource);
-#endif
 }
 
 status_t AwesomePlayer::setDataSource_l(
diff --git a/media/libstagefright/foundation/AMessage.cpp b/media/libstagefright/foundation/AMessage.cpp
index 26c6d42..7da9cb8 100644
--- a/media/libstagefright/foundation/AMessage.cpp
+++ b/media/libstagefright/foundation/AMessage.cpp
@@ -23,6 +23,8 @@
 #include "ALooperRoster.h"
 #include "AString.h"
 
+#include <binder/Parcel.h>
+
 namespace android {
 
 AMessage::AMessage(uint32_t what, ALooper::handler_id target)
@@ -341,4 +343,136 @@
     return s;
 }
 
+// static
+sp<AMessage> AMessage::FromParcel(const Parcel &parcel) {
+    int32_t what = parcel.readInt32();
+    sp<AMessage> msg = new AMessage(what);
+
+    msg->mNumItems = static_cast<size_t>(parcel.readInt32());
+
+    for (size_t i = 0; i < msg->mNumItems; ++i) {
+        Item *item = &msg->mItems[i];
+
+        item->mName = AAtomizer::Atomize(parcel.readCString());
+        item->mType = static_cast<Type>(parcel.readInt32());
+
+        switch (item->mType) {
+            case kTypeInt32:
+            {
+                item->u.int32Value = parcel.readInt32();
+                break;
+            }
+
+            case kTypeInt64:
+            {
+                item->u.int64Value = parcel.readInt64();
+                break;
+            }
+
+            case kTypeSize:
+            {
+                item->u.sizeValue = static_cast<size_t>(parcel.readInt32());
+                break;
+            }
+
+            case kTypeFloat:
+            {
+                item->u.floatValue = parcel.readFloat();
+                break;
+            }
+
+            case kTypeDouble:
+            {
+                item->u.doubleValue = parcel.readDouble();
+                break;
+            }
+
+            case kTypeString:
+            {
+                item->u.stringValue = new AString(parcel.readCString());
+                break;
+            }
+
+            case kTypeMessage:
+            {
+                sp<AMessage> subMsg = AMessage::FromParcel(parcel);
+                subMsg->incStrong(msg.get());
+
+                item->u.refValue = subMsg.get();
+                break;
+            }
+
+            default:
+            {
+                LOGE("This type of object cannot cross process boundaries.");
+                TRESPASS();
+            }
+        }
+    }
+
+    return msg;
+}
+
+void AMessage::writeToParcel(Parcel *parcel) const {
+    parcel->writeInt32(static_cast<int32_t>(mWhat));
+    parcel->writeInt32(static_cast<int32_t>(mNumItems));
+
+    for (size_t i = 0; i < mNumItems; ++i) {
+        const Item &item = mItems[i];
+
+        parcel->writeCString(item.mName);
+        parcel->writeInt32(static_cast<int32_t>(item.mType));
+
+        switch (item.mType) {
+            case kTypeInt32:
+            {
+                parcel->writeInt32(item.u.int32Value);
+                break;
+            }
+
+            case kTypeInt64:
+            {
+                parcel->writeInt64(item.u.int64Value);
+                break;
+            }
+
+            case kTypeSize:
+            {
+                parcel->writeInt32(static_cast<int32_t>(item.u.sizeValue));
+                break;
+            }
+
+            case kTypeFloat:
+            {
+                parcel->writeFloat(item.u.floatValue);
+                break;
+            }
+
+            case kTypeDouble:
+            {
+                parcel->writeDouble(item.u.doubleValue);
+                break;
+            }
+
+            case kTypeString:
+            {
+                parcel->writeCString(item.u.stringValue->c_str());
+                break;
+            }
+
+            case kTypeMessage:
+            {
+                static_cast<AMessage *>(item.u.refValue)->writeToParcel(parcel);
+                break;
+            }
+
+            default:
+            {
+                LOGE("This type of object cannot cross process boundaries.");
+                TRESPASS();
+            }
+        }
+    }
+}
+
 }  // namespace android
diff --git a/media/libstagefright/foundation/Android.mk b/media/libstagefright/foundation/Android.mk
index ffa7db0..a4d4809 100644
--- a/media/libstagefright/foundation/Android.mk
+++ b/media/libstagefright/foundation/Android.mk
@@ -18,14 +18,7 @@
 
 LOCAL_SHARED_LIBRARIES := \
         libbinder         \
-        libmedia          \
         libutils          \
-        libcutils         \
-        libui             \
-        libsonivox        \
-        libvorbisidec     \
-        libsurfaceflinger_client \
-        libcamera_client
 
 LOCAL_CFLAGS += -Wno-multichar