Support streaming data across binder boundaries.

Change-Id: Ifbac61406dcb81343765f99ccba08bd90f9274cc
diff --git a/cmds/stagefright/Android.mk b/cmds/stagefright/Android.mk
index 93baefd..f8650eb 100644
--- a/cmds/stagefright/Android.mk
+++ b/cmds/stagefright/Android.mk
@@ -98,3 +98,27 @@
 LOCAL_MODULE:= audioloop
 
 include $(BUILD_EXECUTABLE)
+
+################################################################################
+
+include $(CLEAR_VARS)
+
+LOCAL_SRC_FILES:=         \
+        stream.cpp    \
+
+LOCAL_SHARED_LIBRARIES := \
+	libstagefright liblog libutils libbinder libsurfaceflinger_client \
+        libstagefright_foundation libmedia
+
+LOCAL_C_INCLUDES:= \
+	$(JNI_H_INCLUDE) \
+	frameworks/base/media/libstagefright \
+	$(TOP)/frameworks/base/include/media/stagefright/openmax
+
+LOCAL_CFLAGS += -Wno-multichar
+
+LOCAL_MODULE_TAGS := debug
+
+LOCAL_MODULE:= stream
+
+include $(BUILD_EXECUTABLE)
diff --git a/cmds/stagefright/stream.cpp b/cmds/stagefright/stream.cpp
new file mode 100644
index 0000000..f2b5638
--- /dev/null
+++ b/cmds/stagefright/stream.cpp
@@ -0,0 +1,168 @@
+#include <binder/ProcessState.h>
+
+#include <media/IStreamSource.h>
+#include <media/mediaplayer.h>
+#include <media/stagefright/foundation/ADebug.h>
+
+#include <binder/IServiceManager.h>
+#include <media/IMediaPlayerService.h>
+#include <surfaceflinger/ISurfaceComposer.h>
+#include <surfaceflinger/SurfaceComposerClient.h>
+
+#include <fcntl.h>
+
+using namespace android;
+
+struct MyStreamSource : public BnStreamSource {
+    // Caller retains ownership of fd.
+    MyStreamSource(int fd);
+
+    virtual void setListener(const sp<IStreamListener> &listener);
+    virtual void setBuffers(const Vector<sp<IMemory> > &buffers);
+
+    virtual void onBufferAvailable(size_t index);
+
+protected:
+    virtual ~MyStreamSource();
+
+private:
+    int mFd;
+
+    sp<IStreamListener> mListener;
+    Vector<sp<IMemory> > mBuffers;
+
+    DISALLOW_EVIL_CONSTRUCTORS(MyStreamSource);
+};
+
+MyStreamSource::MyStreamSource(int fd)
+    : mFd(fd) {
+    CHECK_GE(fd, 0);
+}
+
+MyStreamSource::~MyStreamSource() {
+}
+
+void MyStreamSource::setListener(const sp<IStreamListener> &listener) {
+    mListener = listener;
+}
+
+void MyStreamSource::setBuffers(const Vector<sp<IMemory> > &buffers) {
+    mBuffers = buffers;
+}
+
+void MyStreamSource::onBufferAvailable(size_t index) {
+    CHECK_LT(index, mBuffers.size());
+    sp<IMemory> mem = mBuffers.itemAt(index);
+
+    ssize_t n = read(mFd, mem->pointer(), mem->size());
+    if (n <= 0) {
+        mListener->queueCommand(IStreamListener::EOS);
+    } else {
+        mListener->queueBuffer(index, n);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct MyClient : public BnMediaPlayerClient {
+    MyClient()
+        : mEOS(false) {
+    }
+
+    virtual void notify(int msg, int ext1, int ext2) {
+        Mutex::Autolock autoLock(mLock);
+
+        if (msg == MEDIA_ERROR || msg == MEDIA_PLAYBACK_COMPLETE) {
+            mEOS = true;
+            mCondition.signal();
+        }
+    }
+
+    void waitForEOS() {
+        Mutex::Autolock autoLock(mLock);
+        while (!mEOS) {
+            mCondition.wait(mLock);
+        }
+    }
+
+protected:
+    virtual ~MyClient() {
+    }
+
+private:
+    Mutex mLock;
+    Condition mCondition;
+
+    bool mEOS;
+
+    DISALLOW_EVIL_CONSTRUCTORS(MyClient);
+};
+
+int main(int argc, char **argv) {
+    android::ProcessState::self()->startThreadPool();
+
+    if (argc != 2) {
+        fprintf(stderr, "Usage: %s filename\n", argv[0]);
+        return 1;
+    }
+
+    sp<SurfaceComposerClient> composerClient = new SurfaceComposerClient;
+    CHECK_EQ(composerClient->initCheck(), (status_t)OK);
+
+    sp<SurfaceControl> control =
+        composerClient->createSurface(
+                getpid(),
+                String8("A Surface"),
+                0,
+                1280,
+                800,
+                PIXEL_FORMAT_RGB_565,
+                0);
+
+    CHECK(control != NULL);
+    CHECK(control->isValid());
+
+    CHECK_EQ(composerClient->openTransaction(), (status_t)OK);
+    CHECK_EQ(control->setLayer(30000), (status_t)OK);
+    CHECK_EQ(control->show(), (status_t)OK);
+    CHECK_EQ(composerClient->closeTransaction(), (status_t)OK);
+
+    sp<Surface> surface = control->getSurface();
+    CHECK(surface != NULL);
+
+    sp<IServiceManager> sm = defaultServiceManager();
+    sp<IBinder> binder = sm->getService(String16("media.player"));
+    sp<IMediaPlayerService> service = interface_cast<IMediaPlayerService>(binder);
+
+    CHECK(service.get() != NULL);
+
+    int fd = open(argv[1], O_RDONLY);
+
+    if (fd < 0) {
+        fprintf(stderr, "Failed to open file '%s'.", argv[1]);
+        return 1;
+    }
+
+    sp<MyClient> client = new MyClient;
+
+    sp<IMediaPlayer> player =
+        service->create(getpid(), client, new MyStreamSource(fd), 0);
+
+    if (player != NULL) {
+        player->setVideoSurface(surface);
+        player->start();
+
+        client->waitForEOS();
+
+        player->stop();
+    } else {
+        fprintf(stderr, "failed to instantiate player.\n");
+    }
+
+    close(fd);
+    fd = -1;
+
+    composerClient->dispose();
+
+    return 0;
+}
diff --git a/include/media/IMediaPlayerService.h b/include/media/IMediaPlayerService.h
index 9416ca1..0bfb166 100644
--- a/include/media/IMediaPlayerService.h
+++ b/include/media/IMediaPlayerService.h
@@ -32,6 +32,7 @@
 
 class IMediaRecorder;
 class IOMX;
+struct IStreamSource;
 
 class IMediaPlayerService: public IInterface
 {
@@ -45,6 +46,11 @@
             int audioSessionId = 0) = 0;
     virtual sp<IMediaPlayer> create(pid_t pid, const sp<IMediaPlayerClient>& client,
             int fd, int64_t offset, int64_t length, int audioSessionId) = 0;
+
+    virtual sp<IMediaPlayer> create(
+            pid_t pid, const sp<IMediaPlayerClient> &client,
+            const sp<IStreamSource> &source, int audioSessionId) = 0;
+
     virtual sp<IMemory>         decode(const char* url, uint32_t *pSampleRate, int* pNumChannels, int* pFormat) = 0;
     virtual sp<IMemory>         decode(int fd, int64_t offset, int64_t length, uint32_t *pSampleRate, int* pNumChannels, int* pFormat) = 0;
     virtual sp<IOMX>            getOMX() = 0;
diff --git a/include/media/IStreamSource.h b/include/media/IStreamSource.h
new file mode 100644
index 0000000..6291124
--- /dev/null
+++ b/include/media/IStreamSource.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ANDROID_ISTREAMSOURCE_H_
+
+#define ANDROID_ISTREAMSOURCE_H_
+
+#include <binder/IInterface.h>
+
+namespace android {
+
+struct IMemory;
+struct IStreamListener;
+
+struct IStreamSource : public IInterface {
+    DECLARE_META_INTERFACE(StreamSource);
+
+    virtual void setListener(const sp<IStreamListener> &listener) = 0;
+    virtual void setBuffers(const Vector<sp<IMemory> > &buffers) = 0;
+
+    virtual void onBufferAvailable(size_t index) = 0;
+};
+
+struct IStreamListener : public IInterface {
+    DECLARE_META_INTERFACE(StreamListener);
+
+    enum Command {
+        FLUSH,
+        DISCONTINUITY,
+        EOS
+    };
+
+    virtual void queueBuffer(size_t index, size_t size) = 0;
+    virtual void queueCommand(Command cmd) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct BnStreamSource : public BnInterface<IStreamSource> {
+    virtual status_t onTransact(
+            uint32_t code, const Parcel &data, Parcel *reply,
+            uint32_t flags = 0);
+};
+
+struct BnStreamListener : public BnInterface<IStreamListener> {
+    virtual status_t onTransact(
+            uint32_t code, const Parcel &data, Parcel *reply,
+            uint32_t flags = 0);
+};
+
+}  // namespace android
+
+#endif  // ANDROID_ISTREAMSOURCE_H_
diff --git a/include/media/MediaPlayerInterface.h b/include/media/MediaPlayerInterface.h
index 672931e..e7f1d6d 100644
--- a/include/media/MediaPlayerInterface.h
+++ b/include/media/MediaPlayerInterface.h
@@ -106,6 +106,11 @@
             const KeyedVector<String8, String8> *headers = NULL) = 0;
 
     virtual status_t    setDataSource(int fd, int64_t offset, int64_t length) = 0;
+
+    virtual status_t    setDataSource(const sp<IStreamSource> &source) {
+        return INVALID_OPERATION;
+    }
+
     virtual status_t    setVideoSurface(const sp<Surface>& surface) = 0;
     virtual status_t    prepare() = 0;
     virtual status_t    prepareAsync() = 0;
diff --git a/media/libmedia/Android.mk b/media/libmedia/Android.mk
index 2e5cbe3..731c09d 100644
--- a/media/libmedia/Android.mk
+++ b/media/libmedia/Android.mk
@@ -15,6 +15,7 @@
     IMediaRecorderClient.cpp \
     IMediaPlayer.cpp \
     IMediaRecorder.cpp \
+    IStreamSource.cpp \
     Metadata.cpp \
     mediarecorder.cpp \
     IMediaMetadataRetriever.cpp \
diff --git a/media/libmedia/IMediaPlayerService.cpp b/media/libmedia/IMediaPlayerService.cpp
index 4abfa75..77199e1 100644
--- a/media/libmedia/IMediaPlayerService.cpp
+++ b/media/libmedia/IMediaPlayerService.cpp
@@ -23,6 +23,7 @@
 #include <media/IMediaPlayerService.h>
 #include <media/IMediaRecorder.h>
 #include <media/IOMX.h>
+#include <media/IStreamSource.h>
 
 #include <utils/Errors.h>  // for status_t
 
@@ -31,6 +32,7 @@
 enum {
     CREATE_URL = IBinder::FIRST_CALL_TRANSACTION,
     CREATE_FD,
+    CREATE_STREAM,
     DECODE_URL,
     DECODE_FD,
     CREATE_MEDIA_RECORDER,
@@ -107,6 +109,21 @@
         return interface_cast<IMediaPlayer>(reply.readStrongBinder());;
     }
 
+    virtual sp<IMediaPlayer> create(
+            pid_t pid, const sp<IMediaPlayerClient> &client,
+            const sp<IStreamSource> &source, int audioSessionId) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IMediaPlayerService::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(pid));
+        data.writeStrongBinder(client->asBinder());
+        data.writeStrongBinder(source->asBinder());
+        data.writeInt32(static_cast<int32_t>(audioSessionId));
+
+        remote()->transact(CREATE_STREAM, data, &reply);
+
+        return interface_cast<IMediaPlayer>(reply.readStrongBinder());;
+    }
+
     virtual sp<IMemory> decode(const char* url, uint32_t *pSampleRate, int* pNumChannels, int* pFormat)
     {
         Parcel data, reply;
@@ -184,6 +201,27 @@
             reply->writeStrongBinder(player->asBinder());
             return NO_ERROR;
         } break;
+        case CREATE_STREAM:
+        {
+            CHECK_INTERFACE(IMediaPlayerService, data, reply);
+
+            pid_t pid = static_cast<pid_t>(data.readInt32());
+
+            sp<IMediaPlayerClient> client =
+                interface_cast<IMediaPlayerClient>(data.readStrongBinder());
+
+            sp<IStreamSource> source =
+                interface_cast<IStreamSource>(data.readStrongBinder());
+
+            int audioSessionId = static_cast<int>(data.readInt32());
+
+            sp<IMediaPlayer> player =
+                create(pid, client, source, audioSessionId);
+
+            reply->writeStrongBinder(player->asBinder());
+            return OK;
+            break;
+        }
         case DECODE_URL: {
             CHECK_INTERFACE(IMediaPlayerService, data, reply);
             const char* url = data.readCString();
diff --git a/media/libmedia/IStreamSource.cpp b/media/libmedia/IStreamSource.cpp
new file mode 100644
index 0000000..89f2b44
--- /dev/null
+++ b/media/libmedia/IStreamSource.cpp
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "IStreamSource"
+#include <utils/Log.h>
+
+#include <media/IStreamSource.h>
+
+#include <binder/IMemory.h>
+#include <binder/Parcel.h>
+
+namespace android {
+
+enum {
+    // IStreamSource
+    SET_LISTENER = IBinder::FIRST_CALL_TRANSACTION,
+    SET_BUFFERS,
+    ON_BUFFER_AVAILABLE,
+
+    // IStreamListener
+    QUEUE_BUFFER,
+    QUEUE_COMMAND,
+};
+
+struct BpStreamSource : public BpInterface<IStreamSource> {
+    BpStreamSource(const sp<IBinder> &impl)
+        : BpInterface<IStreamSource>(impl) {
+    }
+
+    virtual void setListener(const sp<IStreamListener> &listener) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamSource::getInterfaceDescriptor());
+        data.writeStrongBinder(listener->asBinder());
+        remote()->transact(SET_LISTENER, data, &reply);
+    }
+
+    virtual void setBuffers(const Vector<sp<IMemory> > &buffers) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamSource::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(buffers.size()));
+        for (size_t i = 0; i < buffers.size(); ++i) {
+            data.writeStrongBinder(buffers.itemAt(i)->asBinder());
+        }
+        remote()->transact(SET_BUFFERS, data, &reply);
+    }
+
+    virtual void onBufferAvailable(size_t index) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamSource::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(index));
+        remote()->transact(
+                ON_BUFFER_AVAILABLE, data, &reply, IBinder::FLAG_ONEWAY);
+    }
+};
+
+IMPLEMENT_META_INTERFACE(StreamSource, "android.hardware.IStreamSource");
+
+status_t BnStreamSource::onTransact(
+        uint32_t code, const Parcel &data, Parcel *reply, uint32_t flags) {
+    switch (code) {
+        case SET_LISTENER:
+        {
+            CHECK_INTERFACE(IStreamSource, data, reply);
+            setListener(
+                    interface_cast<IStreamListener>(data.readStrongBinder()));
+            break;
+        }
+
+        case SET_BUFFERS:
+        {
+            CHECK_INTERFACE(IStreamSource, data, reply);
+            size_t n = static_cast<size_t>(data.readInt32());
+            Vector<sp<IMemory> > buffers;
+            for (size_t i = 0; i < n; ++i) {
+                sp<IMemory> mem =
+                    interface_cast<IMemory>(data.readStrongBinder());
+
+                buffers.push(mem);
+            }
+            setBuffers(buffers);
+            break;
+        }
+
+        case ON_BUFFER_AVAILABLE:
+        {
+            CHECK_INTERFACE(IStreamSource, data, reply);
+            onBufferAvailable(static_cast<size_t>(data.readInt32()));
+            break;
+        }
+
+        default:
+            return BBinder::onTransact(code, data, reply, flags);
+    }
+
+    return OK;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct BpStreamListener : public BpInterface<IStreamListener> {
+    BpStreamListener(const sp<IBinder> &impl)
+        : BpInterface<IStreamListener>(impl) {
+    }
+
+    virtual void queueBuffer(size_t index, size_t size) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamListener::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(index));
+        data.writeInt32(static_cast<int32_t>(size));
+
+        remote()->transact(QUEUE_BUFFER, data, &reply, IBinder::FLAG_ONEWAY);
+    }
+
+    virtual void queueCommand(Command cmd) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamListener::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(cmd));
+
+        remote()->transact(QUEUE_COMMAND, data, &reply, IBinder::FLAG_ONEWAY);
+    }
+};
+
+IMPLEMENT_META_INTERFACE(StreamListener, "android.hardware.IStreamListener");
+
+status_t BnStreamListener::onTransact(
+        uint32_t code, const Parcel &data, Parcel *reply, uint32_t flags) {
+    switch (code) {
+        case QUEUE_BUFFER:
+        {
+            CHECK_INTERFACE(IStreamListener, data, reply);
+            size_t index = static_cast<size_t>(data.readInt32());
+            size_t size = static_cast<size_t>(data.readInt32());
+
+            queueBuffer(index, size);
+            break;
+        }
+
+        case QUEUE_COMMAND:
+        {
+            CHECK_INTERFACE(IStreamListener, data, reply);
+            Command cmd = static_cast<Command>(data.readInt32());
+
+            queueCommand(cmd);
+            break;
+        }
+
+        default:
+            return BBinder::onTransact(code, data, reply, flags);
+    }
+
+    return OK;
+}
+
+}  // namespace android
diff --git a/media/libmediaplayerservice/MediaPlayerService.cpp b/media/libmediaplayerservice/MediaPlayerService.cpp
index 00e510b..63d09d6 100644
--- a/media/libmediaplayerservice/MediaPlayerService.cpp
+++ b/media/libmediaplayerservice/MediaPlayerService.cpp
@@ -278,6 +278,26 @@
     return c;
 }
 
+sp<IMediaPlayer> MediaPlayerService::create(
+        pid_t pid, const sp<IMediaPlayerClient> &client,
+        const sp<IStreamSource> &source, int audioSessionId) {
+    int32_t connId = android_atomic_inc(&mNextConnId);
+    sp<Client> c = new Client(this, pid, connId, client, audioSessionId);
+
+    LOGV("Create new client(%d) from pid %d, audioSessionId=%d",
+         connId, pid, audioSessionId);
+
+    if (OK != c->setDataSource(source)) {
+        c.clear();
+    } else {
+        wp<Client> w = c;
+        Mutex::Autolock lock(mLock);
+        mClients.add(w);
+    }
+
+    return c;
+}
+
 sp<IOMX> MediaPlayerService::getOMX() {
     Mutex::Autolock autoLock(mLock);
 
@@ -864,6 +884,30 @@
     return mStatus;
 }
 
+status_t MediaPlayerService::Client::setDataSource(
+        const sp<IStreamSource> &source) {
+    // create the right type of player
+    sp<MediaPlayerBase> p = createPlayer(STAGEFRIGHT_PLAYER);
+
+    if (p == NULL) {
+        return NO_INIT;
+    }
+
+    if (!p->hardwareOutput()) {
+        mAudioOutput = new AudioOutput(mAudioSessionId);
+        static_cast<MediaPlayerInterface*>(p.get())->setAudioSink(mAudioOutput);
+    }
+
+    // now set data source
+    mStatus = p->setDataSource(source);
+
+    if (mStatus == OK) {
+        mPlayer = p;
+    }
+
+    return mStatus;
+}
+
 status_t MediaPlayerService::Client::setVideoSurface(const sp<Surface>& surface)
 {
     LOGV("[%d] setVideoSurface(%p)", mConnId, surface.get());
diff --git a/media/libmediaplayerservice/MediaPlayerService.h b/media/libmediaplayerservice/MediaPlayerService.h
index 184324c..62f8ed6 100644
--- a/media/libmediaplayerservice/MediaPlayerService.h
+++ b/media/libmediaplayerservice/MediaPlayerService.h
@@ -191,6 +191,11 @@
             const KeyedVector<String8, String8> *headers, int audioSessionId);
 
     virtual sp<IMediaPlayer>    create(pid_t pid, const sp<IMediaPlayerClient>& client, int fd, int64_t offset, int64_t length, int audioSessionId);
+
+    virtual sp<IMediaPlayer>    create(
+            pid_t pid, const sp<IMediaPlayerClient> &client,
+            const sp<IStreamSource> &source, int audioSessionId);
+
     virtual sp<IMemory>         decode(const char* url, uint32_t *pSampleRate, int* pNumChannels, int* pFormat);
     virtual sp<IMemory>         decode(int fd, int64_t offset, int64_t length, uint32_t *pSampleRate, int* pNumChannels, int* pFormat);
     virtual sp<IOMX>            getOMX();
@@ -234,6 +239,9 @@
                         const KeyedVector<String8, String8> *headers);
 
                 status_t        setDataSource(int fd, int64_t offset, int64_t length);
+
+                status_t        setDataSource(const sp<IStreamSource> &source);
+
         static  void            notify(void* cookie, int msg, int ext1, int ext2);
 
                 pid_t           pid() const { return mPid; }
diff --git a/media/libmediaplayerservice/StagefrightPlayer.cpp b/media/libmediaplayerservice/StagefrightPlayer.cpp
index 58ef99b..da564dc 100644
--- a/media/libmediaplayerservice/StagefrightPlayer.cpp
+++ b/media/libmediaplayerservice/StagefrightPlayer.cpp
@@ -44,6 +44,10 @@
     return mPlayer->setDataSource(dup(fd), offset, length);
 }
 
+status_t StagefrightPlayer::setDataSource(const sp<IStreamSource> &source) {
+    return mPlayer->setDataSource(source);
+}
+
 status_t StagefrightPlayer::setVideoSurface(const sp<Surface> &surface) {
     LOGV("setVideoSurface");
 
diff --git a/media/libmediaplayerservice/StagefrightPlayer.h b/media/libmediaplayerservice/StagefrightPlayer.h
index c4a2588..fc72bfb 100644
--- a/media/libmediaplayerservice/StagefrightPlayer.h
+++ b/media/libmediaplayerservice/StagefrightPlayer.h
@@ -35,6 +35,9 @@
             const char *url, const KeyedVector<String8, String8> *headers);
 
     virtual status_t setDataSource(int fd, int64_t offset, int64_t length);
+
+    virtual status_t setDataSource(const sp<IStreamSource> &source);
+
     virtual status_t setVideoSurface(const sp<Surface> &surface);
     virtual status_t prepare();
     virtual status_t prepareAsync();
diff --git a/media/libstagefright/AwesomePlayer.cpp b/media/libstagefright/AwesomePlayer.cpp
index ec58919..a804866 100644
--- a/media/libstagefright/AwesomePlayer.cpp
+++ b/media/libstagefright/AwesomePlayer.cpp
@@ -34,13 +34,16 @@
 #include "UDPPusher.h"
 
 #include <binder/IPCThreadState.h>
+#include <binder/MemoryDealer.h>
+#include <media/IStreamSource.h>
+#include <media/stagefright/foundation/hexdump.h>
+#include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/AudioPlayer.h>
 #include <media/stagefright/DataSource.h>
 #include <media/stagefright/FileSource.h>
 #include <media/stagefright/MediaBuffer.h>
 #include <media/stagefright/MediaDefs.h>
 #include <media/stagefright/MediaExtractor.h>
-#include <media/stagefright/MediaDebug.h>
 #include <media/stagefright/MediaSource.h>
 #include <media/stagefright/MetaData.h>
 #include <media/stagefright/OMXCodec.h>
@@ -155,6 +158,201 @@
             const AwesomeNativeWindowRenderer &);
 };
 
+////////////////////////////////////////////////////////////////////////////////
+
+struct QueueDataSource;
+
+struct QueueListener : public BnStreamListener {
+    QueueListener(QueueDataSource *owner)
+        : mOwner(owner) {
+    }
+
+    void clearOwner();
+
+    virtual void queueBuffer(size_t index, size_t size);
+    virtual void queueCommand(Command cmd);
+
+private:
+    Mutex mLock;
+
+    QueueDataSource *mOwner;
+
+    DISALLOW_EVIL_CONSTRUCTORS(QueueListener);
+};
+
+struct QueueDataSource : public DataSource {
+    QueueDataSource(const sp<IStreamSource> &source);
+
+    virtual status_t initCheck() const;
+
+    virtual ssize_t readAt(off64_t offset, void *data, size_t size);
+
+    virtual void queueBuffer(size_t index, size_t size);
+    virtual void queueCommand(IStreamListener::Command cmd);
+
+protected:
+    virtual ~QueueDataSource();
+
+private:
+    enum {
+        kNumBuffers = 16
+    };
+
+    struct BufferInfo {
+        size_t mIndex;
+        size_t mOffset;
+        size_t mSize;
+    };
+
+    Mutex mLock;
+    Condition mCondition;
+
+    sp<IStreamSource> mSource;
+    sp<QueueListener> mListener;
+    sp<MemoryDealer> mDealer;
+    Vector<sp<IMemory> > mBuffers;
+
+    List<BufferInfo> mFilledBuffers;
+
+    off64_t mPosition;
+    bool mEOS;
+
+    DISALLOW_EVIL_CONSTRUCTORS(QueueDataSource);
+};
+
+QueueDataSource::QueueDataSource(const sp<IStreamSource> &source)
+    : mSource(source),
+      mPosition(0),
+      mEOS(false) {
+    mListener = new QueueListener(this);
+    mSource->setListener(mListener);
+
+    static const size_t kBufferSize = 8192;
+
+    mDealer = new MemoryDealer(kNumBuffers * kBufferSize);
+    for (size_t i = 0; i < kNumBuffers; ++i) {
+        sp<IMemory> mem = mDealer->allocate(kBufferSize);
+        CHECK(mem != NULL);
+
+        mBuffers.push(mem);
+    }
+    mSource->setBuffers(mBuffers);
+
+    for (size_t i = 0; i < kNumBuffers; ++i) {
+        mSource->onBufferAvailable(i);
+    }
+}
+
+QueueDataSource::~QueueDataSource() {
+    Mutex::Autolock autoLock(mLock);
+
+    while (mFilledBuffers.size() < kNumBuffers && !mEOS) {
+        mCondition.wait(mLock);
+    }
+
+    mListener->clearOwner();
+}
+
+status_t QueueDataSource::initCheck() const {
+    return OK;
+}
+
+ssize_t QueueDataSource::readAt(off64_t offset, void *data, size_t size) {
+    if (offset != mPosition) {
+        return -EPIPE;
+    }
+
+    Mutex::Autolock autoLock(mLock);
+
+    size_t sizeDone = 0;
+
+    while (sizeDone < size) {
+        while (mFilledBuffers.empty() && !mEOS) {
+            mCondition.wait(mLock);
+        }
+
+        if (mFilledBuffers.empty()) {
+            if (sizeDone > 0) {
+                mPosition += sizeDone;
+                return sizeDone;
+            }
+            return ERROR_END_OF_STREAM;
+        }
+
+        BufferInfo &info = *mFilledBuffers.begin();
+
+        size_t copy = size - sizeDone;
+        if (copy > info.mSize) {
+            copy = info.mSize;
+        }
+
+        memcpy((uint8_t *)data + sizeDone,
+               (const uint8_t *)mBuffers.itemAt(info.mIndex)->pointer()
+                    + info.mOffset,
+               copy);
+
+        info.mSize -= copy;
+        info.mOffset += copy;
+        sizeDone += copy;
+
+        if (info.mSize == 0) {
+            mSource->onBufferAvailable(info.mIndex);
+            mFilledBuffers.erase(mFilledBuffers.begin());
+        }
+    }
+
+    mPosition += sizeDone;
+
+    return sizeDone;
+}
+
+void QueueDataSource::queueBuffer(size_t index, size_t size) {
+    Mutex::Autolock autoLock(mLock);
+
+    CHECK_LT(index, mBuffers.size());
+    CHECK_LE(size, mBuffers.itemAt(index)->size());
+
+    BufferInfo info;
+    info.mIndex = index;
+    info.mSize = size;
+    info.mOffset = 0;
+
+    mFilledBuffers.push_back(info);
+    mCondition.signal();
+}
+
+void QueueDataSource::queueCommand(IStreamListener::Command cmd) {
+    Mutex::Autolock autoLock(mLock);
+
+    if (cmd == IStreamListener::EOS) {
+        mEOS = true;
+        mCondition.signal();
+    }
+}
+
+void QueueListener::clearOwner() {
+    Mutex::Autolock autoLock(mLock);
+    mOwner = NULL;
+}
+
+void QueueListener::queueBuffer(size_t index, size_t size) {
+    Mutex::Autolock autoLock(mLock);
+    if (mOwner == NULL) {
+        return;
+    }
+    mOwner->queueBuffer(index, size);
+}
+
+void QueueListener::queueCommand(Command cmd) {
+    Mutex::Autolock autoLock(mLock);
+    if (mOwner == NULL) {
+        return;
+    }
+    mOwner->queueCommand(cmd);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 AwesomePlayer::AwesomePlayer()
     : mQueueStarted(false),
       mTimeSource(NULL),
@@ -164,7 +362,7 @@
       mExtractorFlags(0),
       mVideoBuffer(NULL),
       mDecryptHandle(NULL) {
-    CHECK_EQ(mClient.connect(), OK);
+    CHECK_EQ(mClient.connect(), (status_t)OK);
 
     DataSource::RegisterDefaultSniffers();
 
@@ -264,6 +462,26 @@
     return setDataSource_l(dataSource);
 }
 
+status_t AwesomePlayer::setDataSource(const sp<IStreamSource> &source) {
+    Mutex::Autolock autoLock(mLock);
+
+    reset_l();
+
+    sp<DataSource> dataSource = new QueueDataSource(source);
+
+#if 0
+    sp<MediaExtractor> extractor =
+        MediaExtractor::Create(dataSource, MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
+
+    return setDataSource_l(extractor);
+#else
+    sp<NuCachedSource2> cached = new NuCachedSource2(dataSource);
+    dataSource = cached;
+
+    return setDataSource_l(dataSource);
+#endif
+}
+
 status_t AwesomePlayer::setDataSource_l(
         const sp<DataSource> &dataSource) {
     sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);
@@ -619,7 +837,8 @@
         IPCThreadState::self()->flushCommands();
     }
 
-    CHECK_EQ(OK, initVideoDecoder(OMXCodec::kIgnoreCodecSpecificData));
+    CHECK_EQ((status_t)OK,
+             initVideoDecoder(OMXCodec::kIgnoreCodecSpecificData));
 }
 
 void AwesomePlayer::onStreamDone() {
@@ -1171,7 +1390,7 @@
             options.clearSeekTo();
 
             if (err != OK) {
-                CHECK_EQ(mVideoBuffer, NULL);
+                CHECK(mVideoBuffer == NULL);
 
                 if (err == INFO_FORMAT_CHANGED) {
                     LOGV("VideoSource signalled format change.");
diff --git a/media/libstagefright/include/AwesomePlayer.h b/media/libstagefright/include/AwesomePlayer.h
index e33f467..46f4a35 100644
--- a/media/libstagefright/include/AwesomePlayer.h
+++ b/media/libstagefright/include/AwesomePlayer.h
@@ -67,6 +67,8 @@
 
     status_t setDataSource(int fd, int64_t offset, int64_t length);
 
+    status_t setDataSource(const sp<IStreamSource> &source);
+
     void reset();
 
     status_t prepare();