Implement RS VSync on new vsync infrastructure.

Change-Id: I662159a086a56e28732dd64a3a3cb30f8d4b72b1

Replace lockless fifo from server to client with sockets.

Change-Id: I99a4ab4f18496c0fbac96ee7b8099797af4712ea
diff --git a/Android.mk b/Android.mk
index 58d3e5c..2166ce7 100644
--- a/Android.mk
+++ b/Android.mk
@@ -89,7 +89,6 @@
 	rsFifoSocket.cpp \
 	rsFileA3D.cpp \
 	rsFont.cpp \
-	rsLocklessFifo.cpp \
 	rsObjectBase.cpp \
 	rsMatrix2x2.cpp \
 	rsMatrix3x3.cpp \
@@ -128,7 +127,7 @@
 	driver/rsdShaderCache.cpp \
 	driver/rsdVertexArray.cpp
 
-LOCAL_SHARED_LIBRARIES += libz libcutils libutils libEGL libGLESv1_CM libGLESv2 libui libbcc libbcinfo
+LOCAL_SHARED_LIBRARIES += libz libcutils libutils libEGL libGLESv1_CM libGLESv2 libui libbcc libbcinfo libgui
 
 LOCAL_STATIC_LIBRARIES := libdex libft2
 
@@ -196,7 +195,6 @@
 	rsFifoSocket.cpp \
 	rsFileA3D.cpp \
 	rsFont.cpp \
-	rsLocklessFifo.cpp \
 	rsObjectBase.cpp \
 	rsMatrix2x2.cpp \
 	rsMatrix3x3.cpp \
diff --git a/driver/rsdGL.cpp b/driver/rsdGL.cpp
index 7acc054..368dd71 100644
--- a/driver/rsdGL.cpp
+++ b/driver/rsdGL.cpp
@@ -215,6 +215,8 @@
     ret = eglChooseConfig(dc->gl.egl.display, configAttribs, 0, 0, &numConfigs);
     checkEglError("eglGetConfigs", ret);
 
+    eglSwapInterval(dc->gl.egl.display, 0);
+
     if (numConfigs) {
         EGLConfig* const configs = new EGLConfig[numConfigs];
 
diff --git a/rs.spec b/rs.spec
index 6887b22..ffb1196 100644
--- a/rs.spec
+++ b/rs.spec
@@ -115,6 +115,7 @@
 	}
 
 ContextDestroyWorker {
+        sync
 }
 
 AssignName {
diff --git a/rsContext.cpp b/rsContext.cpp
index ad2ff0f..04284dd 100644
--- a/rsContext.cpp
+++ b/rsContext.cpp
@@ -18,6 +18,7 @@
 #include "rsContext.h"
 #include "rsThreadIO.h"
 #include <ui/FramebufferNativeWindow.h>
+#include <gui/DisplayEventReceiver.h>
 
 #include <sys/types.h>
 #include <sys/resource.h>
@@ -245,42 +246,55 @@
     }
 
     rsc->mRunning = true;
-    bool mDraw = true;
-    bool doWait = true;
-
-    uint64_t targetTime = rsc->getTime();
-    while (!rsc->mExit) {
-        uint64_t waitTime = 0;
-        uint64_t now = rsc->getTime();
-        if (!doWait) {
-            if (now < targetTime) {
-                waitTime = targetTime - now;
-                doWait = true;
-            }
+    if (!rsc->mIsGraphicsContext) {
+        while (!rsc->mExit) {
+            rsc->mIO.playCoreCommands(rsc, true, -1);
         }
+    } else {
+#ifndef ANDROID_RS_SERIALIZE
+        DisplayEventReceiver displayEvent;
+        DisplayEventReceiver::Event eventBuffer[1];
+#endif
+        int vsyncRate = 0;
+        int targetRate = 0;
 
-        mDraw |= rsc->mIO.playCoreCommands(rsc, doWait, waitTime);
-        mDraw &= (rsc->mRootScript.get() != NULL);
-        mDraw &= rsc->mHasSurface;
+        bool drawOnce = false;
+        while (!rsc->mExit) {
+            rsc->timerSet(RS_TIMER_IDLE);
 
-        if (mDraw && rsc->mIsGraphicsContext) {
-            uint64_t delay = rsc->runRootScript() * 1000000;
-            targetTime = rsc->getTime() + delay;
-            doWait = (delay == 0);
-
-            if (rsc->props.mLogVisual) {
-                rsc->displayDebugStats();
+#ifndef ANDROID_RS_SERIALIZE
+            if (vsyncRate != targetRate) {
+                displayEvent.setVsyncRate(targetRate);
+                vsyncRate = targetRate;
+            }
+            if (targetRate) {
+                drawOnce |= rsc->mIO.playCoreCommands(rsc, true, displayEvent.getFd());
+                while (displayEvent.getEvents(eventBuffer, 1) != 0) {
+                    //ALOGE("vs2 time past %lld", (rsc->getTime() - eventBuffer[0].header.timestamp) / 1000000);
+                }
+            } else
+#endif
+            {
+                drawOnce |= rsc->mIO.playCoreCommands(rsc, true, -1);
             }
 
-            mDraw = !rsc->mPaused;
-            rsc->timerSet(RS_TIMER_CLEAR_SWAP);
-            rsc->mHal.funcs.swap(rsc);
-            rsc->timerFrame();
-            rsc->timerSet(RS_TIMER_INTERNAL);
-            rsc->timerPrint();
-            rsc->timerReset();
-        } else {
-            doWait = true;
+            if ((rsc->mRootScript.get() != NULL) && rsc->mHasSurface &&
+                (targetRate || drawOnce) && !rsc->mPaused) {
+
+                drawOnce = false;
+                targetRate = ((rsc->runRootScript() + 15) / 16);
+
+                if (rsc->props.mLogVisual) {
+                    rsc->displayDebugStats();
+                }
+
+                rsc->timerSet(RS_TIMER_CLEAR_SWAP);
+                rsc->mHal.funcs.swap(rsc);
+                rsc->timerFrame();
+                rsc->timerSet(RS_TIMER_INTERNAL);
+                rsc->timerPrint();
+                rsc->timerReset();
+            }
         }
     }
 
@@ -315,8 +329,8 @@
          mFBOCache.deinit(this);
     }
     ObjectBase::freeAllChildren(this);
-    //ALOGV("destroyWorkerThreadResources 2");
     mExit = true;
+    //ALOGV("destroyWorkerThreadResources 2");
 }
 
 void Context::printWatchdogInfo(void *ctx) {
@@ -382,7 +396,7 @@
     pthread_mutex_lock(&gInitMutex);
 
     mIO.init();
-    mIO.setTimoutCallback(printWatchdogInfo, this, 2e9);
+    mIO.setTimeoutCallback(printWatchdogInfo, this, 2e9);
 
     dev->addContext(this);
     mDev = dev;
@@ -434,14 +448,12 @@
     ALOGV("%p Context::~Context", this);
 
     if (!mIsContextLite) {
-        mIO.coreFlush();
-        rsAssert(mExit);
-        mExit = true;
         mPaused = false;
         void *res;
 
         mIO.shutdown();
         int status = pthread_join(mThreadId, &res);
+        rsAssert(mExit);
 
         if (mHal.funcs.shutdownDriver) {
             mHal.funcs.shutdownDriver(this);
diff --git a/rsContext.h b/rsContext.h
index 61c29f9..a844a20 100644
--- a/rsContext.h
+++ b/rsContext.h
@@ -39,7 +39,6 @@
 #include "rsFBOCache.h"
 
 #include "rsgApiStructs.h"
-#include "rsLocklessFifo.h"
 
 // ---------------------------------------------------------------------------
 namespace android {
diff --git a/rsFifo.cpp b/rsFifo.cpp
deleted file mode 100644
index 3d5d8c4..0000000
--- a/rsFifo.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2011 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "rsFifoSocket.h"
-#include "utils/Timers.h"
-#include "utils/StopWatch.h"
-
-using namespace android;
-using namespace android::renderscript;
-
-Fifo::Fifo() {
-
-}
-
-Fifo::~Fifo() {
-
-}
-
diff --git a/rsFifo.h b/rsFifo.h
index f924b95..911f446 100644
--- a/rsFifo.h
+++ b/rsFifo.h
@@ -35,9 +35,9 @@
     virtual ~Fifo();
 
 public:
-    void virtual writeAsync(const void *data, size_t bytes) = 0;
+    bool virtual writeAsync(const void *data, size_t bytes, bool waitForSpace = true) = 0;
     void virtual writeWaitReturn(void *ret, size_t retSize) = 0;
-    size_t virtual read(void *data, size_t bytes) = 0;
+    size_t virtual read(void *data, size_t bytes, bool doWait = true, uint64_t timeToWait = 0) = 0;
     void virtual readReturn(const void *data, size_t bytes) = 0;
 
     void virtual flush() = 0;
diff --git a/rsFifoSocket.cpp b/rsFifoSocket.cpp
index 163a44b..bd511cf 100644
--- a/rsFifoSocket.cpp
+++ b/rsFifoSocket.cpp
@@ -22,6 +22,7 @@
 #include <stdlib.h>
 #include <ctype.h>
 #include <unistd.h>
+#include <poll.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 
@@ -29,55 +30,79 @@
 using namespace android::renderscript;
 
 FifoSocket::FifoSocket() {
-    sequence = 1;
+    mShutdown = false;
 }
 
 FifoSocket::~FifoSocket() {
 
 }
 
-bool FifoSocket::init() {
+bool FifoSocket::init(bool supportNonBlocking, bool supportReturnValues, size_t maxDataSize) {
     int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
     return false;
 }
 
 void FifoSocket::shutdown() {
+    mShutdown = true;
+    uint64_t d = 0;
+    ::send(sv[0], &d, sizeof(d), 0);
+    ::send(sv[1], &d, sizeof(d), 0);
+    close(sv[0]);
+    close(sv[1]);
 }
 
-void FifoSocket::writeAsync(const void *data, size_t bytes) {
+bool FifoSocket::writeAsync(const void *data, size_t bytes, bool waitForSpace) {
     if (bytes == 0) {
-        return;
+        return true;
     }
     //ALOGE("writeAsync %p %i", data, bytes);
     size_t ret = ::send(sv[0], data, bytes, 0);
     //ALOGE("writeAsync ret %i", ret);
     rsAssert(ret == bytes);
+    return true;
 }
 
 void FifoSocket::writeWaitReturn(void *retData, size_t retBytes) {
+    if (mShutdown) {
+        return;
+    }
+
     //ALOGE("writeWaitReturn %p %i", retData, retBytes);
-    size_t ret = ::recv(sv[0], retData, retBytes, 0);
+    size_t ret = ::recv(sv[0], retData, retBytes, MSG_WAITALL);
     //ALOGE("writeWaitReturn %i", ret);
     rsAssert(ret == retBytes);
 }
 
 size_t FifoSocket::read(void *data, size_t bytes) {
+    if (mShutdown) {
+        return 0;
+    }
+
     //ALOGE("read %p %i", data, bytes);
-    size_t ret = ::recv(sv[1], data, bytes, 0);
-    rsAssert(ret == bytes);
-    //ALOGE("read ret %i", ret);
+    size_t ret = ::recv(sv[1], data, bytes, MSG_WAITALL);
+    rsAssert(ret == bytes || mShutdown);
+    //ALOGE("read ret %i  bytes %i", ret, bytes);
+    if (mShutdown) {
+        ret = 0;
+    }
     return ret;
 }
 
-void FifoSocket::readReturn(const void *data, size_t bytes) {
-    ALOGE("readReturn %p %Zu", data, bytes);
-    size_t ret = ::send(sv[1], data, bytes, 0);
-    ALOGE("readReturn %Zu", ret);
-    rsAssert(ret == bytes);
+bool FifoSocket::isEmpty() {
+    struct pollfd p;
+    p.fd = sv[1];
+    p.events = POLLIN;
+    int r = poll(&p, 1, 0);
+    //ALOGE("poll r=%i", r);
+    return r == 0;
 }
 
 
-void FifoSocket::flush() {
+void FifoSocket::readReturn(const void *data, size_t bytes) {
+    //ALOGE("readReturn %p %Zu", data, bytes);
+    size_t ret = ::send(sv[1], data, bytes, 0);
+    //ALOGE("readReturn %Zu", ret);
+    //rsAssert(ret == bytes);
 }
 
 
diff --git a/rsFifoSocket.h b/rsFifoSocket.h
index 7df2b67..cac0a75 100644
--- a/rsFifoSocket.h
+++ b/rsFifoSocket.h
@@ -29,23 +29,23 @@
     FifoSocket();
     virtual ~FifoSocket();
 
-    bool init();
+    bool init(bool supportNonBlocking = true,
+              bool supportReturnValues = true,
+              size_t maxDataSize = 0);
     void shutdown();
 
+    bool writeAsync(const void *data, size_t bytes, bool waitForSpace = true);
+    void writeWaitReturn(void *ret, size_t retSize);
+    size_t read(void *data, size_t bytes);
+    void readReturn(const void *data, size_t bytes);
+    bool isEmpty();
 
-
-    void virtual writeAsync(const void *data, size_t bytes);
-    void virtual writeWaitReturn(void *ret, size_t retSize);
-    size_t virtual read(void *data, size_t bytes);
-    void virtual readReturn(const void *data, size_t bytes);
-
-    void virtual flush();
+    int getWriteFd() {return sv[0];}
+    int getReadFd() {return sv[1];}
 
 protected:
     int sv[2];
-    uint32_t sequence;
-
-
+    bool mShutdown;
 };
 
 }
diff --git a/rsLocklessFifo.cpp b/rsLocklessFifo.cpp
deleted file mode 100644
index 0466d8b..0000000
--- a/rsLocklessFifo.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Copyright (C) 2009 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "rsLocklessFifo.h"
-#include "utils/Timers.h"
-#include "utils/StopWatch.h"
-
-using namespace android;
-using namespace android::renderscript;
-
-LocklessCommandFifo::LocklessCommandFifo() : mBuffer(0), mInitialized(false) {
-    mTimeoutCallback = NULL;
-    mTimeoutCallbackData = NULL;
-    mTimeoutWait = 0;
-}
-
-LocklessCommandFifo::~LocklessCommandFifo() {
-    if (!mInShutdown && mInitialized) {
-        shutdown();
-    }
-    if (mBuffer) {
-        free(mBuffer);
-    }
-}
-
-void LocklessCommandFifo::shutdown() {
-    mInShutdown = true;
-    mSignalToWorker.set();
-}
-
-bool LocklessCommandFifo::init(uint32_t sizeInBytes) {
-    // Add room for a buffer reset command
-    mBuffer = static_cast<uint8_t *>(malloc(sizeInBytes + 4));
-    if (!mBuffer) {
-        ALOGE("LocklessFifo allocation failure");
-        return false;
-    }
-
-    if (!mSignalToControl.init() || !mSignalToWorker.init()) {
-        ALOGE("Signal setup failed");
-        free(mBuffer);
-        return false;
-    }
-
-    mInShutdown = false;
-    mSize = sizeInBytes;
-    mPut = mBuffer;
-    mGet = mBuffer;
-    mEnd = mBuffer + (sizeInBytes) - 1;
-    //dumpState("init");
-    mInitialized = true;
-    return true;
-}
-
-uint32_t LocklessCommandFifo::getFreeSpace() const {
-    int32_t freeSpace = 0;
-    //dumpState("getFreeSpace");
-
-    if (mPut >= mGet) {
-        freeSpace = mEnd - mPut;
-    } else {
-        freeSpace = mGet - mPut;
-    }
-
-    if (freeSpace < 0) {
-        freeSpace = 0;
-    }
-    return freeSpace;
-}
-
-bool LocklessCommandFifo::isEmpty() const {
-    uint32_t p = android_atomic_acquire_load((int32_t *)&mPut);
-    return ((uint8_t *)p) == mGet;
-}
-
-
-void * LocklessCommandFifo::reserve(uint32_t sizeInBytes) {
-    // Add space for command header and loop token;
-    sizeInBytes += 8;
-
-    //dumpState("reserve");
-    if (getFreeSpace() < sizeInBytes) {
-        makeSpace(sizeInBytes);
-    }
-
-    return mPut + 4;
-}
-
-void LocklessCommandFifo::commit(uint32_t command, uint32_t sizeInBytes) {
-    if (mInShutdown) {
-        return;
-    }
-    //dumpState("commit 1");
-    reinterpret_cast<uint16_t *>(mPut)[0] = command;
-    reinterpret_cast<uint16_t *>(mPut)[1] = sizeInBytes;
-
-    int32_t s = ((sizeInBytes + 3) & ~3) + 4;
-    android_atomic_add(s, (int32_t *)&mPut);
-    //dumpState("commit 2");
-    mSignalToWorker.set();
-}
-
-void LocklessCommandFifo::commitSync(uint32_t command, uint32_t sizeInBytes) {
-    if (mInShutdown) {
-        return;
-    }
-
-    //char buf[1024];
-    //sprintf(buf, "RenderScript LocklessCommandFifo::commitSync  %p %i  %i", this, command, sizeInBytes);
-    //StopWatch compileTimer(buf);
-    commit(command, sizeInBytes);
-    flush();
-}
-
-void LocklessCommandFifo::flush() {
-    //dumpState("flush 1");
-    while (mPut != mGet) {
-        while (!mSignalToControl.wait(mTimeoutWait)) {
-            if (mTimeoutCallback) {
-                mTimeoutCallback(mTimeoutCallbackData);
-            }
-        }
-    }
-    //dumpState("flush 2");
-}
-
-void LocklessCommandFifo::setTimoutCallback(void (*cbk)(void *), void *data, uint64_t timeout) {
-    mTimeoutCallback = cbk;
-    mTimeoutCallbackData = data;
-    mTimeoutWait = timeout;
-}
-
-bool LocklessCommandFifo::wait(uint64_t timeout) {
-    while (isEmpty() && !mInShutdown) {
-        mSignalToControl.set();
-        return mSignalToWorker.wait(timeout);
-    }
-    return true;
-}
-
-const void * LocklessCommandFifo::get(uint32_t *command, uint32_t *bytesData, uint64_t timeout) {
-    while (1) {
-        //dumpState("get");
-        wait(timeout);
-
-        if (isEmpty() || mInShutdown) {
-            *command = 0;
-            *bytesData = 0;
-            return NULL;
-        }
-
-        *command = reinterpret_cast<const uint16_t *>(mGet)[0];
-        *bytesData = reinterpret_cast<const uint16_t *>(mGet)[1];
-        if (*command) {
-            // non-zero command is valid
-            return mGet+4;
-        }
-
-        // zero command means reset to beginning.
-        mGet = mBuffer;
-    }
-}
-
-void LocklessCommandFifo::next() {
-    uint32_t bytes = reinterpret_cast<const uint16_t *>(mGet)[1];
-
-    android_atomic_add(((bytes + 3) & ~3) + 4, (int32_t *)&mGet);
-    //mGet += ((bytes + 3) & ~3) + 4;
-    if (isEmpty()) {
-        mSignalToControl.set();
-    }
-    //dumpState("next");
-}
-
-bool LocklessCommandFifo::makeSpaceNonBlocking(uint32_t bytes) {
-    //dumpState("make space non-blocking");
-    if ((mPut+bytes) > mEnd) {
-        // Need to loop regardless of where get is.
-        if ((mGet > mPut) || (mBuffer+4 >= mGet)) {
-            return false;
-        }
-
-        // Toss in a reset then the normal wait for space will do the rest.
-        reinterpret_cast<uint16_t *>(mPut)[0] = 0;
-        reinterpret_cast<uint16_t *>(mPut)[1] = 0;
-        mPut = mBuffer;
-        mSignalToWorker.set();
-    }
-
-    // it will fit here so we just need to wait for space.
-    if (getFreeSpace() < bytes) {
-        return false;
-    }
-
-    return true;
-}
-
-void LocklessCommandFifo::makeSpace(uint32_t bytes) {
-    //dumpState("make space");
-    if ((mPut+bytes) > mEnd) {
-        // Need to loop regardless of where get is.
-        while ((mGet > mPut) || (mBuffer+4 >= mGet)) {
-            usleep(100);
-        }
-
-        // Toss in a reset then the normal wait for space will do the rest.
-        reinterpret_cast<uint16_t *>(mPut)[0] = 0;
-        reinterpret_cast<uint16_t *>(mPut)[1] = 0;
-        mPut = mBuffer;
-        mSignalToWorker.set();
-    }
-
-    // it will fit here so we just need to wait for space.
-    while (getFreeSpace() < bytes) {
-        usleep(100);
-    }
-
-}
-
-void LocklessCommandFifo::dumpState(const char *s) const {
-    ALOGV("%s %p  put %p, get %p,  buf %p,  end %p", s, this, mPut, mGet, mBuffer, mEnd);
-}
-
-void LocklessCommandFifo::printDebugData() const {
-    dumpState("printing fifo debug");
-    const uint32_t *pptr = (const uint32_t *)mGet;
-    pptr -= 8 * 4;
-    if (mGet < mBuffer) {
-        pptr = (const uint32_t *)mBuffer;
-    }
-
-
-    for (int ct=0; ct < 16; ct++) {
-        ALOGV("fifo %p = 0x%08x  0x%08x  0x%08x  0x%08x", pptr, pptr[0], pptr[1], pptr[2], pptr[3]);
-        pptr += 4;
-    }
-
-}
diff --git a/rsLocklessFifo.h b/rsLocklessFifo.h
deleted file mode 100644
index dafc512..0000000
--- a/rsLocklessFifo.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2009 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ANDROID_RS_LOCKLESS_FIFO_H
-#define ANDROID_RS_LOCKLESS_FIFO_H
-
-
-#include "rsUtils.h"
-#include "rsSignal.h"
-
-namespace android {
-namespace renderscript {
-
-
-// A simple FIFO to be used as a producer / consumer between two
-// threads.  One is writer and one is reader.  The common cases
-// will not require locking.  It is not threadsafe for multiple
-// readers or writers by design.
-
-class LocklessCommandFifo {
-public:
-    bool init(uint32_t size);
-    void shutdown();
-    void setTimoutCallback(void (*)(void *), void *, uint64_t timeout);
-
-    void printDebugData() const;
-
-    LocklessCommandFifo();
-    ~LocklessCommandFifo();
-
-protected:
-    uint8_t * volatile mPut;
-    uint8_t * volatile mGet;
-    uint8_t * mBuffer;
-    uint8_t * mEnd;
-    uint8_t mSize;
-    bool mInShutdown;
-    bool mInitialized;
-
-    Signal mSignalToWorker;
-    Signal mSignalToControl;
-
-public:
-    void * reserve(uint32_t bytes);
-    void commit(uint32_t command, uint32_t bytes);
-    void commitSync(uint32_t command, uint32_t bytes);
-
-    void flush();
-    bool wait(uint64_t timeout = 0);
-
-    const void * get(uint32_t *command, uint32_t *bytesData, uint64_t timeout = 0);
-    void next();
-
-    void makeSpace(uint32_t bytes);
-    bool makeSpaceNonBlocking(uint32_t bytes);
-
-    bool isEmpty() const;
-    uint32_t getFreeSpace() const;
-
-private:
-    void dumpState(const char *) const;
-
-    void (*mTimeoutCallback)(void *);
-    void * mTimeoutCallbackData;
-    uint64_t mTimeoutWait;
-};
-
-
-}
-}
-#endif
diff --git a/rsThreadIO.cpp b/rsThreadIO.cpp
index 1917774..8e4b988 100644
--- a/rsThreadIO.cpp
+++ b/rsThreadIO.cpp
@@ -18,227 +18,189 @@
 
 #include "rsThreadIO.h"
 
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <fcntl.h>
+#include <poll.h>
+
+
 using namespace android;
 using namespace android::renderscript;
 
-ThreadIO::ThreadIO() : mUsingSocket(false) {
+ThreadIO::ThreadIO() {
+    mRunning = true;
 }
 
 ThreadIO::~ThreadIO() {
 }
 
-void ThreadIO::init(bool useSocket) {
-    mUsingSocket = useSocket;
-    mToCore.init(16 * 1024);
-
-    if (mUsingSocket) {
-        mToClientSocket.init();
-        mToCoreSocket.init();
-    } else {
-        mToClient.init(1024);
-    }
+void ThreadIO::init() {
+    mToClient.init();
+    mToCore.init();
 }
 
 void ThreadIO::shutdown() {
-    //ALOGE("shutdown 1");
+    mRunning = false;
     mToCore.shutdown();
-    //ALOGE("shutdown 2");
-}
-
-void ThreadIO::coreFlush() {
-    //ALOGE("coreFlush 1");
-    if (mUsingSocket) {
-    } else {
-        mToCore.flush();
-    }
-    //ALOGE("coreFlush 2");
 }
 
 void * ThreadIO::coreHeader(uint32_t cmdID, size_t dataLen) {
     //ALOGE("coreHeader %i %i", cmdID, dataLen);
-    if (mUsingSocket) {
-        CoreCmdHeader hdr;
-        hdr.bytes = dataLen;
-        hdr.cmdID = cmdID;
-        mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
-    } else {
-        mCoreCommandSize = dataLen;
-        mCoreCommandID = cmdID;
-        mCoreDataPtr = (uint8_t *)mToCore.reserve(dataLen);
-        mCoreDataBasePtr = mCoreDataPtr;
-    }
-    //ALOGE("coreHeader ret %p", mCoreDataPtr);
-    return mCoreDataPtr;
-}
-
-void ThreadIO::coreData(const void *data, size_t dataLen) {
-    //ALOGE("coreData %p %i", data, dataLen);
-    mToCoreSocket.writeAsync(data, dataLen);
-    //ALOGE("coreData ret %p", mCoreDataPtr);
+    CoreCmdHeader *hdr = (CoreCmdHeader *)&mSendBuffer[0];
+    hdr->bytes = dataLen;
+    hdr->cmdID = cmdID;
+    mSendLen = dataLen + sizeof(CoreCmdHeader);
+    //mToCoreSocket.writeAsync(&hdr, sizeof(hdr));
+    //ALOGE("coreHeader ret ");
+    return &mSendBuffer[sizeof(CoreCmdHeader)];
 }
 
 void ThreadIO::coreCommit() {
-    //ALOGE("coreCommit %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
-    if (mUsingSocket) {
-    } else {
-        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
-        mToCore.commit(mCoreCommandID, mCoreCommandSize);
-    }
-    //ALOGE("coreCommit ret");
-}
-
-void ThreadIO::coreCommitSync() {
-    //ALOGE("coreCommitSync %p %p %i", mCoreDataPtr, mCoreDataBasePtr, mCoreCommandSize);
-    if (mUsingSocket) {
-    } else {
-        rsAssert((size_t)(mCoreDataPtr - mCoreDataBasePtr) <= mCoreCommandSize);
-        mToCore.commitSync(mCoreCommandID, mCoreCommandSize);
-    }
-    //ALOGE("coreCommitSync ret");
+    mToCore.writeAsync(&mSendBuffer, mSendLen);
 }
 
 void ThreadIO::clientShutdown() {
-    //ALOGE("coreShutdown 1");
     mToClient.shutdown();
-    //ALOGE("coreShutdown 2");
 }
 
 void ThreadIO::coreSetReturn(const void *data, size_t dataLen) {
-    rsAssert(dataLen <= sizeof(mToCoreRet));
-    memcpy(&mToCoreRet, data, dataLen);
+    uint32_t buf;
+    if (data == NULL) {
+        data = &buf;
+        dataLen = sizeof(buf);
+    }
+
+    mToCore.readReturn(data, dataLen);
 }
 
 void ThreadIO::coreGetReturn(void *data, size_t dataLen) {
-    memcpy(data, &mToCoreRet, dataLen);
+    uint32_t buf;
+    if (data == NULL) {
+        data = &buf;
+        dataLen = sizeof(buf);
+    }
+
+    mToCore.writeWaitReturn(data, dataLen);
 }
 
-void ThreadIO::setTimoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
-    mToCore.setTimoutCallback(cb, dat, timeout);
+void ThreadIO::setTimeoutCallback(void (*cb)(void *), void *dat, uint64_t timeout) {
+    //mToCore.setTimeoutCallback(cb, dat, timeout);
 }
 
-
-bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait) {
+bool ThreadIO::playCoreCommands(Context *con, bool waitForCommand, int waitFd) {
     bool ret = false;
-    uint64_t startTime = con->getTime();
 
-    while (!mToCore.isEmpty() || waitForCommand) {
-        uint32_t cmdID = 0;
-        uint32_t cmdSize = 0;
-        if (con->props.mLogTimes) {
-            con->timerSet(Context::RS_TIMER_IDLE);
+    uint8_t buf[2 * 1024];
+    const CoreCmdHeader *cmd = (const CoreCmdHeader *)&buf[0];
+    const void * data = (const void *)&buf[sizeof(CoreCmdHeader)];
+
+    struct pollfd p[2];
+    p[0].fd = mToCore.getReadFd();
+    p[0].events = POLLIN;
+    p[0].revents = 0;
+    p[1].fd = waitFd;
+    p[1].events = POLLIN;
+    p[1].revents = 0;
+    int pollCount = 1;
+    if (waitFd >= 0) {
+        pollCount = 2;
+    }
+
+    if (con->props.mLogTimes) {
+        con->timerSet(Context::RS_TIMER_IDLE);
+    }
+
+    int waitTime = -1;
+    while (mRunning) {
+        int pr = poll(p, pollCount, waitTime);
+        if (pr <= 0) {
+            break;
         }
 
-        uint64_t delay = 0;
-        if (waitForCommand) {
-            delay = timeToWait - (con->getTime() - startTime);
-            if (delay > timeToWait) {
-                delay = 0;
+        if (p[0].revents) {
+            size_t r = mToCore.read(&buf[0], sizeof(CoreCmdHeader));
+            mToCore.read(&buf[sizeof(CoreCmdHeader)], cmd->bytes);
+
+            if (r != sizeof(CoreCmdHeader)) {
+                // exception or timeout occurred.
+                break;
+            }
+
+            ret = true;
+            if (con->props.mLogTimes) {
+                con->timerSet(Context::RS_TIMER_INTERNAL);
+            }
+            waitForCommand = false;
+            //ALOGV("playCoreCommands 3 %i %i", cmd->cmdID, cmd->bytes);
+
+            if (cmd->cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
+                rsAssert(cmd->cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
+                ALOGE("playCoreCommands error con %p, cmd %i", con, cmd->cmdID);
+            }
+            gPlaybackFuncs[cmd->cmdID](con, data, cmd->bytes);
+
+            if (con->props.mLogTimes) {
+                con->timerSet(Context::RS_TIMER_IDLE);
+            }
+
+            if (waitFd < 0) {
+                // If we don't have a secondary wait object we should stop blocking now
+                // that at least one command has been processed.
+                waitTime = 0;
             }
         }
 
-        if (delay == 0 && timeToWait != 0 && mToCore.isEmpty()) {
+        if (p[1].revents && !p[0].revents) {
+            // We want to finish processing fifo events before processing the vsync.
+            // Otherwise we can end up falling behind and having tremendous lag.
             break;
         }
-
-        const void * data = mToCore.get(&cmdID, &cmdSize, delay);
-        if (!cmdSize) {
-            // exception or timeout occurred.
-            break;
-        }
-        ret = true;
-        if (con->props.mLogTimes) {
-            con->timerSet(Context::RS_TIMER_INTERNAL);
-        }
-        waitForCommand = false;
-        //ALOGV("playCoreCommands 3 %i %i", cmdID, cmdSize);
-
-        if (cmdID >= (sizeof(gPlaybackFuncs) / sizeof(void *))) {
-            rsAssert(cmdID < (sizeof(gPlaybackFuncs) / sizeof(void *)));
-            ALOGE("playCoreCommands error con %p, cmd %i", con, cmdID);
-            mToCore.printDebugData();
-        }
-        gPlaybackFuncs[cmdID](con, data, cmdSize << 2);
-        mToCore.next();
     }
     return ret;
 }
 
 RsMessageToClientType ThreadIO::getClientHeader(size_t *receiveLen, uint32_t *usrID) {
-    if (mUsingSocket) {
-        mToClientSocket.read(&mLastClientHeader, sizeof(mLastClientHeader));
-    } else {
-        size_t bytesData = 0;
-        const uint32_t *d = (const uint32_t *)mToClient.get(&mLastClientHeader.cmdID, (uint32_t*)&bytesData);
-        if (bytesData >= sizeof(uint32_t)) {
-            mLastClientHeader.userID = d[0];
-            mLastClientHeader.bytes = bytesData - sizeof(uint32_t);
-        } else {
-            mLastClientHeader.userID = 0;
-            mLastClientHeader.bytes = 0;
-        }
-    }
+    //ALOGE("getClientHeader");
+    mToClient.read(&mLastClientHeader, sizeof(mLastClientHeader));
+
     receiveLen[0] = mLastClientHeader.bytes;
     usrID[0] = mLastClientHeader.userID;
+    //ALOGE("getClientHeader %i %i %i", mLastClientHeader.cmdID, usrID[0], receiveLen[0]);
     return (RsMessageToClientType)mLastClientHeader.cmdID;
 }
 
 RsMessageToClientType ThreadIO::getClientPayload(void *data, size_t *receiveLen,
                                 uint32_t *usrID, size_t bufferLen) {
+    //ALOGE("getClientPayload");
     receiveLen[0] = mLastClientHeader.bytes;
     usrID[0] = mLastClientHeader.userID;
     if (bufferLen < mLastClientHeader.bytes) {
         return RS_MESSAGE_TO_CLIENT_RESIZE;
     }
-    if (mUsingSocket) {
-        if (receiveLen[0]) {
-            mToClientSocket.read(data, receiveLen[0]);
-        }
-        return (RsMessageToClientType)mLastClientHeader.cmdID;
-    } else {
-        uint32_t bytesData = 0;
-        uint32_t commandID = 0;
-        const uint32_t *d = (const uint32_t *)mToClient.get(&commandID, &bytesData);
-        //ALOGE("getMessageToClient 3    %i  %i", commandID, bytesData);
-        //ALOGE("getMessageToClient  %i %i", commandID, *subID);
-        if (bufferLen >= receiveLen[0]) {
-            memcpy(data, d+1, receiveLen[0]);
-            mToClient.next();
-            return (RsMessageToClientType)commandID;
-        }
+    if (receiveLen[0]) {
+        mToClient.read(data, receiveLen[0]);
     }
-    return RS_MESSAGE_TO_CLIENT_RESIZE;
+    //ALOGE("getClientPayload x");
+    return (RsMessageToClientType)mLastClientHeader.cmdID;
 }
 
 bool ThreadIO::sendToClient(RsMessageToClientType cmdID, uint32_t usrID, const void *data,
                             size_t dataLen, bool waitForSpace) {
+
+    //ALOGE("sendToClient %i %i %i", cmdID, usrID, (int)dataLen);
     ClientCmdHeader hdr;
     hdr.bytes = dataLen;
     hdr.cmdID = cmdID;
     hdr.userID = usrID;
-    if (mUsingSocket) {
-        mToClientSocket.writeAsync(&hdr, sizeof(hdr));
-        if (dataLen) {
-            mToClientSocket.writeAsync(data, dataLen);
-        }
-        return true;
-    } else {
-        if (!waitForSpace) {
-            if (!mToClient.makeSpaceNonBlocking(dataLen + sizeof(hdr))) {
-                // Not enough room, and not waiting.
-                return false;
-            }
-        }
 
-        //ALOGE("sendMessageToClient 2");
-        uint32_t *p = (uint32_t *)mToClient.reserve(dataLen + sizeof(usrID));
-        p[0] = usrID;
-        if (dataLen > 0) {
-            memcpy(p+1, data, dataLen);
-        }
-        mToClient.commit(cmdID, dataLen + sizeof(usrID));
-        //ALOGE("sendMessageToClient 3");
-        return true;
+    mToClient.writeAsync(&hdr, sizeof(hdr));
+    if (dataLen) {
+        mToClient.writeAsync(data, dataLen);
     }
-    return false;
+
+    //ALOGE("sendToClient x");
+    return true;
 }
 
diff --git a/rsThreadIO.h b/rsThreadIO.h
index ebce0ab..d56a1c9 100644
--- a/rsThreadIO.h
+++ b/rsThreadIO.h
@@ -18,7 +18,6 @@
 #define ANDROID_RS_THREAD_IO_H
 
 #include "rsUtils.h"
-#include "rsLocklessFifo.h"
 #include "rsFifoSocket.h"
 
 // ---------------------------------------------------------------------------
@@ -32,23 +31,17 @@
     ThreadIO();
     ~ThreadIO();
 
-    void init(bool useSocket = false);
+    void init();
     void shutdown();
 
     // Plays back commands from the client.
     // Returns true if any commands were processed.
-    bool playCoreCommands(Context *con, bool waitForCommand, uint64_t timeToWait);
+    bool playCoreCommands(Context *con, bool waitForCommand, int waitFd);
 
-    void setTimoutCallback(void (*)(void *), void *, uint64_t timeout);
-    //LocklessCommandFifo mToCore;
+    void setTimeoutCallback(void (*)(void *), void *, uint64_t timeout);
 
-
-
-    void coreFlush();
     void * coreHeader(uint32_t, size_t dataLen);
-    void coreData(const void *data, size_t dataLen);
     void coreCommit();
-    void coreCommitSync();
     void coreSetReturn(const void *data, size_t dataLen);
     void coreGetReturn(void *data, size_t dataLen);
 
@@ -71,20 +64,16 @@
     } ClientCmdHeader;
     ClientCmdHeader mLastClientHeader;
 
-    size_t mCoreCommandSize;
-    uint32_t mCoreCommandID;
-    uint8_t * mCoreDataPtr;
-    uint8_t * mCoreDataBasePtr;
+    bool mRunning;
 
-    bool mUsingSocket;
-    LocklessCommandFifo mToClient;
-    LocklessCommandFifo mToCore;
-
-    FifoSocket mToClientSocket;
-    FifoSocket mToCoreSocket;
+    FifoSocket mToClient;
+    FifoSocket mToCore;
 
     intptr_t mToCoreRet;
 
+    size_t mSendLen;
+    uint8_t mSendBuffer[2 * 1024];
+
 };
 
 
diff --git a/rsg_generator.c b/rsg_generator.c
index 6b84e56..7a90597 100644
--- a/rsg_generator.c
+++ b/rsg_generator.c
@@ -256,7 +256,7 @@
                     fprintf(f, "        memcpy(payload, %s, %s_length);\n", vt->name, vt->name);
                     fprintf(f, "        cmd->%s = (", vt->name);
                     printVarType(f, vt);
-                    fprintf(f, ")payload;\n");
+                    fprintf(f, ")(payload - ((uint8_t *)&cmd[1]));\n");
                     fprintf(f, "        payload += %s_length;\n", vt->name);
                     fprintf(f, "    } else {\n");
                     fprintf(f, "        cmd->%s = %s;\n", vt->name, vt->name);
@@ -270,26 +270,19 @@
                 needFlush = 1;
             }
 
+            fprintf(f, "    io->coreCommit();\n");
             if (hasInlineDataPointers(api)) {
-                fprintf(f, "    if (dataSize < 1024) {\n");
-                fprintf(f, "        io->coreCommit();\n");
-                fprintf(f, "    } else {\n");
-                fprintf(f, "        io->coreCommitSync();\n");
+                fprintf(f, "    if (dataSize >= 1024) {\n");
+                fprintf(f, "        io->coreGetReturn(NULL, 0);\n");
                 fprintf(f, "    }\n");
-            } else {
-                fprintf(f, "    io->coreCommit");
-                if (needFlush) {
-                    fprintf(f, "Sync");
-                }
-                fprintf(f, "();\n");
-            }
-
-            if (api->ret.typeName[0]) {
+            } else if (api->ret.typeName[0]) {
                 fprintf(f, "\n    ");
                 printVarType(f, &api->ret);
                 fprintf(f, " ret;\n");
                 fprintf(f, "    io->coreGetReturn(&ret, sizeof(ret));\n");
                 fprintf(f, "    return ret;\n");
+            } else if (needFlush) {
+                fprintf(f, "    io->coreGetReturn(NULL, 0);\n");
             }
         }
         fprintf(f, "};\n\n");
@@ -434,6 +427,7 @@
 
     for (ct=0; ct < apiCount; ct++) {
         const ApiEntry * api = &apis[ct];
+        int needFlush = 0;
 
         if (api->direct) {
             continue;
@@ -444,6 +438,13 @@
         //fprintf(f, "    ALOGE(\"play command %s\\n\");\n", api->name);
         fprintf(f, "    const RS_CMD_%s *cmd = static_cast<const RS_CMD_%s *>(vp);\n", api->name, api->name);
 
+        if (hasInlineDataPointers(api)) {
+            fprintf(f, "    const uint8_t *baseData = 0;\n");
+            fprintf(f, "    if (cmdSizeBytes != sizeof(RS_CMD_%s)) {\n", api->name);
+            fprintf(f, "        baseData = &((const uint8_t *)vp)[sizeof(*cmd)];\n");
+            fprintf(f, "    }\n");
+        }
+
         fprintf(f, "    ");
         if (api->ret.typeName[0]) {
             fprintf(f, "\n    ");
@@ -453,12 +454,24 @@
         fprintf(f, "rsi_%s(con", api->name);
         for (ct2=0; ct2 < api->paramCount; ct2++) {
             const VarType *vt = &api->params[ct2];
-            fprintf(f, ",\n           cmd->%s", vt->name);
+            needFlush += vt->ptrLevel;
+
+            if (hasInlineDataPointers(api) && vt->ptrLevel) {
+                fprintf(f, ",\n           (const %s *)&baseData[(intptr_t)cmd->%s]", vt->typeName, vt->name);
+            } else {
+                fprintf(f, ",\n           cmd->%s", vt->name);
+            }
         }
         fprintf(f, ");\n");
 
-        if (api->ret.typeName[0]) {
+        if (hasInlineDataPointers(api)) {
+            fprintf(f, "    if (cmdSizeBytes == sizeof(RS_CMD_%s)) {\n", api->name);
+            fprintf(f, "        con->mIO.coreSetReturn(NULL, 0);\n");
+            fprintf(f, "    }\n");
+        } else if (api->ret.typeName[0]) {
             fprintf(f, "    con->mIO.coreSetReturn(&ret, sizeof(ret));\n");
+        } else if (api->sync || needFlush) {
+            fprintf(f, "    con->mIO.coreSetReturn(NULL, 0);\n");
         }
 
         fprintf(f, "};\n\n");
@@ -466,6 +479,7 @@
 
     for (ct=0; ct < apiCount; ct++) {
         const ApiEntry * api = &apis[ct];
+        int needFlush = 0;
 
         fprintf(f, "void rspr_%s(Context *con, Fifo *f, uint8_t *scratch, size_t scratchSize) {\n", api->name);
 
@@ -475,6 +489,7 @@
 
         for (ct2=0; ct2 < api->paramCount; ct2++) {
             const VarType *vt = &api->params[ct2];
+            needFlush += vt->ptrLevel;
             if (vt->ptrLevel == 1) {
                 fprintf(f, "    cmd.%s = (", vt->name);
                 printVarType(f, vt);
@@ -515,6 +530,8 @@
 
         if (api->ret.typeName[0]) {
             fprintf(f, "    f->readReturn(&ret, sizeof(ret));\n");
+        } else if (needFlush) {
+            fprintf(f, "    f->readReturn(NULL, 0);\n");
         }
 
         fprintf(f, "};\n\n");