New executor design
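
Replace the single closure-execution thread with a small pool. The pool
starts at one worker and grows on demand, up to twice the core count.
Each worker owns a mutex-protected closure list and parks on a condition
variable when idle. executor_push appends to the calling worker's own
queue when the caller is itself a pool thread (tracked via TLS), and to
a randomly chosen live worker otherwise; once a queue is deeper than
MAX_DEPTH closures and the pool is not yet full, the pusher attempts to
start another worker, with a spinlock serializing pool growth.

For orientation, a minimal sketch of how callers hand work to this
scheduler; it assumes the grpc_closure_create/grpc_closure_sched helpers
and the grpc_executor_scheduler global from the surrounding tree, whose
exact names vary between gRPC versions:

    #include "src/core/lib/iomgr/executor.h"

    static void on_work(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
      /* invoked on one of the executor's pool threads */
    }

    static void kick_off(grpc_exec_ctx *exec_ctx) {
      /* binding the closure to grpc_executor_scheduler routes
         grpc_closure_sched() through executor_push below */
      grpc_closure *c =
          grpc_closure_create(on_work, NULL, grpc_executor_scheduler);
      grpc_closure_sched(exec_ctx, c, GRPC_ERROR_NONE);
    }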
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 75fd5b1..a8cf641 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -36,132 +36,140 @@
+#include <stdlib.h>
 #include <string.h>
 
 #include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
 #include <grpc/support/log.h>
 #include <grpc/support/sync.h>
 #include <grpc/support/thd.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
 #include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/spinlock.h"
 
-typedef struct grpc_executor_data {
-  int busy;          /**< is the thread currently running? */
-  int shutting_down; /**< has \a grpc_shutdown() been invoked? */
-  int pending_join;  /**< has the thread finished but not been joined? */
-  grpc_closure_list closures; /**< collection of pending work */
-  gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
-                     pending_join are true */
-  gpr_thd_options options;
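+/* a pushing thread tries to start another worker once its target queue is
+   deeper than this */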
+#define MAX_DEPTH 32
+
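+/* per-thread state: a mutex-protected run queue, a cv the worker parks on
+   when idle, and a shutdown flag */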
+typedef struct {
   gpr_mu mu;
-} grpc_executor;
+  gpr_cv cv;
+  grpc_closure_list elems;
+  size_t depth;
+  bool shutdown;
+  gpr_thd_id id;
+} thread_state;
 
-static grpc_executor g_executor;
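+/* one thread_state slot per possible worker; g_cur_threads only ever grows,
+   so slots [0, g_cur_threads) are the live ones. The spinlock ensures at
+   most one pusher is adding a thread at any time. */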
+static thread_state *g_thread_state;
+static size_t g_max_threads;
+static gpr_atm g_cur_threads;
+static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
+
+GPR_TLS_DECL(g_this_thread_state);
+
+static void executor_thread(void *arg);
 
 void grpc_executor_init() {
-  memset(&g_executor, 0, sizeof(grpc_executor));
-  gpr_mu_init(&g_executor.mu);
-  g_executor.options = gpr_thd_options_default();
-  gpr_thd_options_set_joinable(&g_executor.options);
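+  /* allow the pool to grow to two workers per core, but never below one */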
+  g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+  gpr_atm_no_barrier_store(&g_cur_threads, 1);
+  gpr_tls_init(&g_this_thread_state);
+  g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads);
+  for (size_t i = 0; i < g_max_threads; i++) {
+    gpr_mu_init(&g_thread_state[i].mu);
+    gpr_cv_init(&g_thread_state[i].cv);
+    g_thread_state[i].elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+  }
+
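+  /* start one worker eagerly; executor_push grows the pool on demand */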
+  gpr_thd_options opt = gpr_thd_options_default();
+  gpr_thd_options_set_joinable(&opt);
+  gpr_thd_new(&g_thread_state[0].id, executor_thread, &g_thread_state[0], &opt);
 }
 
-/* thread body */
-static void closure_exec_thread_func(void *ignored) {
-  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-  while (1) {
-    gpr_mu_lock(&g_executor.mu);
-    if (g_executor.shutting_down != 0) {
-      gpr_mu_unlock(&g_executor.mu);
-      break;
-    }
-    if (grpc_closure_list_empty(g_executor.closures)) {
-      /* no more work, time to die */
-      GPR_ASSERT(g_executor.busy == 1);
-      g_executor.busy = 0;
-      gpr_mu_unlock(&g_executor.mu);
-      break;
-    } else {
-      grpc_closure *c = g_executor.closures.head;
-      grpc_closure_list_init(&g_executor.closures);
-      gpr_mu_unlock(&g_executor.mu);
-      while (c != NULL) {
-        grpc_closure *next = c->next_data.next;
-        grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
-        c->scheduled = false;
-#endif
-        c->cb(&exec_ctx, c->cb_arg, error);
-        GRPC_ERROR_UNREF(error);
-        c = next;
-      }
-      grpc_exec_ctx_flush(&exec_ctx);
-    }
-  }
-  grpc_exec_ctx_finish(&exec_ctx);
-}
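+/* run every closure in 'list', returning how many ran so the caller can
+   charge the count against its queue depth */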
+static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
+  size_t n = 0;
 
-/* Spawn the thread if new work has arrived a no thread is up */
-static void maybe_spawn_locked() {
-  if (grpc_closure_list_empty(g_executor.closures) == 1) {
-    return;
-  }
-  if (g_executor.shutting_down == 1) {
-    return;
-  }
-
-  if (g_executor.busy != 0) {
-    /* Thread still working. New work will be picked up by already running
-     * thread. Not spawning anything. */
-    return;
-  } else if (g_executor.pending_join != 0) {
-    /* Pickup the remains of the previous incarnations of the thread. */
-    gpr_thd_join(g_executor.tid);
-    g_executor.pending_join = 0;
-  }
-
-  /* All previous instances of the thread should have been joined at this point.
-   * Spawn time! */
-  g_executor.busy = 1;
-  GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
-                         &g_executor.options));
-  g_executor.pending_join = 1;
-}
-
-static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                          grpc_error *error) {
-  gpr_mu_lock(&g_executor.mu);
-  if (g_executor.shutting_down == 0) {
-    grpc_closure_list_append(&g_executor.closures, closure, error);
-    maybe_spawn_locked();
-  }
-  gpr_mu_unlock(&g_executor.mu);
-}
-
-void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
-  int pending_join;
-
-  gpr_mu_lock(&g_executor.mu);
-  pending_join = g_executor.pending_join;
-  g_executor.shutting_down = 1;
-  gpr_mu_unlock(&g_executor.mu);
-  /* we can release the lock at this point despite the access to the closure
-   * list below because we aren't accepting new work */
-
-  /* Execute pending callbacks, some may be performing cleanups */
-  grpc_closure *c = g_executor.closures.head;
-  grpc_closure_list_init(&g_executor.closures);
+  grpc_closure *c = list.head;
   while (c != NULL) {
     grpc_closure *next = c->next_data.next;
     grpc_error *error = c->error_data.error;
 #ifndef NDEBUG
     c->scheduled = false;
 #endif
     c->cb(exec_ctx, c->cb_arg, error);
     GRPC_ERROR_UNREF(error);
     c = next;
+    n++;
   }
-  grpc_exec_ctx_flush(exec_ctx);
-  GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
-  if (pending_join) {
-    gpr_thd_join(g_executor.tid);
+
+  return n;
+}
+
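+/* flag every slot for shutdown, join all started workers, then run whatever
+   closures are left so that cleanup callbacks still execute */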
+void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
+  for (size_t i = 0; i < g_max_threads; i++) {
+    gpr_mu_lock(&g_thread_state[i].mu);
+    g_thread_state[i].shutdown = true;
+    gpr_cv_signal(&g_thread_state[i].cv);
+    gpr_mu_unlock(&g_thread_state[i].mu);
   }
-  gpr_mu_destroy(&g_executor.mu);
+  /* drain any in-flight thread add: a spawn that won the spinlock before
+     this point has already bumped g_cur_threads and gets joined below */
+  gpr_spinlock_lock(&g_adding_thread_lock);
+  gpr_spinlock_unlock(&g_adding_thread_lock);
+  for (gpr_atm i = 0; i < g_cur_threads; i++) {
+    gpr_thd_join(g_thread_state[i].id);
+  }
+  for (size_t i = 0; i < g_max_threads; i++) {
+    gpr_mu_destroy(&g_thread_state[i].mu);
+    gpr_cv_destroy(&g_thread_state[i].cv);
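+    /* closures still queued here (including any pushed after shutdown
+       began) run now */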
+    run_closures(exec_ctx, g_thread_state[i].elems);
+  }
+  gpr_free(g_thread_state);
+  gpr_tls_destroy(&g_this_thread_state);
+}
+
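+/* worker main loop: park on the cv until work arrives or shutdown is
+   flagged, then drain the queue with the lock released */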
+static void executor_thread(void *arg) {
+  thread_state *ts = arg;
+  gpr_tls_set(&g_this_thread_state, (intptr_t)ts);
+
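+  /* closures run on the previous iteration; subtracted from the queue
+     depth once the lock is re-acquired */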
+  size_t subtract_depth = 0;
+  for (;;) {
+    gpr_mu_lock(&ts->mu);
+    ts->depth -= subtract_depth;
+    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
+      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+    }
+    if (ts->shutdown) {
+      gpr_mu_unlock(&ts->mu);
+      break;
+    }
+    grpc_closure_list exec = ts->elems;
+    ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
+    gpr_mu_unlock(&ts->mu);
+
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    subtract_depth = run_closures(&exec_ctx, exec);
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+}
+
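+/* enqueue a closure: a worker pushes to its own queue (cheap and
+   cache-friendly); any other thread picks a live worker at random */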
+static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+                          grpc_error *error) {
+  thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
+  gpr_atm cur_thread_count = gpr_atm_no_barrier_load(&g_cur_threads);
+  if (ts == NULL) {
+    ts = &g_thread_state[rand() % cur_thread_count];
+  }
+  gpr_mu_lock(&ts->mu);
+  if (grpc_closure_list_empty(ts->elems)) {
+    /* the worker may be parked in gpr_cv_wait: wake it */
+    gpr_cv_signal(&ts->cv);
+  }
+  grpc_closure_list_append(&ts->elems, closure, error);
+  ts->depth++;
+  bool try_new_thread = ts->depth > MAX_DEPTH &&
+                        cur_thread_count < g_max_threads && !ts->shutdown;
+  gpr_mu_unlock(&ts->mu);
+  if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+    cur_thread_count = gpr_atm_no_barrier_load(&g_cur_threads);
+    if (cur_thread_count < g_max_threads) {
+      gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
+
+      gpr_thd_options opt = gpr_thd_options_default();
+      gpr_thd_options_set_joinable(&opt);
+      gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
+                  &g_thread_state[cur_thread_count], &opt);
+    }
+    gpr_spinlock_unlock(&g_adding_thread_lock);
+  }
 }
 
 static const grpc_closure_scheduler_vtable executor_vtable = {