Add more runtime options.

Split HeapGCThreads into two separate options:

-XX:ParallelGCThreads: specifies how many threads the GC may use
while the mutators are suspended.

-XX:ConcGCThreads: specifies how many threads the GC may use while
the mutators are running.
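
A hypothetical invocation on a 4-core device (flag spelling per the
parsing code in runtime.cc below):

  dalvikvm -XX:ParallelGCThreads=3 -XX:ConcGCThreads=0 ...

Both counts are workers in addition to the main GC thread, so the
paused phases here use 3 + 1 = 4 threads.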

Added runtime options to specify the long pause / long GC log
thresholds:
-XX:LongPauseLogThreshold (default 5ms)
-XX:LongGCLogThreshold (default 100ms)
These thresholds were previously compile-time constants but are now
runtime options. If either threshold is exceeded, we print the GC
message.
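
The thresholds are stored in nanoseconds (cf. the default
kDefaultLongPauseLogThreshold = MsToNs(5) in heap.h below), so a
hypothetical override of the pause threshold to 10ms would be:

  dalvikvm -XX:LongPauseLogThreshold=10000000 ...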

Added a new runtime option, -XX:IgnoreMaxFootprint, which makes the
GC run only when the number of bytes allocated hits the growth
limit. This causes GC to occur much less frequently and is useful
for measuring how much of an impact GC has on performance.
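
A hypothetical benchmarking run:

  dalvikvm -XX:IgnoreMaxFootprint -cp bench.jar Bench

With the flag set, the ideal footprint is pinned to
std::numeric_limits<size_t>::max() at startup and the per-GC
footprint / concurrent-start recalculation is skipped (see heap.cc
below).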

Changed the GC behaviour to use only one thread when we do not care
about pause times, to prevent jank that can be caused by two
simultaneous GCs on different processes fighting for CPU time.
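
The thread-count decision, as implemented in
MarkSweep::GetThreadCount below (counts include the main GC thread):

  no thread pool or !CareAboutPauseTimes() -> 0 (serial path)
  paused                                   -> ParallelGCThreads + 1
  not paused                               -> ConcGCThreads + 1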

Added thread pool functionality for changing the maximum number of
active workers.
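
Typical use from the collector (sketch; the calls appear in
mark_sweep.cc below):

  thread_pool->SetMaxActiveWorkers(thread_count - 1);  // workers besides self
  thread_pool->StartWorkers(self);
  thread_pool->Wait(self, true, true);  // self also executes tasks
  thread_pool->StopWorkers(self);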

Fixed an accounting error where large objects were not counted in
the total number of objects and bytes freed.

Bug: 9986416

Change-Id: I86afa358d93dcd3780e18ac5d85bdb1a130cb7e7
diff --git a/runtime/gc/collector/mark_sweep.cc b/runtime/gc/collector/mark_sweep.cc
index a854971..79bf39b 100644
--- a/runtime/gc/collector/mark_sweep.cc
+++ b/runtime/gc/collector/mark_sweep.cc
@@ -742,11 +742,23 @@
   }
 };
 
+size_t MarkSweep::GetThreadCount(bool paused) const {
+  if (heap_->GetThreadPool() == nullptr || !heap_->CareAboutPauseTimes()) {
+    return 0;
+  }
+  if (paused) {
+    return heap_->GetParallelGCThreadCount() + 1;
+  } else {
+    return heap_->GetConcGCThreadCount() + 1;
+  }
+}
+
 void MarkSweep::ScanGrayObjects(bool paused, byte minimum_age) {
   accounting::CardTable* card_table = GetHeap()->GetCardTable();
   ThreadPool* thread_pool = GetHeap()->GetThreadPool();
-  const bool parallel = kParallelCardScan && thread_pool != nullptr;
-  if (parallel) {
+  size_t thread_count = GetThreadCount(paused);
+  // The parallel version with only one thread is faster for card scanning; TODO: fix.
+  if (kParallelCardScan && thread_count > 0) {
     Thread* self = Thread::Current();
     // Can't have a different split for each space since multiple spaces can have their cards being
     // scanned at the same time.
@@ -755,7 +767,6 @@
     const Object** mark_stack_begin = const_cast<const Object**>(mark_stack_->Begin());
     const Object** mark_stack_end = const_cast<const Object**>(mark_stack_->End());
     const size_t mark_stack_size = mark_stack_end - mark_stack_begin;
-    const size_t thread_count = thread_pool->GetThreadCount() + 1;
     // Estimated number of work tasks we will create.
     const size_t mark_stack_tasks = GetHeap()->GetContinuousSpaces().size() * thread_count;
     DCHECK_NE(mark_stack_tasks, 0U);
@@ -788,8 +799,9 @@
         card_begin += card_increment;
       }
     }
+    thread_pool->SetMaxActiveWorkers(thread_count - 1);
     thread_pool->StartWorkers(self);
-    thread_pool->Wait(self, paused, true);  // Only do work in the main thread if we are paused.
+    thread_pool->Wait(self, true, true);
     thread_pool->StopWorkers(self);
     timings_.EndSplit();
   } else {
@@ -885,7 +897,8 @@
     ScanObjectVisitor scan_visitor(this);
     auto* self = Thread::Current();
     ThreadPool* thread_pool = heap_->GetThreadPool();
-    const bool parallel = kParallelRecursiveMark && thread_pool != NULL;
+    size_t thread_count = GetThreadCount(false);
+    const bool parallel = kParallelRecursiveMark && thread_count > 1;
     mark_stack_->Reset();
     for (const auto& space : GetHeap()->GetContinuousSpaces()) {
       if ((space->GetGcRetentionPolicy() == space::kGcRetentionPolicyAlwaysCollect) ||
@@ -904,7 +917,7 @@
           atomic_finger_ = static_cast<int32_t>(0xFFFFFFFF);
 
           // Create a few worker tasks.
-          size_t n = (thread_pool->GetThreadCount() + 1) * 2;
+          const size_t n = thread_count * 2;
           while (begin != end) {
             uintptr_t start = begin;
             uintptr_t delta = (end - begin) / n;
@@ -915,8 +928,9 @@
                                                begin);
             thread_pool->AddTask(self, task);
           }
+          thread_pool->SetMaxActiveWorkers(thread_count - 1);
           thread_pool->StartWorkers(self);
-          thread_pool->Wait(self, false, true);
+          thread_pool->Wait(self, true, true);
           thread_pool->StopWorkers(self);
         } else {
           // This function does not handle heap end increasing, so we must use the space end.
@@ -1369,13 +1383,11 @@
   ScanObjectVisit(obj, visitor);
 }
 
-void MarkSweep::ProcessMarkStackParallel(bool paused) {
+void MarkSweep::ProcessMarkStackParallel(size_t thread_count) {
   Thread* self = Thread::Current();
   ThreadPool* thread_pool = GetHeap()->GetThreadPool();
-  const size_t num_threads = thread_pool->GetThreadCount();
-  const size_t chunk_size =
-      std::min(mark_stack_->Size() / num_threads + 1,
-               static_cast<size_t>(MarkStackTask<false>::kMaxSize));
+  const size_t chunk_size = std::min(mark_stack_->Size() / thread_count + 1,
+                                     static_cast<size_t>(MarkStackTask<false>::kMaxSize));
   CHECK_GT(chunk_size, 0U);
   // Split the current mark stack up into work tasks.
   for (mirror::Object **it = mark_stack_->Begin(), **end = mark_stack_->End(); it < end; ) {
@@ -1384,10 +1396,9 @@
                                                         const_cast<const mirror::Object**>(it)));
     it += delta;
   }
+  thread_pool->SetMaxActiveWorkers(thread_count - 1);
   thread_pool->StartWorkers(self);
-  // Don't do work in the main thread since it assumed at least one other thread will require CPU
-  // time during the GC.
-  thread_pool->Wait(self, paused, true);
+  thread_pool->Wait(self, true, true);
   thread_pool->StopWorkers(self);
   mark_stack_->Reset();
   CHECK_EQ(work_chunks_created_, work_chunks_deleted_) << " some of the work chunks were leaked";
@@ -1396,10 +1407,10 @@
 // Scan anything that's on the mark stack.
 void MarkSweep::ProcessMarkStack(bool paused) {
   timings_.StartSplit("ProcessMarkStack");
-  const bool parallel = kParallelProcessMarkStack && GetHeap()->GetThreadPool() &&
-      mark_stack_->Size() >= kMinimumParallelMarkStackSize;
-  if (parallel) {
-    ProcessMarkStackParallel(paused);
+  size_t thread_count = GetThreadCount(paused);
+  if (kParallelProcessMarkStack && thread_count > 1 &&
+      mark_stack_->Size() >= kMinimumParallelMarkStackSize) {
+    ProcessMarkStackParallel(thread_count);
   } else {
     // TODO: Tune this.
     static const size_t kFifoSize = 4;
@@ -1610,8 +1621,8 @@
   total_time_ns_ += GetDurationNs();
   total_paused_time_ns_ += std::accumulate(GetPauseTimes().begin(), GetPauseTimes().end(), 0,
                                            std::plus<uint64_t>());
-  total_freed_objects_ += GetFreedObjects();
-  total_freed_bytes_ += GetFreedBytes();
+  total_freed_objects_ += GetFreedObjects() + GetFreedLargeObjects();
+  total_freed_bytes_ += GetFreedBytes() + GetFreedLargeObjectBytes();
 
   // Ensure that the mark stack is empty.
   CHECK(mark_stack_->IsEmpty());
diff --git a/runtime/gc/collector/mark_sweep.h b/runtime/gc/collector/mark_sweep.h
index 8430839..dbec3e9 100644
--- a/runtime/gc/collector/mark_sweep.h
+++ b/runtime/gc/collector/mark_sweep.h
@@ -308,6 +308,10 @@
   // Expand mark stack to 2x its current size. Thread safe.
   void ExpandMarkStack();
 
+  // Returns how many threads we should use for the current GC phase, based on whether we are
+  // paused and whether we care about pause times.
+  size_t GetThreadCount(bool paused) const;
+
   // Returns true if an object is inside of the immune region (assumed to be marked).
   bool IsImmune(const mirror::Object* obj) const {
     return obj >= immune_begin_ && obj < immune_end_;
@@ -367,7 +371,7 @@
       EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
       SHARED_LOCKS_REQUIRED(Locks::mutator_lock_);
 
-  void ProcessMarkStackParallel(bool paused)
+  void ProcessMarkStackParallel(size_t thread_count)
       EXCLUSIVE_LOCKS_REQUIRED(Locks::heap_bitmap_lock_)
       SHARED_LOCKS_REQUIRED(Locks::mutator_lock_);
 
diff --git a/runtime/gc/heap.cc b/runtime/gc/heap.cc
index 800159a..e20c2c5 100644
--- a/runtime/gc/heap.cc
+++ b/runtime/gc/heap.cc
@@ -58,10 +58,6 @@
 namespace art {
 namespace gc {
 
-// When to create a log message about a slow GC, 100ms.
-static constexpr uint64_t kSlowGcThreshold = MsToNs(100);
-// When to create a log message about a long pause, 5ms.
-static constexpr uint64_t kLongGcPauseThreshold = MsToNs(5);
 static constexpr bool kGCALotMode = false;
 static constexpr size_t kGcAlotInterval = KB;
 static constexpr bool kDumpGcPerformanceOnShutdown = false;
@@ -72,12 +68,18 @@
 
 Heap::Heap(size_t initial_size, size_t growth_limit, size_t min_free, size_t max_free,
            double target_utilization, size_t capacity, const std::string& original_image_file_name,
-           bool concurrent_gc, size_t num_gc_threads, bool low_memory_mode)
+           bool concurrent_gc, size_t parallel_gc_threads, size_t conc_gc_threads,
+           bool low_memory_mode, size_t long_pause_log_threshold, size_t long_gc_log_threshold,
+           bool ignore_max_footprint)
     : alloc_space_(NULL),
       card_table_(NULL),
       concurrent_gc_(concurrent_gc),
-      num_gc_threads_(num_gc_threads),
+      parallel_gc_threads_(parallel_gc_threads),
+      conc_gc_threads_(conc_gc_threads),
       low_memory_mode_(low_memory_mode),
+      long_pause_log_threshold_(long_pause_log_threshold),
+      long_gc_log_threshold_(long_gc_log_threshold),
+      ignore_max_footprint_(ignore_max_footprint),
       have_zygote_space_(false),
       soft_ref_queue_lock_(NULL),
       weak_ref_queue_lock_(NULL),
@@ -230,6 +232,11 @@
   last_gc_time_ns_ = NanoTime();
   last_gc_size_ = GetBytesAllocated();
 
+  if (ignore_max_footprint_) {
+    SetIdealFootprint(std::numeric_limits<size_t>::max());
+    concurrent_start_bytes_ = max_allowed_footprint_;
+  }
+
   // Create our garbage collectors.
   for (size_t i = 0; i < 2; ++i) {
     const bool concurrent = i != 0;
@@ -245,13 +252,14 @@
 }
 
 void Heap::CreateThreadPool() {
-  if (num_gc_threads_ != 0) {
-    thread_pool_.reset(new ThreadPool(num_gc_threads_));
+  const size_t num_threads = std::max(parallel_gc_threads_, conc_gc_threads_);
+  if (num_threads != 0) {
+    thread_pool_.reset(new ThreadPool(num_threads));
   }
 }
 
 void Heap::DeleteThreadPool() {
-  thread_pool_.reset(NULL);
+  thread_pool_.reset(nullptr);
 }
 
 static bool ReadStaticInt(JNIEnvExt* env, jclass clz, const char* name, int* out_value) {
@@ -1249,11 +1257,11 @@
     const size_t duration = collector->GetDurationNs();
     std::vector<uint64_t> pauses = collector->GetPauseTimes();
     // GC for alloc pauses the allocating thread, so consider it as a pause.
-    bool was_slow = duration > kSlowGcThreshold ||
-            (gc_cause == kGcCauseForAlloc && duration > kLongGcPauseThreshold);
+    bool was_slow = duration > long_gc_log_threshold_ ||
+            (gc_cause == kGcCauseForAlloc && duration > long_pause_log_threshold_);
     if (!was_slow) {
       for (uint64_t pause : pauses) {
-        was_slow = was_slow || pause > kLongGcPauseThreshold;
+        was_slow = was_slow || pause > long_pause_log_threshold_;
       }
     }
 
@@ -1702,7 +1710,7 @@
         wait_time = NanoTime() - wait_start;
         total_wait_time_ += wait_time;
       }
-      if (wait_time > kLongGcPauseThreshold) {
+      if (wait_time > long_pause_log_threshold_) {
         LOG(INFO) << "WaitForConcurrentGcToComplete blocked for " << PrettyDuration(wait_time);
       }
     }
@@ -1776,28 +1784,32 @@
       target_size = std::max(bytes_allocated, max_allowed_footprint_);
     }
   }
-  SetIdealFootprint(target_size);
 
-  // Calculate when to perform the next ConcurrentGC.
-  if (concurrent_gc_) {
-    // Calculate the estimated GC duration.
-    double gc_duration_seconds = NsToMs(gc_duration) / 1000.0;
-    // Estimate how many remaining bytes we will have when we need to start the next GC.
-    size_t remaining_bytes = allocation_rate_ * gc_duration_seconds;
-    remaining_bytes = std::max(remaining_bytes, kMinConcurrentRemainingBytes);
-    if (UNLIKELY(remaining_bytes > max_allowed_footprint_)) {
-      // A never going to happen situation that from the estimated allocation rate we will exceed
-      // the applications entire footprint with the given estimated allocation rate. Schedule
-      // another GC straight away.
-      concurrent_start_bytes_ = bytes_allocated;
-    } else {
-      // Start a concurrent GC when we get close to the estimated remaining bytes. When the
-      // allocation rate is very high, remaining_bytes could tell us that we should start a GC
-      // right away.
-      concurrent_start_bytes_ = std::max(max_allowed_footprint_ - remaining_bytes, bytes_allocated);
+  if (!ignore_max_footprint_) {
+    SetIdealFootprint(target_size);
+
+    if (concurrent_gc_) {
+      // Calculate when to perform the next ConcurrentGC.
+
+      // Calculate the estimated GC duration.
+      double gc_duration_seconds = NsToMs(gc_duration) / 1000.0;
+      // Estimate how many remaining bytes we will have when we need to start the next GC.
+      size_t remaining_bytes = allocation_rate_ * gc_duration_seconds;
+      remaining_bytes = std::max(remaining_bytes, kMinConcurrentRemainingBytes);
+      if (UNLIKELY(remaining_bytes > max_allowed_footprint_)) {
+        // A situation that should never happen: from the estimated allocation rate we predict
+        // that we will exceed the application's entire footprint. Schedule another GC straight
+        // away.
+        concurrent_start_bytes_ = bytes_allocated;
+      } else {
+        // Start a concurrent GC when we get close to the estimated remaining bytes. When the
+        // allocation rate is very high, remaining_bytes could tell us that we should start a GC
+        // right away.
+        concurrent_start_bytes_ = std::max(max_allowed_footprint_ - remaining_bytes, bytes_allocated);
+      }
+      DCHECK_LE(concurrent_start_bytes_, max_allowed_footprint_);
+      DCHECK_LE(max_allowed_footprint_, growth_limit_);
     }
-    DCHECK_LE(concurrent_start_bytes_, max_allowed_footprint_);
-    DCHECK_LE(max_allowed_footprint_, growth_limit_);
   }
 
   UpdateMaxNativeFootprint();
diff --git a/runtime/gc/heap.h b/runtime/gc/heap.h
index cda252e..c93dacb 100644
--- a/runtime/gc/heap.h
+++ b/runtime/gc/heap.h
@@ -107,6 +107,8 @@
   static constexpr size_t kDefaultMaximumSize = 32 * MB;
   static constexpr size_t kDefaultMaxFree = 2 * MB;
   static constexpr size_t kDefaultMinFree = kDefaultMaxFree / 4;
+  static constexpr size_t kDefaultLongPauseLogThreshold = MsToNs(5);
+  static constexpr size_t kDefaultLongGCLogThreshold = MsToNs(100);
 
   // Default target utilization.
   static constexpr double kDefaultTargetUtilization = 0.5;
@@ -120,7 +122,8 @@
   explicit Heap(size_t initial_size, size_t growth_limit, size_t min_free,
                 size_t max_free, double target_utilization, size_t capacity,
                 const std::string& original_image_file_name, bool concurrent_gc,
-                size_t num_gc_threads, bool low_memory_mode);
+                size_t parallel_gc_threads, size_t conc_gc_threads, bool low_memory_mode,
+                size_t long_pause_log_threshold, size_t long_gc_log_threshold, bool ignore_max_footprint);
 
   ~Heap();
 
@@ -401,12 +404,23 @@
   // GC performance measuring
   void DumpGcPerformanceInfo(std::ostream& os);
 
+  // Returns true if we currently care about pause times.
+  bool CareAboutPauseTimes() const {
+    return care_about_pause_times_;
+  }
+
   // Thread pool.
   void CreateThreadPool();
   void DeleteThreadPool();
   ThreadPool* GetThreadPool() {
     return thread_pool_.get();
   }
+  size_t GetParallelGCThreadCount() const {
+    return parallel_gc_threads_;
+  }
+  size_t GetConcGCThreadCount() const {
+    return conc_gc_threads_;
+  }
 
  private:
   // Allocates uninitialized storage. Passing in a null space tries to place the object in the
@@ -514,12 +528,26 @@
   // false for stop-the-world mark sweep.
   const bool concurrent_gc_;
 
-  // How many GC threads we may use for garbage collection.
-  const size_t num_gc_threads_;
+  // How many GC threads we may use for paused parts of garbage collection.
+  const size_t parallel_gc_threads_;
+
+  // How many GC threads we may use for unpaused parts of garbage collection.
+  const size_t conc_gc_threads_;
 
   // Boolean for if we are in low memory mode.
   const bool low_memory_mode_;
 
+  // If we get a pause longer than the long pause log threshold, then we print out the GC message
+  // after it finishes.
+  const size_t long_pause_log_threshold_;
+
+  // If we get a GC longer than the long GC log threshold, then we print out the GC message after it finishes.
+  const size_t long_gc_log_threshold_;
+
+  // If we ignore the max footprint, the heap grows until it hits the heap capacity; this is
+  // useful for benchmarking since it reduces the fraction of time spent in GC.
+  const bool ignore_max_footprint_;
+
   // If we have a zygote space.
   bool have_zygote_space_;
 
@@ -544,14 +572,18 @@
 
   // Maximum size that the heap can reach.
   const size_t capacity_;
+
   // The size the heap is limited to. This is initially smaller than capacity, but for largeHeap
   // programs it is "cleared" making it the same as capacity.
   size_t growth_limit_;
+
   // When the number of bytes allocated exceeds the footprint TryAllocate returns NULL indicating
   // a GC should be triggered.
   size_t max_allowed_footprint_;
+
   // The watermark at which a concurrent GC is requested by registerNativeAllocation.
   size_t native_footprint_gc_watermark_;
+
   // The watermark at which a GC is performed inside of registerNativeAllocation.
   size_t native_footprint_limit_;
 
diff --git a/runtime/runtime.cc b/runtime/runtime.cc
index c4a9503..31800ce 100644
--- a/runtime/runtime.cc
+++ b/runtime/runtime.cc
@@ -339,7 +339,9 @@
   parsed->heap_target_utilization_ = gc::Heap::kDefaultTargetUtilization;
   parsed->heap_growth_limit_ = 0;  // 0 means no growth limit.
   // Default to number of processors minus one since the main GC thread also does work.
-  parsed->heap_gc_threads_ = sysconf(_SC_NPROCESSORS_CONF) - 1;
+  parsed->parallel_gc_threads_ = sysconf(_SC_NPROCESSORS_CONF) - 1;
+  // Only the main GC thread, no workers.
+  parsed->conc_gc_threads_ = 0;
   parsed->stack_size_ = 0;  // 0 means default.
   parsed->low_memory_mode_ = false;
 
@@ -349,6 +351,10 @@
   parsed->is_concurrent_gc_enabled_ = true;
   parsed->is_explicit_gc_disabled_ = false;
 
+  parsed->long_pause_log_threshold_ = gc::Heap::kDefaultLongPauseLogThreshold;
+  parsed->long_gc_log_threshold_ = gc::Heap::kDefaultLongGCLogThreshold;
+  parsed->ignore_max_footprint_ = false;
+
   parsed->lock_profiling_threshold_ = 0;
   parsed->hook_is_sensitive_thread_ = NULL;
 
@@ -480,9 +486,12 @@
         return NULL;
       }
       parsed->heap_target_utilization_ = value;
-    } else if (StartsWith(option, "-XX:HeapGCThreads=")) {
-      parsed->heap_gc_threads_ =
-          ParseMemoryOption(option.substr(strlen("-XX:HeapGCThreads=")).c_str(), 1024);
+    } else if (StartsWith(option, "-XX:ParallelGCThreads=")) {
+      parsed->parallel_gc_threads_ =
+          ParseMemoryOption(option.substr(strlen("-XX:ParallelGCThreads=")).c_str(), 1024);
+    } else if (StartsWith(option, "-XX:ConcGCThreads=")) {
+      parsed->conc_gc_threads_ =
+          ParseMemoryOption(option.substr(strlen("-XX:ConcGCThreads=")).c_str(), 1024);
     } else if (StartsWith(option, "-Xss")) {
       size_t size = ParseMemoryOption(option.substr(strlen("-Xss")).c_str(), 1);
       if (size == 0) {
@@ -494,6 +503,14 @@
         return NULL;
       }
       parsed->stack_size_ = size;
+    } else if (StartsWith(option, "-XX:LongPauseLogThreshold=")) {
+      parsed->long_pause_log_threshold_ =
+          ParseMemoryOption(option.substr(strlen("-XX:LongPauseLogThreshold=")).c_str(), 1024);
+    } else if (StartsWith(option, "-XX:LongGCLogThreshold=")) {
+      parsed->long_gc_log_threshold_ =
+          ParseMemoryOption(option.substr(strlen("-XX:LongGCLogThreshold=")).c_str(), 1024);
+    } else if (option == "-XX:IgnoreMaxFootprint") {
+      parsed->ignore_max_footprint_ = true;
     } else if (option == "-XX:LowMemoryMode") {
       parsed->low_memory_mode_ = true;
     } else if (StartsWith(option, "-D")) {
@@ -865,8 +882,12 @@
                        options->heap_maximum_size_,
                        options->image_,
                        options->is_concurrent_gc_enabled_,
-                       options->heap_gc_threads_,
-                       options->low_memory_mode_);
+                       options->parallel_gc_threads_,
+                       options->conc_gc_threads_,
+                       options->low_memory_mode_,
+                       options->long_pause_log_threshold_,
+                       options->long_gc_log_threshold_,
+                       options->ignore_max_footprint_);
 
   BlockSignals();
   InitPlatformSignalHandlers();
diff --git a/runtime/runtime.h b/runtime/runtime.h
index 8aba762..6b78ce4 100644
--- a/runtime/runtime.h
+++ b/runtime/runtime.h
@@ -100,13 +100,17 @@
     bool interpreter_only_;
     bool is_concurrent_gc_enabled_;
     bool is_explicit_gc_disabled_;
+    size_t long_pause_log_threshold_;
+    size_t long_gc_log_threshold_;
+    bool ignore_max_footprint_;
     size_t heap_initial_size_;
     size_t heap_maximum_size_;
     size_t heap_growth_limit_;
-    size_t heap_gc_threads_;
     size_t heap_min_free_;
     size_t heap_max_free_;
     double heap_target_utilization_;
+    size_t parallel_gc_threads_;
+    size_t conc_gc_threads_;
     size_t stack_size_;
     bool low_memory_mode_;
     size_t lock_profiling_threshold_;
diff --git a/runtime/thread_pool.cc b/runtime/thread_pool.cc
index 39d30bb2..674ab9d 100644
--- a/runtime/thread_pool.cc
+++ b/runtime/thread_pool.cc
@@ -81,7 +81,8 @@
     start_time_(0),
     total_wait_time_(0),
     // Add one since the caller of constructor waits on the barrier too.
-    creation_barier_(num_threads + 1) {
+    creation_barier_(num_threads + 1),
+    max_active_workers_(num_threads) {
   Thread* self = Thread::Current();
   while (GetThreadCount() < num_threads) {
     const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount());
@@ -91,6 +92,12 @@
   creation_barier_.Wait(self);
 }
 
+void ThreadPool::SetMaxActiveWorkers(size_t threads) {
+  MutexLock mu(Thread::Current(), task_queue_lock_);
+  CHECK_LE(threads, GetThreadCount());
+  max_active_workers_ = threads;
+}
+
 ThreadPool::~ThreadPool() {
   {
     Thread* self = Thread::Current();
@@ -121,12 +128,18 @@
 Task* ThreadPool::GetTask(Thread* self) {
   MutexLock mu(self, task_queue_lock_);
   while (!IsShuttingDown()) {
-    Task* task = TryGetTaskLocked(self);
-    if (task != NULL) {
-      return task;
+    const size_t thread_count = GetThreadCount();
+    // Ensure that we don't use more threads than the maximum active workers.
+    const size_t active_threads = thread_count - waiting_count_;
+    // <= since self is considered an active worker.
+    if (active_threads <= max_active_workers_) {
+      Task* task = TryGetTaskLocked(self);
+      if (task != NULL) {
+        return task;
+      }
     }
 
-    waiting_count_++;
+    ++waiting_count_;
     if (waiting_count_ == GetThreadCount() && tasks_.empty()) {
       // We may be done, lets broadcast to the completion condition.
       completion_condition_.Broadcast(self);
diff --git a/runtime/thread_pool.h b/runtime/thread_pool.h
index 9c6d47b..b9a97a1 100644
--- a/runtime/thread_pool.h
+++ b/runtime/thread_pool.h
@@ -90,6 +90,10 @@
     return total_wait_time_;
   }
 
+  // Provides a way to bound the maximum number of worker threads; threads must be less than or
+  // equal to the thread count of the thread pool.
+  void SetMaxActiveWorkers(size_t threads);
+
  protected:
   // Get a task to run, blocks if there are no tasks left
   virtual Task* GetTask(Thread* self);
@@ -117,6 +121,7 @@
   uint64_t start_time_ GUARDED_BY(task_queue_lock_);
   uint64_t total_wait_time_;
   Barrier creation_barier_;
+  size_t max_active_workers_ GUARDED_BY(task_queue_lock_);
 
  private:
   friend class ThreadPoolWorker;