Parallel mark stack processing

Enabled parallel mark stack processing by using a thread pool.

Optimized object scanning by removing the dependent loads needed for IsClass
(java.lang.Class is now cached once in MarkSweep::Init).

Performance:
Prime: ~10% speedup of partial GC.
Nakasi: ~50% speedup of partial GC.
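
For reference, the parallel path (ProcessMarkStackParallel, added below) splits
the mark stack into fixed-size chunks and hands each one to the heap's thread
pool as a MarkStackChunk task. A minimal sketch of the partitioning arithmetic,
with kMaxChunkSize standing in for MarkStackChunk::max_size (1 * KB = 1024
Object* entries per task):

    #include <algorithm>
    #include <cstddef>

    // Sketch only, not part of the patch.
    static const size_t kMaxChunkSize = 1024;

    // Divide the stack evenly across the pool, but never make a chunk larger
    // than a task's fixed internal buffer.
    size_t ChunkSize(size_t stack_size, size_t num_threads) {
      return std::min((stack_size + num_threads - 1) / num_threads, kMaxChunkSize);
    }

Newly marked references found while scanning a chunk accumulate in a per-task
output block, which is queued as a new task once it fills, so workers stay busy
without contending on a shared mark stack.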

Change-Id: I43256a068efc47cb52d93108458ea18d4e02fccc
diff --git a/src/gc/mark_sweep.cc b/src/gc/mark_sweep.cc
index e93eb1a..4662cf6 100644
--- a/src/gc/mark_sweep.cc
+++ b/src/gc/mark_sweep.cc
@@ -41,8 +41,17 @@
 
 namespace art {
 
+// Performance options.
+static const bool kParallelMarkStack = true;
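+// Note: parallel mark stack processing requires the finger to be disabled so that every newly
+// marked object is pushed on the mark stack (see the CHECK in ProcessMarkStackParallel).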
+static const bool kDisableFinger = true;
 static const bool kUseMarkStackPrefetch = true;
 
+// Profiling and information flags.
+static const bool kCountClassesMarked = false;
+static const bool kProfileLargeObjects = false;
+static const bool kMeasureOverhead = false;
+static const bool kCountTasks = false;
+
 class SetFingerVisitor {
  public:
   SetFingerVisitor(MarkSweep* const mark_sweep) : mark_sweep_(mark_sweep) {
@@ -71,17 +80,20 @@
       cleared_reference_list_(NULL),
       freed_bytes_(0), freed_objects_(0),
       class_count_(0), array_count_(0), other_count_(0),
-      gc_barrier_(new Barrier) {
+      large_object_test_(0), large_object_mark_(0),
+      classes_marked_(0), overhead_time_(0),
+      work_chunks_created_(0), work_chunks_deleted_(0),
+      gc_barrier_(new Barrier),
+      large_object_lock_("large object lock") {
   DCHECK(mark_stack_ != NULL);
 }
 
 void MarkSweep::Init() {
+  java_lang_Class_ = Class::GetJavaLangClass();
+  CHECK(java_lang_Class_ != NULL);
   heap_ = Runtime::Current()->GetHeap();
   mark_stack_->Reset();
-  // TODO: C++0x auto
   FindDefaultMarkBitmap();
-  // TODO: if concurrent, enable card marking in compiler
-  // TODO: check that the mark bitmap is entirely clear.
   // Mark any concurrent roots as dirty since we need to scan them at least once during this GC.
   Runtime::Current()->DirtyRoots();
 }
@@ -99,7 +111,7 @@
   LOG(FATAL) << "Could not find a default mark bitmap";
 }
 
-inline void MarkSweep::MarkObject0(const Object* obj, bool check_finger) {
+inline void MarkSweep::MarkObjectNonNull(const Object* obj, bool check_finger) {
   DCHECK(obj != NULL);
 
   if (obj >= immune_begin_ && obj < immune_end_) {
@@ -109,32 +121,21 @@
 
   // Try to take advantage of locality of references within a space, failing this find the space
   // the hard way.
-  if (UNLIKELY(!current_mark_bitmap_->HasAddress(obj))) {
+  SpaceBitmap* object_bitmap = current_mark_bitmap_;
+  if (UNLIKELY(!object_bitmap->HasAddress(obj))) {
     SpaceBitmap* new_bitmap = heap_->GetMarkBitmap()->GetSpaceBitmap(obj);
     if (new_bitmap != NULL) {
-      current_mark_bitmap_ = new_bitmap;
+      object_bitmap = new_bitmap;
     } else {
-      LargeObjectSpace* large_object_space = GetHeap()->GetLargeObjectsSpace();
-      SpaceSetMap* large_objects = large_object_space->GetMarkObjects();
-      if (!large_objects->Test(obj)) {
-        if (!large_object_space->Contains(obj)) {
-          LOG(ERROR) << "Tried to mark " << obj << " not contained by any spaces";
-          LOG(ERROR) << "Attempting see if it's a bad root";
-          VerifyRoots();
-          LOG(FATAL) << "Can't mark bad root";
-        }
-        large_objects->Set(obj);
-        // Don't need to check finger since large objects never have any object references.
-      }
-      // TODO: Improve clarity of control flow in this function?
+      MarkLargeObject(obj);
       return;
     }
   }
 
   // This object was not previously marked.
-  if (!current_mark_bitmap_->Test(obj)) {
-    current_mark_bitmap_->Set(obj);
-    if (check_finger && obj < finger_) {
+  if (!object_bitmap->Test(obj)) {
+    object_bitmap->Set(obj);
+    if (kDisableFinger || (check_finger && obj < finger_)) {
       // Do we need to expand the mark stack?
       if (UNLIKELY(mark_stack_->Size() >= mark_stack_->Capacity())) {
         std::vector<Object*> temp;
@@ -150,6 +151,57 @@
   }
 }
 
+// Rare case, probably not worth inlining since it will increase the instruction cache miss rate.
+bool MarkSweep::MarkLargeObject(const Object* obj) {
+  LargeObjectSpace* large_object_space = GetHeap()->GetLargeObjectsSpace();
+  SpaceSetMap* large_objects = large_object_space->GetMarkObjects();
+  if (kProfileLargeObjects) {
+    ++large_object_test_;
+  }
+  if (UNLIKELY(!large_objects->Test(obj))) {
+    if (!large_object_space->Contains(obj)) {
+      LOG(ERROR) << "Tried to mark " << obj << " not contained by any spaces";
+      LOG(ERROR) << "Attempting see if it's a bad root";
+      VerifyRoots();
+      LOG(FATAL) << "Can't mark bad root";
+    }
+    if (kProfileLargeObjects) {
+      ++large_object_mark_;
+    }
+    large_objects->Set(obj);
+    // Don't need to check finger since large objects never have any object references.
+    return true;
+  }
+  return false;
+}
+
+inline bool MarkSweep::MarkObjectParallel(const Object* obj) {
+  DCHECK(obj != NULL);
+
+  if (obj >= immune_begin_ && obj < immune_end_) {
+    DCHECK(IsMarked(obj));
+    return false;
+  }
+
+  // Try to take advantage of locality of references within a space, failing this find the space
+  // the hard way.
+  SpaceBitmap* object_bitmap = current_mark_bitmap_;
+  if (UNLIKELY(!object_bitmap->HasAddress(obj))) {
+    SpaceBitmap* new_bitmap = heap_->GetMarkBitmap()->GetSpaceBitmap(obj);
+    if (new_bitmap != NULL) {
+      object_bitmap = new_bitmap;
+    } else {
+      // TODO: Remove the Thread::Current here?
+      // TODO: Convert this to some kind of atomic marking?
+      MutexLock mu(Thread::Current(), large_object_lock_);
+      return MarkLargeObject(obj);
+    }
+  }
+
+  // Return true if the object was not previously marked.
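+  // AtomicTestAndSet returns whether the bit was already set, so if several threads race to mark
+  // the same object, exactly one of them sees false here and pushes the object for scanning.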
+  return !object_bitmap->AtomicTestAndSet(obj);
+}
+
 // Used to mark objects when recursing.  Recursion is done by moving
 // the finger across the bitmaps in address order and marking child
 // objects.  Any newly-marked objects whose addresses are lower than
@@ -157,22 +209,22 @@
 // need to be added to the mark stack.
 void MarkSweep::MarkObject(const Object* obj) {
   if (obj != NULL) {
-    MarkObject0(obj, true);
+    MarkObjectNonNull(obj, true);
   }
 }
 
-void MarkSweep::MarkObjectVisitor(const Object* root, void* arg) {
+void MarkSweep::MarkObjectCallback(const Object* root, void* arg) {
   DCHECK(root != NULL);
   DCHECK(arg != NULL);
   MarkSweep* mark_sweep = reinterpret_cast<MarkSweep*>(arg);
-  mark_sweep->MarkObject0(root, false);
+  mark_sweep->MarkObjectNonNull(root, false);
 }
 
 void MarkSweep::ReMarkObjectVisitor(const Object* root, void* arg) {
   DCHECK(root != NULL);
   DCHECK(arg != NULL);
   MarkSweep* mark_sweep = reinterpret_cast<MarkSweep*>(arg);
-  mark_sweep->MarkObject0(root, true);
+  mark_sweep->MarkObjectNonNull(root, true);
 }
 
 void MarkSweep::VerifyRootCallback(const Object* root, void* arg, size_t vreg,
@@ -201,15 +253,15 @@
 
 // Marks all objects in the root set.
 void MarkSweep::MarkRoots() {
-  Runtime::Current()->VisitNonConcurrentRoots(MarkObjectVisitor, this);
+  Runtime::Current()->VisitNonConcurrentRoots(MarkObjectCallback, this);
 }
 
 void MarkSweep::MarkNonThreadRoots() {
-  Runtime::Current()->VisitNonThreadRoots(MarkObjectVisitor, this);
+  Runtime::Current()->VisitNonThreadRoots(MarkObjectCallback, this);
 }
 
 void MarkSweep::MarkConcurrentRoots() {
-  Runtime::Current()->VisitConcurrentRoots(MarkObjectVisitor, this);
+  Runtime::Current()->VisitConcurrentRoots(MarkObjectCallback, this);
 }
 
 class CheckObjectVisitor {
@@ -259,26 +311,26 @@
   alloc_space->mark_bitmap_.reset(live_bitmap);
 }
 
-class ScanImageRootVisitor {
+class ScanObjectVisitor {
  public:
-  ScanImageRootVisitor(MarkSweep* const mark_sweep) : mark_sweep_(mark_sweep) {
+  ScanObjectVisitor(MarkSweep* const mark_sweep) : mark_sweep_(mark_sweep) {
   }
 
-  void operator ()(const Object* root) const
+  void operator ()(const Object* obj) const
       EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
       SHARED_LOCKS_REQUIRED(Locks::mutator_lock_) {
-    DCHECK(root != NULL);
-    mark_sweep_->ScanObject(root);
+    mark_sweep_->ScanObject(obj);
   }
 
  private:
   MarkSweep* const mark_sweep_;
 };
 
-void MarkSweep::ScanGrayObjects(bool update_finger) {
+void MarkSweep::ScanGrayObjects() {
   const Spaces& spaces = heap_->GetSpaces();
   CardTable* card_table = heap_->GetCardTable();
-  ScanImageRootVisitor image_root_visitor(this);
+  ScanObjectVisitor visitor(this);
   SetFingerVisitor finger_visitor(this);
   // TODO: C++ 0x auto
   for (Spaces::const_iterator it = spaces.begin(); it != spaces.end(); ++it) {
@@ -287,11 +339,7 @@
     byte* end = space->End();
     // Image spaces are handled properly since live == marked for them.
     SpaceBitmap* mark_bitmap = space->GetMarkBitmap();
-    if (update_finger) {
-      card_table->Scan(mark_bitmap, begin, end, image_root_visitor, finger_visitor);
-    } else {
-      card_table->Scan(mark_bitmap, begin, end, image_root_visitor, IdentityFunctor());
-    }
+    card_table->Scan(mark_bitmap, begin, end, visitor, IdentityFunctor());
   }
 }
 
@@ -330,22 +378,6 @@
   }
 }
 
-class ScanObjectVisitor {
- public:
-  ScanObjectVisitor(MarkSweep* const mark_sweep) : mark_sweep_(mark_sweep) {
-
-  }
-
-  void operator ()(const Object* obj) const
-      EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
-      SHARED_LOCKS_REQUIRED(Locks::mutator_lock_) {
-    mark_sweep_->ScanObject(obj);
-  }
-
- private:
-  MarkSweep* const mark_sweep_;
-};
-
 // Populates the mark stack based on the set of marked objects and
 // recursively marks until the mark stack is emptied.
 void MarkSweep::RecursiveMark(bool partial, TimingLogger& timings) {
@@ -358,12 +390,11 @@
   CHECK(cleared_reference_list_ == NULL);
 
   const Spaces& spaces = heap_->GetSpaces();
-
   SetFingerVisitor set_finger_visitor(this);
   ScanObjectVisitor scan_visitor(this);
   for (Spaces::const_iterator it = spaces.begin(); it != spaces.end(); ++it) {
     ContinuousSpace* space = *it;
-    if (space->GetGcRetentionPolicy() == kGcRetentionPolicyAlwaysCollect ||
+    if ((!kDisableFinger && space->GetGcRetentionPolicy() == kGcRetentionPolicyAlwaysCollect) ||
         (!partial && space->GetGcRetentionPolicy() == kGcRetentionPolicyFullCollect)
         ) {
       current_mark_bitmap_ = space->GetMarkBitmap();
@@ -386,7 +417,7 @@
 
 void MarkSweep::RecursiveMarkCards(CardTable* card_table, const std::vector<byte*>& cards,
                                    TimingLogger& timings) {
-  ScanImageRootVisitor image_root_visitor(this);
+  ScanObjectVisitor image_root_visitor(this);
   SetFingerVisitor finger_visitor(this);
   const size_t card_count = cards.size();
   SpaceBitmap* active_bitmap = NULL;
@@ -399,14 +430,16 @@
     }
     if (active_bitmap == NULL || !active_bitmap->HasAddress(start_obj)) {
       active_bitmap = heap_->GetMarkBitmap()->GetSpaceBitmap(start_obj);
-#ifndef NDEBUG
-      if (active_bitmap == NULL) {
+      if (kIsDebugBuild && active_bitmap == NULL) {
         GetHeap()->DumpSpaces();
         LOG(FATAL) << "Object " << reinterpret_cast<const void*>(start_obj);
       }
-#endif
     }
-    active_bitmap->VisitMarkedRange(begin, end, image_root_visitor, finger_visitor);
+    if (kDisableFinger) {
+      active_bitmap->VisitMarkedRange(begin, end, image_root_visitor, IdentityFunctor());
+    } else {
+      active_bitmap->VisitMarkedRange(begin, end, image_root_visitor, finger_visitor);
+    }
   }
   timings.AddSplit("RecursiveMarkCards");
   ProcessMarkStack();
@@ -419,8 +452,8 @@
       !reinterpret_cast<MarkSweep*>(arg)->GetHeap()->GetLiveBitmap()->Test(object);
 }
 
-void MarkSweep::RecursiveMarkDirtyObjects(bool update_finger) {
-  ScanGrayObjects(update_finger);
+void MarkSweep::RecursiveMarkDirtyObjects() {
+  ScanGrayObjects();
   ProcessMarkStack();
 }
 
@@ -539,7 +572,7 @@
     DCHECK(thread == self || thread->IsSuspended() || thread->GetState() == kWaitingPerformingGc)
         << thread->GetState();
     WriterMutexLock mu(self, *Locks::heap_bitmap_lock_);
-    thread->VisitRoots(MarkSweep::MarkObjectVisitor, mark_sweep_);
+    thread->VisitRoots(MarkSweep::MarkObjectCallback, mark_sweep_);
     mark_sweep_->GetBarrier().Pass(self);
   }
 
@@ -561,8 +594,6 @@
 }
 
 void MarkSweep::SweepCallback(size_t num_ptrs, Object** ptrs, void* arg) {
-  size_t freed_objects = num_ptrs;
-  size_t freed_bytes = 0;
   SweepCallbackContext* context = static_cast<SweepCallbackContext*>(arg);
   MarkSweep* mark_sweep = context->mark_sweep;
   Heap* heap = mark_sweep->GetHeap();
@@ -572,17 +603,9 @@
   // Use a bulk free that merges consecutive objects before freeing, or free per object?
   // Documentation suggests better free performance with merging, but this may be at the expense
   // of allocation.
-  // TODO: investigate performance
-  static const bool kUseFreeList = true;
-  if (kUseFreeList) {
-    // AllocSpace::FreeList clears the value in ptrs, so perform after clearing the live bit
-    freed_bytes += space->FreeList(self, num_ptrs, ptrs);
-  } else {
-    for (size_t i = 0; i < num_ptrs; ++i) {
-      freed_bytes += space->Free(self, ptrs[i]);
-    }
-  }
-
+  size_t freed_objects = num_ptrs;
+  // AllocSpace::FreeList clears the value in ptrs, so perform this after clearing the live bit.
+  size_t freed_bytes = space->FreeList(self, num_ptrs, ptrs);
   heap->RecordFree(freed_objects, freed_bytes);
   mark_sweep->freed_objects_ += freed_objects;
   mark_sweep->freed_bytes_ += freed_bytes;
@@ -606,7 +629,6 @@
 
   // If we don't swap bitmaps then newly allocated Weaks go into the live bitmap but not mark
   // bitmap, resulting in occasional frees of Weaks which are still in use.
-  // TODO: Fix when sweeping weaks works properly with mutators unpaused + allocation list.
   SweepSystemWeaksArray(allocations);
   logger.AddSplit("SweepSystemWeaks");
 
@@ -656,12 +678,13 @@
   logger.AddSplit("Reset stack");
 }
 
-void MarkSweep::Sweep(bool partial, bool swap_bitmaps) {
+void MarkSweep::Sweep(TimingLogger& timings, bool partial, bool swap_bitmaps) {
   DCHECK(mark_stack_->IsEmpty());
 
   // If we don't swap bitmaps then newly allocated Weaks go into the live bitmap but not mark
   // bitmap, resulting in occasional frees of Weaks which are still in use.
   SweepSystemWeaks();
+  timings.AddSplit("SweepSystemWeaks");
 
   const Spaces& spaces = heap_->GetSpaces();
   SweepCallbackContext scc;
@@ -693,6 +716,7 @@
       }
     }
   }
+  timings.AddSplit("Sweep");
 }
 
 void MarkSweep::SweepLargeObjects(bool swap_bitmaps) {
@@ -721,53 +745,6 @@
   GetHeap()->RecordFree(freed_objects, freed_bytes);
 }
 
-// Scans instance fields.
-inline void MarkSweep::ScanInstanceFields(const Object* obj) {
-  DCHECK(obj != NULL);
-  Class* klass = obj->GetClass();
-  DCHECK(klass != NULL);
-  ScanFields(obj, klass->GetReferenceInstanceOffsets(), false);
-}
-
-// Scans static storage on a Class.
-inline void MarkSweep::ScanStaticFields(const Class* klass) {
-  DCHECK(klass != NULL);
-  ScanFields(klass, klass->GetReferenceStaticOffsets(), true);
-}
-
-inline void MarkSweep::ScanFields(const Object* obj, uint32_t ref_offsets, bool is_static) {
-  if (ref_offsets != CLASS_WALK_SUPER) {
-    // Found a reference offset bitmap.  Mark the specified offsets.
-    while (ref_offsets != 0) {
-      const size_t right_shift = CLZ(ref_offsets);
-      MemberOffset byte_offset = CLASS_OFFSET_FROM_CLZ(right_shift);
-      const Object* ref = obj->GetFieldObject<const Object*>(byte_offset, false);
-      MarkObject(ref);
-      ref_offsets ^= CLASS_HIGH_BIT >> right_shift;
-    }
-  } else {
-    // There is no reference offset bitmap.  In the non-static case,
-    // walk up the class inheritance hierarchy and find reference
-    // offsets the hard way. In the static case, just consider this
-    // class.
-    for (const Class* klass = is_static ? obj->AsClass() : obj->GetClass();
-         klass != NULL;
-         klass = is_static ? NULL : klass->GetSuperClass()) {
-      size_t num_reference_fields = (is_static
-                                     ? klass->NumReferenceStaticFields()
-                                     : klass->NumReferenceInstanceFields());
-      for (size_t i = 0; i < num_reference_fields; ++i) {
-        Field* field = (is_static
-                        ? klass->GetStaticField(i)
-                        : klass->GetInstanceField(i));
-        MemberOffset field_offset = field->GetOffset();
-        const Object* ref = obj->GetFieldObject<const Object*>(field_offset, false);
-        MarkObject(ref);
-      }
-    }
-  }
-}
-
 void MarkSweep::CheckReference(const Object* obj, const Object* ref, MemberOffset offset, bool is_static) {
   const Spaces& spaces = heap_->GetSpaces();
   // TODO: C++0x auto
@@ -812,32 +789,6 @@
   }
 }
 
-// Scans the header, static field references, and interface pointers
-// of a class object.
-inline void MarkSweep::ScanClass(const Object* obj) {
-#ifndef NDEBUG
-  ++class_count_;
-#endif
-  ScanInstanceFields(obj);
-  ScanStaticFields(obj->AsClass());
-}
-
-// Scans the header of all array objects.  If the array object is
-// specialized to a reference type, scans the array data as well.
-inline void MarkSweep::ScanArray(const Object* obj) {
-#ifndef NDEBUG
-  ++array_count_;
-#endif
-  MarkObject(obj->GetClass());
-  if (obj->IsObjectArray()) {
-    const ObjectArray<Object>* array = obj->AsObjectArray<Object>();
-    for (int32_t i = 0; i < array->GetLength(); ++i) {
-      const Object* element = array->GetWithoutChecks(i);
-      MarkObject(element);
-    }
-  }
-}
-
 // Process the "referent" field in a java.lang.ref.Reference.  If the
 // referent has not yet been marked, put it on the appropriate list in
 // the gcHeap for later processing.
@@ -860,49 +811,205 @@
       list = &phantom_reference_list_;
     }
     DCHECK(list != NULL) << PrettyClass(klass) << " " << std::hex << klass->GetAccessFlags();
+    // TODO: One lock per list?
     heap_->EnqueuePendingReference(obj, list);
   }
 }
 
-// Scans the header and field references of a data object.  If the
-// scanned object is a reference subclass, it is scheduled for later
-// processing.
-inline void MarkSweep::ScanOther(const Object* obj) {
-#ifndef NDEBUG
-  ++other_count_;
-#endif
-  ScanInstanceFields(obj);
-  if (obj->GetClass()->IsReferenceClass()) {
-    DelayReferenceReferent(const_cast<Object*>(obj));
-  }
-}
-
 void MarkSweep::ScanRoot(const Object* obj) {
   ScanObject(obj);
 }
 
+class MarkObjectVisitor {
+ public:
+  MarkObjectVisitor(MarkSweep* const mark_sweep) : mark_sweep_(mark_sweep) {
+  }
+
+  void operator ()(const Object* /* obj */, const Object* ref, const MemberOffset& /* offset */,
+                   bool /* is_static */) const
+      EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
+      SHARED_LOCKS_REQUIRED(Locks::mutator_lock_) {
+    mark_sweep_->MarkObject(ref);
+  }
+
+ private:
+  MarkSweep* const mark_sweep_;
+};
+
 // Scans an object reference.  Determines the type of the reference
 // and dispatches to a specialized scanning routine.
 void MarkSweep::ScanObject(const Object* obj) {
-  DCHECK(obj != NULL);
-  DCHECK(obj->GetClass() != NULL);
-#ifndef NDEBUG
-  if (!IsMarked(obj)) {
-    heap_->DumpSpaces();
-    LOG(FATAL) << "Scanning unmarked object " << reinterpret_cast<const void*>(obj);
+  MarkObjectVisitor visitor(this);
+  ScanObjectVisit(obj, visitor);
+}
+
+class MarkStackChunk : public Task {
+ public:
+  MarkStackChunk(ThreadPool* thread_pool, MarkSweep* mark_sweep, Object** begin, Object** end)
+      : mark_sweep_(mark_sweep),
+        thread_pool_(thread_pool),
+        index_(0),
+        length_(0),
+        output_(NULL) {
+    length_ = end - begin;
+    if (begin != end) {
+      // Cost not significant since we only do this for the initial set of mark stack chunks.
+      memcpy(data_, begin, length_ * sizeof(*begin));
+    }
+    if (kCountTasks) {
+      ++mark_sweep_->work_chunks_created_;
+    }
   }
-#endif
-  if (obj->IsClass()) {
-    ScanClass(obj);
-  } else if (obj->IsArrayInstance()) {
-    ScanArray(obj);
-  } else {
-    ScanOther(obj);
+
+  ~MarkStackChunk() {
+    DCHECK(output_ == NULL || output_->length_ == 0);
+    DCHECK_GE(index_, length_);
+    delete output_;
+    if (kCountTasks) {
+      ++mark_sweep_->work_chunks_deleted_;
+    }
   }
+
+  MarkSweep* const mark_sweep_;
+  ThreadPool* const thread_pool_;
+  static const size_t max_size = 1 * KB;
+  // Index of which object we are scanning. Only needs to be atomic if we are doing work stealing.
+  size_t index_;
+  // Input / output mark stack. We add newly marked references to data_ until length reaches
+  // max_size. This is an optimization so that fewer tasks are created.
+  // TODO: Investigate using a bounded buffer FIFO.
+  Object* data_[max_size];
+  // How many elements in data_ we need to scan.
+  size_t length_;
+  // Output block: newly marked references get added to the output block so that another thread
+  // can scan them.
+  MarkStackChunk* output_;
+
+  class MarkObjectParallelVisitor {
+   public:
+    MarkObjectParallelVisitor(MarkStackChunk* chunk_task) : chunk_task_(chunk_task) {
+    }
+
+    void operator ()(const Object* /* obj */, const Object* ref,
+                     const MemberOffset& /* offset */, bool /* is_static */) const {
+      if (ref != NULL && chunk_task_->mark_sweep_->MarkObjectParallel(ref)) {
+        chunk_task_->MarkStackPush(ref);
+      }
+    }
+
+   private:
+    MarkStackChunk* const chunk_task_;
+  };
+
+  // Push an object into the block.
+  // Don't need an atomic ++ since only one thread writes to an output block at any given time.
+  void Push(Object* obj) {
+    data_[length_++] = obj;
+  }
+
+  void MarkStackPush(const Object* obj) {
+    if (static_cast<size_t>(length_) < max_size) {
+      Push(const_cast<Object*>(obj));
+    } else {
+      // Internal buffer is full, push to a new buffer instead.
+      if (UNLIKELY(output_ == NULL)) {
+        AllocateOutputChunk();
+      } else if (UNLIKELY(static_cast<size_t>(output_->length_) == max_size)) {
+        // Output block is full, queue it up for processing and obtain a new block.
+        EnqueueOutput();
+        AllocateOutputChunk();
+      }
+      output_->Push(const_cast<Object*>(obj));
+    }
+  }
+
+  void ScanObject(Object* obj) {
+    mark_sweep_->ScanObjectVisit(obj, MarkObjectParallelVisitor(this));
+  }
+
+  void EnqueueOutput() {
+    if (output_ != NULL) {
+      uint64_t start = 0;
+      if (kMeasureOverhead) {
+        start = NanoTime();
+      }
+      thread_pool_->AddTask(Thread::Current(), output_);
+      output_ = NULL;
+      if (kMeasureOverhead) {
+        mark_sweep_->overhead_time_ += NanoTime() - start;
+      }
+    }
+  }
+
+  void AllocateOutputChunk() {
+    uint64_t start = 0;
+    if (kMeasureOverhead) {
+      start = NanoTime();
+    }
+    output_ = new MarkStackChunk(thread_pool_, mark_sweep_, NULL, NULL);
+    if (kMeasureOverhead) {
+      mark_sweep_->overhead_time_ += NanoTime() - start;
+    }
+  }
+
+  void Finalize() {
+    EnqueueOutput();
+    delete this;
+  }
+
+  // Scans all of the objects in the chunk.
+  virtual void Run(Thread* self) {
+    size_t index;
+    while ((index = index_++) < length_) {
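+      // Prefetch a few entries ahead so the cache miss on the next object overlaps with
+      // scanning the current one.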
+      static const size_t prefetch_look_ahead = 4;
+      if (kUseMarkStackPrefetch) {
+        if (index + prefetch_look_ahead < length_) {
+          __builtin_prefetch(&data_[index + prefetch_look_ahead]);
+        } else {
+          __builtin_prefetch(&data_[length_ - 1]);
+        }
+      }
+      Object* obj = data_[index];
+      DCHECK(obj != NULL);
+      ScanObject(obj);
+    }
+  }
+};
+
+void MarkSweep::ProcessMarkStackParallel() {
+  CHECK(kDisableFinger) << "parallel mark stack processing cannot work when finger is enabled";
+  Thread* self = Thread::Current();
+  ThreadPool* thread_pool = GetHeap()->GetThreadPool();
+  // Split the current mark stack up into work tasks.
+  const size_t num_threads = thread_pool->GetThreadCount();
+  const size_t stack_size = mark_stack_->Size();
+  const size_t chunk_size =
+      std::min((stack_size + num_threads - 1) / num_threads,
+               static_cast<size_t>(MarkStackChunk::max_size));
+  size_t index = 0;
+  for (size_t i = 0; i < num_threads || index < stack_size; ++i) {
+    Object** begin = &mark_stack_->Begin()[std::min(stack_size, index)];
+    Object** end = &mark_stack_->Begin()[std::min(stack_size, index + chunk_size)];
+    index += chunk_size;
+    thread_pool->AddTask(self, new MarkStackChunk(thread_pool, this, begin, end));
+  }
+  thread_pool->StartWorkers(self);
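+  // Each chunk copied its portion of the stack in its constructor, so the stack can be reset
+  // before the workers finish.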
+  mark_stack_->Reset();
+  thread_pool->Wait(self, true);
+  // LOG(INFO) << "Idle wait time " << PrettyDuration(thread_pool->GetWaitTime());
+  CHECK_EQ(work_chunks_created_, work_chunks_deleted_) << " some of the work chunks were leaked";
 }
 
 // Scan anything that's on the mark stack.
 void MarkSweep::ProcessMarkStack() {
+  ThreadPool* thread_pool = GetHeap()->GetThreadPool();
+  if (kParallelMarkStack && thread_pool != NULL && thread_pool->GetThreadCount() > 0) {
+    ProcessMarkStackParallel();
+    return;
+  }
+
   if (kUseMarkStackPrefetch) {
     const size_t fifo_size = 4;
     const size_t fifo_mask = fifo_size - 1;
@@ -1096,13 +1203,31 @@
 }
 
 MarkSweep::~MarkSweep() {
-#ifndef NDEBUG
-  VLOG(heap) << "MarkSweep scanned classes=" << class_count_ << " arrays=" << array_count_ << " other=" << other_count_;
-#endif
+  if (class_count_ != 0 || array_count_ != 0 || other_count_ != 0) {
+    LOG(INFO) << "MarkSweep scanned classes=" << class_count_ << " arrays=" << array_count_
+               << " other=" << other_count_;
+  }
+
+  if (kCountTasks) {
+    LOG(INFO) << "Total number of work chunks allocated: " << work_chunks_created_;
+  }
+
+  if (kMeasureOverhead) {
+    LOG(INFO) << "Overhead time " << PrettyDuration(overhead_time_);
+  }
+
+  if (kProfileLargeObjects) {
+    LOG(INFO) << "Large objects tested " << large_object_test_ << " marked " << large_object_mark_;
+  }
+
+  if (kCountClassesMarked) {
+    LOG(INFO) << "Classes marked " << classes_marked_;
+  }
+
   // Ensure that the mark stack is empty.
   CHECK(mark_stack_->IsEmpty());
 
-  // Clear all of the alloc spaces' mark bitmaps.
+  // Clear all of the spaces' mark bitmaps.
   const Spaces& spaces = heap_->GetSpaces();
   // TODO: C++0x auto
   for (Spaces::const_iterator it = spaces.begin(); it != spaces.end(); ++it) {