Add a simple work queue abstraction.

Makes it easy to schedule a bunch of work to happen in parallel.

Change-Id: Id9c0e52fc8b6d78d2b9ed4c2ee47abce0a01775c
diff --git a/libs/utils/Android.mk b/libs/utils/Android.mk
index 87d0fb2..b4fc994 100644
--- a/libs/utils/Android.mk
+++ b/libs/utils/Android.mk
@@ -41,6 +41,7 @@
 	Tokenizer.cpp \
 	Unicode.cpp \
 	VectorImpl.cpp \
+	WorkQueue.cpp \
 	misc.cpp
 
 host_commonCflags := -DLIBUTILS_NATIVE=1 $(TOOL_CFLAGS)
diff --git a/libs/utils/Threads.cpp b/libs/utils/Threads.cpp
index ab207f5..f9277de 100644
--- a/libs/utils/Threads.cpp
+++ b/libs/utils/Threads.cpp
@@ -323,6 +323,7 @@
 #endif
 }
 
+#ifdef HAVE_ANDROID_OS
 int androidSetThreadSchedulingGroup(pid_t tid, int grp)
 {
     if (grp > ANDROID_TGROUP_MAX || grp < 0) { 
@@ -425,6 +426,7 @@
 
     return ret;
 }
+#endif
 
 namespace android {
 
diff --git a/libs/utils/WorkQueue.cpp b/libs/utils/WorkQueue.cpp
new file mode 100644
index 0000000..3bb99a1
--- /dev/null
+++ b/libs/utils/WorkQueue.cpp
@@ -0,0 +1,171 @@
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// #define LOG_NDEBUG 0
+#define LOG_TAG "WorkQueue"
+
+#include <utils/Log.h>
+#include <utils/WorkQueue.h>
+
+namespace android {
+
+// --- WorkQueue ---
+
+WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
+        mMaxThreads(maxThreads), mCanCallJava(canCallJava),
+        mCanceled(false), mFinished(false), mIdleThreads(0) {
+}
+
+WorkQueue::~WorkQueue() {
+    if (!cancel()) {
+        finish();
+    }
+}
+
+status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
+    AutoMutex _l(mLock);
+
+    if (mFinished || mCanceled) {
+        return INVALID_OPERATION;
+    }
+
+    if (mWorkThreads.size() < mMaxThreads
+            && mIdleThreads < mWorkUnits.size() + 1) {
+        sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
+        status_t status = workThread->run("WorkQueue::WorkThread");
+        if (status) {
+            return status;
+        }
+        mWorkThreads.add(workThread);
+        mIdleThreads += 1;
+    } else if (backlog) {
+        while (mWorkUnits.size() >= mMaxThreads * backlog) {
+            mWorkDequeuedCondition.wait(mLock);
+            if (mFinished || mCanceled) {
+                return INVALID_OPERATION;
+            }
+        }
+    }
+
+    mWorkUnits.add(workUnit);
+    mWorkChangedCondition.broadcast();
+    return OK;
+}
+
+status_t WorkQueue::cancel() {
+    AutoMutex _l(mLock);
+
+    return cancelLocked();
+}
+
+status_t WorkQueue::cancelLocked() {
+    if (mFinished) {
+        return INVALID_OPERATION;
+    }
+
+    if (!mCanceled) {
+        mCanceled = true;
+
+        size_t count = mWorkUnits.size();
+        for (size_t i = 0; i < count; i++) {
+            delete mWorkUnits.itemAt(i);
+        }
+        mWorkUnits.clear();
+        mWorkChangedCondition.broadcast();
+        mWorkDequeuedCondition.broadcast();
+    }
+    return OK;
+}
+
+status_t WorkQueue::finish() {
+    { // acquire lock
+        AutoMutex _l(mLock);
+
+        if (mFinished) {
+            return INVALID_OPERATION;
+        }
+
+        mFinished = true;
+        mWorkChangedCondition.broadcast();
+    } // release lock
+
+    // It is not possible for the list of work threads to change once the mFinished
+    // flag has been set, so we can access mWorkThreads outside of the lock here.
+    size_t count = mWorkThreads.size();
+    for (size_t i = 0; i < count; i++) {
+        mWorkThreads.itemAt(i)->join();
+    }
+    mWorkThreads.clear();
+    return OK;
+}
+
+bool WorkQueue::threadLoop() {
+    WorkUnit* workUnit;
+    { // acquire lock
+        AutoMutex _l(mLock);
+
+        for (;;) {
+            if (mCanceled) {
+                return false;
+            }
+
+            if (!mWorkUnits.isEmpty()) {
+                workUnit = mWorkUnits.itemAt(0);
+                mWorkUnits.removeAt(0);
+                mIdleThreads -= 1;
+                mWorkDequeuedCondition.broadcast();
+                break;
+            }
+
+            if (mFinished) {
+                return false;
+            }
+
+            mWorkChangedCondition.wait(mLock);
+        }
+    } // release lock
+
+    bool shouldContinue = workUnit->run();
+    delete workUnit;
+
+    { // acquire lock
+        AutoMutex _l(mLock);
+
+        mIdleThreads += 1;
+
+        if (!shouldContinue) {
+            cancelLocked();
+            return false;
+        }
+    } // release lock
+
+    return true;
+}
+
+// --- WorkQueue::WorkThread ---
+
+WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
+        Thread(canCallJava), mWorkQueue(workQueue) {
+}
+
+WorkQueue::WorkThread::~WorkThread() {
+}
+
+bool WorkQueue::WorkThread::threadLoop() {
+    return mWorkQueue->threadLoop();
+}
+
+};  // namespace android