Parallel mark stack processing

Enabled parallel mark stack processing by using a thread pool.
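
For reference, a minimal sketch of how the new Task/ThreadPool API is driven
(NopTask, num_threads, and num_tasks are illustrative, not part of this change;
the sketch assumes Task declares the virtual Run(Thread*) and Finalize() hooks
that ThreadPoolWorker::Run invokes below):

  class NopTask : public Task {
   public:
    virtual void Run(Thread* self) {
      // A real GC task would drain a chunk of the mark stack here.
    }
    virtual void Finalize() {
      delete this;  // Invoked exactly once by the worker after Run returns.
    }
  };

  // Hypothetical caller; self is the current Thread*.
  ThreadPool thread_pool(num_threads);
  for (size_t i = 0; i < num_tasks; ++i) {
    thread_pool.AddTask(self, new NopTask);
  }
  thread_pool.StartWorkers(self);
  thread_pool.Wait(self, true);  // Run tasks on this thread too, then block until done.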

Optimized object scanning by removing dependent loads for IsClass.
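
The idea, roughly (a sketch, not the exact code; java_lang_Class_ stands for a
pointer assumed to be cached when the collection starts): answering IsClass via
GetClass()->IsClassClass() costs two dependent loads per scanned object, while
comparing against the cached pointer needs only the single load of the object's
class field.

  // Before: obj->GetClass()->IsClassClass()
  //   load #1: obj->klass_
  //   load #2: a field of the klass_ just loaded (dependent load)
  // After (sketch):
  bool IsClass(const Object* obj) const {
    // java_lang_Class_ is an assumed member, cached once at GC start.
    return obj->GetClass() == java_lang_Class_;  // single load of obj->klass_
  }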

Performance:
Prime: ~10% speedup of partial GC.
Nakasi: ~50% speedup of partial GC.
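
The diff below also adds a work-stealing variant of the pool: a worker that
finishes its own task steals work from a still-running task instead of idling,
and a reference count decides which worker finalizes a task. A sketch of a task
written against that API (MarkChunkTask is illustrative; StealFrom is the hook
the stealing worker invokes on its own task, passing the victim):

  class MarkChunkTask : public WorkStealingTask {
   public:
    virtual void Run(Thread* self) {
      // Process this task's share of the mark stack.
    }
    virtual void StealFrom(Thread* self, WorkStealingTask* source) {
      // Move some pending work out of 'source' into this task and process it.
      // The pool's ref counting keeps 'source' from being finalized meanwhile.
    }
    virtual void Finalize() {
      delete this;  // Runs once the last worker drops its reference.
    }
  };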

Change-Id: I43256a068efc47cb52d93108458ea18d4e02fccc
diff --git a/src/thread_pool.cc b/src/thread_pool.cc
index ba53113..26c83d2 100644
--- a/src/thread_pool.cc
+++ b/src/thread_pool.cc
@@ -1,3 +1,4 @@
+#include "casts.h"
 #include "runtime.h"
 #include "stl_util.h"
 #include "thread.h"
@@ -24,9 +25,10 @@
 
 void ThreadPoolWorker::Run() {
   Thread* self = Thread::Current();
-  Closure* task = NULL;
+  Task* task = NULL;
   while ((task = thread_pool_->GetTask(self)) != NULL) {
     task->Run(self);
+    task->Finalize();
   }
 }
 
@@ -40,7 +42,7 @@
   return NULL;
 }
 
-void ThreadPool::AddTask(Thread* self, Closure* task){
+void ThreadPool::AddTask(Thread* self, Task* task) {
   MutexLock mu(self, task_queue_lock_);
   tasks_.push_back(task);
   // If we have any waiters, signal one.
@@ -49,14 +51,6 @@
   }
 }
 
-void ThreadPool::AddThread(size_t stack_size) {
-  threads_.push_back(
-      new ThreadPoolWorker(
-          this,
-          StringPrintf("Thread pool worker %d", static_cast<int>(GetThreadCount())),
-          stack_size));
-}
-
 ThreadPool::ThreadPool(size_t num_threads)
   : task_queue_lock_("task queue lock"),
     task_queue_condition_("task queue condition", task_queue_lock_),
@@ -65,7 +59,8 @@
     shutting_down_(false),
     waiting_count_(0) {
   while (GetThreadCount() < num_threads) {
-    AddThread(ThreadPoolWorker::kDefaultStackSize);
+    const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount());
+    threads_.push_back(new ThreadPoolWorker(this, name, ThreadPoolWorker::kDefaultStackSize));
   }
 }
 
@@ -75,9 +70,9 @@
     MutexLock mu(self, task_queue_lock_);
     // Tell any remaining workers to shut down.
     shutting_down_ = true;
-    android_memory_barrier();
     // Broadcast to everyone waiting.
     task_queue_condition_.Broadcast(self);
+    completion_condition_.Broadcast(self);
   }
   // Wait for the threads to finish.
   STLDeleteElements(&threads_);
@@ -86,22 +81,21 @@
 void ThreadPool::StartWorkers(Thread* self) {
   MutexLock mu(self, task_queue_lock_);
   started_ = true;
-  android_memory_barrier();
   task_queue_condition_.Broadcast(self);
+  start_time_ = NanoTime();
+  total_wait_time_ = 0;
 }
 
 void ThreadPool::StopWorkers(Thread* self) {
   MutexLock mu(self, task_queue_lock_);
   started_ = false;
-  android_memory_barrier();
 }
 
-Closure* ThreadPool::GetTask(Thread* self) {
+Task* ThreadPool::GetTask(Thread* self) {
   MutexLock mu(self, task_queue_lock_);
-  while (!shutting_down_) {
-    if (started_ && !tasks_.empty()) {
-      Closure* task = tasks_.front();
-      tasks_.pop_front();
+  while (!IsShuttingDown()) {
+    Task* task = TryGetTaskLocked(self);
+    if (task != NULL) {
       return task;
     }
 
@@ -110,7 +104,10 @@
       // We may be done, lets broadcast to the completion condition.
       completion_condition_.Broadcast(self);
     }
+    const uint64_t wait_start = NanoTime();
     task_queue_condition_.Wait(self);
+    const uint64_t wait_end = NanoTime();
+    total_wait_time_ += wait_end - std::max(wait_start, start_time_);
     waiting_count_--;
   }
 
@@ -118,12 +115,151 @@
   return NULL;
 }
 
-void ThreadPool::Wait(Thread* self) {
+Task* ThreadPool::TryGetTask(Thread* self) {
   MutexLock mu(self, task_queue_lock_);
+  return TryGetTaskLocked(self);
+}
+
+Task* ThreadPool::TryGetTaskLocked(Thread* self) {
+  if (started_ && !tasks_.empty()) {
+    Task* task = tasks_.front();
+    tasks_.pop_front();
+    return task;
+  }
+  return NULL;
+}
+
+void ThreadPool::Wait(Thread* self, bool do_work) {
+  Task* task = NULL;
+  while ((task = TryGetTask(self)) != NULL) {
+    task->Run(self);
+    task->Finalize();
+  }
+
   // Wait until each thread is waiting and the task list is empty.
-  while (waiting_count_ != GetThreadCount() || !tasks_.empty()) {
+  MutexLock mu(self, task_queue_lock_);
+  while (!shutting_down_ && (waiting_count_ != GetThreadCount() || !tasks_.empty())) {
     completion_condition_.Wait(self);
   }
 }
 
+size_t ThreadPool::GetTaskCount(Thread* self) {
+  MutexLock mu(self, task_queue_lock_);
+  return tasks_.size();
+}
+
+WorkStealingWorker::WorkStealingWorker(ThreadPool* thread_pool, const std::string& name,
+                                       size_t stack_size)
+    : ThreadPoolWorker(thread_pool, name, stack_size),
+      task_(NULL) {
+
+}
+
+void WorkStealingWorker::Run() {
+  Thread* self = Thread::Current();
+  Task* task = NULL;
+  WorkStealingThreadPool* thread_pool = down_cast<WorkStealingThreadPool*>(thread_pool_);
+  while ((task = thread_pool_->GetTask(self)) != NULL) {
+    WorkStealingTask* stealing_task = down_cast<WorkStealingTask*>(task);
+
+    {
+      CHECK(task_ == NULL);
+      MutexLock mu(self, thread_pool->work_steal_lock_);
+      // Register that we are running the task.
+      ++stealing_task->ref_count_;
+      task_ = stealing_task;
+    }
+    stealing_task->Run(self);
+    // Mark ourselves as not running a task so that nobody tries to steal from us.
+    // There is a benign race here: another worker may begin stealing from us at
+    // this point. This is okay because the reference count defers finalization.
+    task_ = NULL;
+
+    bool finalize;
+
+    // Steal work from tasks until there is none left to steal. Note: There is a race, but
+    // all that happens when the race occurs is that we steal some work instead of processing a
+    // task from the queue.
+    while (thread_pool->GetTaskCount(self) == 0) {
+      WorkStealingTask* steal_from_task = NULL;
+
+      {
+        MutexLock mu(self, thread_pool->work_steal_lock_);
+        // Try finding a task to steal from.
+        steal_from_task = thread_pool->FindTaskToStealFrom(self);
+        if (steal_from_task != NULL) {
+          CHECK_NE(stealing_task, steal_from_task)
+              << "Attempting to steal from completed self task";
+          steal_from_task->ref_count_++;
+        } else {
+          break;
+        }
+      }
+
+      if (steal_from_task != NULL) {
+        // Task which completed earlier is going to steal some work.
+        stealing_task->StealFrom(self, steal_from_task);
+
+        {
+          // We are done stealing from the task, let's decrement its reference count.
+          MutexLock mu(self, thread_pool->work_steal_lock_);
+          finalize = !--steal_from_task->ref_count_;
+        }
+
+        if (finalize) {
+          steal_from_task->Finalize();
+        }
+      }
+    }
+
+    {
+      MutexLock mu(self, thread_pool->work_steal_lock_);
+      // If nobody is still referencing task_ we can finalize it.
+      finalize = !--stealing_task->ref_count_;
+    }
+
+    if (finalize) {
+      stealing_task->Finalize();
+    }
+  }
+}
+
+WorkStealingWorker::~WorkStealingWorker() {
+
+}
+
+WorkStealingThreadPool::WorkStealingThreadPool(size_t num_threads)
+    : ThreadPool(0),
+      work_steal_lock_("work stealing lock"),
+      steal_index_(0) {
+  while (GetThreadCount() < num_threads) {
+    const std::string name = StringPrintf("Work stealing worker %zu", GetThreadCount());
+    threads_.push_back(new WorkStealingWorker(this, name, ThreadPoolWorker::kDefaultStackSize));
+  }
+}
+
+WorkStealingTask* WorkStealingThreadPool::FindTaskToStealFrom(Thread* self) {
+  const size_t thread_count = GetThreadCount();
+  for (size_t i = 0; i < thread_count; ++i) {
+    // TODO: Use CAS instead of lock.
+    ++steal_index_;
+    if (steal_index_ >= thread_count) {
+      steal_index_ -= thread_count;
+    }
+
+    WorkStealingWorker* worker = down_cast<WorkStealingWorker*>(threads_[steal_index_]);
+    WorkStealingTask* task = worker->task_;
+    if (task) {
+      // Not null, we can probably steal from this worker.
+      return task;
+    }
+  }
+  // Couldn't find something to steal.
+  return NULL;
+}
+
+WorkStealingThreadPool::~WorkStealingThreadPool() {
+
+}
+
 }  // namespace art