Merge remote-tracking branch 'upstream/master' into pick_first_subchannel_list
diff --git a/grpc.def b/grpc.def
index 558be60..e4281f3 100644
--- a/grpc.def
+++ b/grpc.def
@@ -54,6 +54,8 @@
     grpc_completion_queue_pluck
     grpc_completion_queue_shutdown
     grpc_completion_queue_destroy
+    grpc_completion_queue_thread_local_cache_init
+    grpc_completion_queue_thread_local_cache_flush
     grpc_alarm_create
     grpc_alarm_set
     grpc_alarm_cancel
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index ca757e2..e2c0c29 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -109,6 +109,30 @@
     TIMEOUT     ///< deadline was reached.
   };
 
+  /// EXPERIMENTAL
+  /// First executes \a f, then reads from the queue, blocking up to
+  /// \a deadline (or the queue's shutdown).
+  /// Both \a tag and \a ok are updated upon success (if an event is available
+  /// within the \a deadline).  A \a tag points to an arbitrary location usually
+  /// employed to uniquely identify an event.
+  ///
+  /// \param f[in] Function to execute before calling AsyncNext on this queue.
+  /// \param tag[out] Upon success, updated to point to the event's tag.
+  /// \param ok[out] Upon success, true if read a regular event, else false.
+  /// \param deadline[in] How long to block in wait for an event.
+  ///
+  /// \return The type of event read.
+  template <typename T, typename F>
+  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
+    CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
+    f();
+    if (cache.Flush(tag, ok)) {
+      return GOT_EVENT;
+    } else {
+      return AsyncNext(tag, ok, deadline);
+    }
+  }
+
   /// Read from the queue, blocking up to \a deadline (or the queue's shutdown).
   /// Both \a tag and \a ok are updated upon success (if an event is available
   /// within the \a deadline).  A \a tag points to an arbitrary location usually
@@ -213,6 +237,21 @@
                                   const InputMessage& request,
                                   OutputMessage* result);
 
+  /// EXPERIMENTAL
+  /// Creates a thread-local cache to store the first event
+  /// on this completion queue queued from this thread.  Once
+  /// initialized, it must be flushed on the same thread.
+  class CompletionQueueTLSCache {
+   public:
+    CompletionQueueTLSCache(CompletionQueue* cq);
+    ~CompletionQueueTLSCache();
+    bool Flush(void** tag, bool* ok);
+
+   private:
+    CompletionQueue* cq_;
+    bool flushed_;
+  };
+
   NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
 
   /// Wraps \a grpc_completion_queue_pluck.
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 1de289f..6df3b80 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -143,6 +143,23 @@
     drained and no threads are executing grpc_completion_queue_next */
 GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq);
 
+/*********** EXPERIMENTAL API ************/
+/** Initializes a thread local cache for \a cq.
+  * grpc_completion_queue_thread_local_cache_flush() MUST be called on the
+  * same thread, with the same cq.
+  */
+GRPCAPI void grpc_completion_queue_thread_local_cache_init(
+    grpc_completion_queue *cq);
+
+/*********** EXPERIMENTAL API ************/
+/** Flushes the thread local cache for \a cq.
+  * Returns 1 if there were contents in the cache.  If there was an event
+  * in \a cq's tls cache, its tag is placed in \a tag, and \a ok is set to
+  * the event success.
+  */
+GRPCAPI int grpc_completion_queue_thread_local_cache_flush(
+    grpc_completion_queue *cq, void **tag, int *ok);
+
 /** Create a completion queue alarm instance */
 GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved);
 
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index 5aba507..6efcff8 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -452,6 +452,7 @@
   tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
   /* Tell network status tracking code about the new endpoint */
   grpc_network_status_register_endpoint(&tcp->base);
+  grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
 
   return &tcp->base;
 }
diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc
index 21664f0..5009f78 100644
--- a/src/core/lib/surface/completion_queue.cc
+++ b/src/core/lib/surface/completion_queue.cc
@@ -28,6 +28,7 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 #include <grpc/support/time.h>
+#include <grpc/support/tls.h>
 
 #include "src/core/lib/debug/stats.h"
 #include "src/core/lib/iomgr/pollset.h"
@@ -48,6 +49,14 @@
     GRPC_TRACER_INITIALIZER(false, "cq_refcount");
 #endif
 
+// Specifies a cq thread local cache.
+// The first event that occurs on a thread
+// with a cq cache will go into that cache, and
+// will only be returned on the thread that initialized the cache.
+// NOTE: Only one event will ever be cached.
+GPR_TLS_DECL(g_cached_event);
+GPR_TLS_DECL(g_cached_cq);
+
 typedef struct {
   grpc_pollset_worker **worker;
   void *tag;
@@ -345,6 +354,46 @@
 static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq,
                                      grpc_error *error);
 
+void grpc_cq_global_init() {
+  gpr_tls_init(&g_cached_event);
+  gpr_tls_init(&g_cached_cq);
+}
+
+void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) {
+  if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) {
+    gpr_tls_set(&g_cached_event, (intptr_t)0);
+    gpr_tls_set(&g_cached_cq, (intptr_t)cq);
+  }
+}
+
+int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq,
+                                                   void **tag, int *ok) {
+  /* Returns 1 and fills tag/ok iff this thread cached an event for cq. */
+  auto *storage = (grpc_cq_completion *)gpr_tls_get(&g_cached_event);
+  int ret = 0;
+  if (storage != NULL &&
+      (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) {
+    /* Read tag/ok before done(): the callback is handed storage, may free it */
+    *tag = storage->tag;
+    *ok = (storage->next & (uintptr_t)(1)) == 1;
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    storage->done(&exec_ctx, storage->done_arg, storage);
+    ret = 1;
+    cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
+    if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+      GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+      gpr_mu_lock(cq->mu);
+      cq_finish_shutdown_next(&exec_ctx, cq);
+      gpr_mu_unlock(cq->mu);
+      GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down");
+    }
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+  gpr_tls_set(&g_cached_event, (intptr_t)0);
+  gpr_tls_set(&g_cached_cq, (intptr_t)0);
+  return ret;
+}
+
 static void cq_event_queue_init(grpc_cq_event_queue *q) {
   gpr_mpscq_init(&q->queue);
   q->queue_lock = GPR_SPINLOCK_INITIALIZER;
@@ -617,7 +666,6 @@
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
     }
   }
-
   cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
   int is_success = (error == GRPC_ERROR_NONE);
 
@@ -628,44 +676,50 @@
 
   cq_check_tag(cq, tag, true); /* Used in debug builds only */
 
-  /* Add the completion to the queue */
-  bool is_first = cq_event_queue_push(&cqd->queue, storage);
-  gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
+  if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq &&
+      (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) {
+    gpr_tls_set(&g_cached_event, (intptr_t)storage);
+  } else {
+    /* Add the completion to the queue */
+    bool is_first = cq_event_queue_push(&cqd->queue, storage);
+    gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
 
-  /* Since we do not hold the cq lock here, it is important to do an 'acquire'
-     load here (instead of a 'no_barrier' load) to match with the release store
-     (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
-     */
-  bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
+    /* Since we do not hold the cq lock here, it is important to do an 'acquire'
+       load here (instead of a 'no_barrier' load) to match with the release
+       store (done via gpr_atm_full_fetch_add(pending_events, -1)) in
+       cq_shutdown_next
+       */
+    bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
 
-  if (!will_definitely_shutdown) {
-    /* Only kick if this is the first item queued */
-    if (is_first) {
-      gpr_mu_lock(cq->mu);
-      grpc_error *kick_error =
-          cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
-      gpr_mu_unlock(cq->mu);
+    if (!will_definitely_shutdown) {
+      /* Only kick if this is the first item queued */
+      if (is_first) {
+        gpr_mu_lock(cq->mu);
+        grpc_error *kick_error =
+            cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL);
+        gpr_mu_unlock(cq->mu);
 
-      if (kick_error != GRPC_ERROR_NONE) {
-        const char *msg = grpc_error_string(kick_error);
-        gpr_log(GPR_ERROR, "Kick failed: %s", msg);
-        GRPC_ERROR_UNREF(kick_error);
+        if (kick_error != GRPC_ERROR_NONE) {
+          const char *msg = grpc_error_string(kick_error);
+          gpr_log(GPR_ERROR, "Kick failed: %s", msg);
+          GRPC_ERROR_UNREF(kick_error);
+        }
       }
-    }
-    if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+      if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+        GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+        gpr_mu_lock(cq->mu);
+        cq_finish_shutdown_next(exec_ctx, cq);
+        gpr_mu_unlock(cq->mu);
+        GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
+      }
+    } else {
       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
+      gpr_atm_rel_store(&cqd->pending_events, 0);
       gpr_mu_lock(cq->mu);
       cq_finish_shutdown_next(exec_ctx, cq);
       gpr_mu_unlock(cq->mu);
       GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
     }
-  } else {
-    GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
-    gpr_atm_rel_store(&cqd->pending_events, 0);
-    gpr_mu_lock(cq->mu);
-    cq_finish_shutdown_next(exec_ctx, cq);
-    gpr_mu_unlock(cq->mu);
-    GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
   }
 
   GPR_TIMER_END("cq_end_op_for_next", 0);
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 69d144b..c02bc5d 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -70,6 +70,9 @@
 #define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc)
 #endif
 
+/* Initializes global variables used by completion queues */
+void grpc_cq_global_init();
+
 /* Flag that an operation is beginning: the completion channel will not finish
    shutdown until a corrensponding grpc_cq_end_* call is made.
    \a tag is currently used only in debug builds. Return true on success, and
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index b089da2..058e88f 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -64,6 +64,7 @@
   gpr_log_verbosity_init();
   gpr_mu_init(&g_init_mu);
   grpc_register_built_in_plugins();
+  grpc_cq_global_init();
   g_initializations = 0;
 }
 
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index f34b0f3..4a2e2be 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -71,4 +71,29 @@
   }
 }
 
+CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
+    CompletionQueue* cq)
+    : cq_(cq), flushed_(false) {
+  grpc_completion_queue_thread_local_cache_init(cq_->cq_);
+}
+
+CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
+  GPR_ASSERT(flushed_);
+}
+
+bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
+  int res = 0;
+  void* res_tag;
+  flushed_ = true;
+  if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
+                                                     &res)) {
+    auto cq_tag = static_cast<CompletionQueueTag*>(res_tag);
+    *ok = res == 1;
+    if (cq_tag->FinalizeResult(tag, ok)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 }  // namespace grpc
diff --git a/src/cpp/util/error_details.cc b/src/cpp/util/error_details.cc
index 44bc4d1..f06b475 100644
--- a/src/cpp/util/error_details.cc
+++ b/src/cpp/util/error_details.cc
@@ -37,7 +37,8 @@
     return Status(StatusCode::FAILED_PRECONDITION, "");
   }
   StatusCode code = StatusCode::UNKNOWN;
-  if (from.code() >= StatusCode::OK && from.code() <= StatusCode::DATA_LOSS) {
+  if (from.code() >= StatusCode::OK &&
+      from.code() <= StatusCode::UNAUTHENTICATED) {
     code = static_cast<StatusCode>(from.code());
   }
   *to = Status(code, from.message(), from.SerializeAsString());
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 7083149..cd1bd98 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -77,6 +77,8 @@
 grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import;
 grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import;
 grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
+grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import;
+grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import;
 grpc_alarm_create_type grpc_alarm_create_import;
 grpc_alarm_set_type grpc_alarm_set_import;
 grpc_alarm_cancel_type grpc_alarm_cancel_import;
@@ -385,6 +387,8 @@
   grpc_completion_queue_pluck_import = (grpc_completion_queue_pluck_type) GetProcAddress(library, "grpc_completion_queue_pluck");
   grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown");
   grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy");
+  grpc_completion_queue_thread_local_cache_init_import = (grpc_completion_queue_thread_local_cache_init_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_init");
+  grpc_completion_queue_thread_local_cache_flush_import = (grpc_completion_queue_thread_local_cache_flush_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_flush");
   grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create");
   grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set");
   grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 868772c..c7e78b7 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -212,6 +212,12 @@
 typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq);
 extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import;
 #define grpc_completion_queue_destroy grpc_completion_queue_destroy_import
+typedef void(*grpc_completion_queue_thread_local_cache_init_type)(grpc_completion_queue *cq);
+extern grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import;
+#define grpc_completion_queue_thread_local_cache_init grpc_completion_queue_thread_local_cache_init_import
+typedef int(*grpc_completion_queue_thread_local_cache_flush_type)(grpc_completion_queue *cq, void **tag, int *ok);
+extern grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import;
+#define grpc_completion_queue_thread_local_cache_flush grpc_completion_queue_thread_local_cache_flush_import
 typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved);
 extern grpc_alarm_create_type grpc_alarm_create_import;
 #define grpc_alarm_create grpc_alarm_create_import
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index e6372a3..e4e4c9f 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -158,6 +158,80 @@
   }
 }
 
+static void test_cq_tls_cache_full(void) {
+  grpc_event ev;
+  grpc_completion_queue *cc;
+  grpc_cq_completion completion;
+  grpc_cq_polling_type polling_types[] = {
+      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
+  grpc_completion_queue_attributes attr;
+  grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_exec_ctx exec_ctx;
+  void *tag = create_test_tag();
+  void *res_tag;
+  int ok;
+
+  LOG_TEST("test_cq_tls_cache_full");
+
+  attr.version = 1;
+  attr.cq_completion_type = GRPC_CQ_NEXT;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
+    exec_ctx = init_exec_ctx;  // Reset exec_ctx
+    attr.cq_polling_type = polling_types[i];
+    cc = grpc_completion_queue_create(
+        grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
+
+    grpc_completion_queue_thread_local_cache_init(cc);
+    GPR_ASSERT(grpc_cq_begin_op(cc, tag));
+    grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE,
+                   do_nothing_end_completion, NULL, &completion);
+
+    ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
+    GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
+
+    GPR_ASSERT(
+        grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
+    GPR_ASSERT(res_tag == tag);
+    GPR_ASSERT(ok);
+
+    ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
+    GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
+
+    shutdown_and_destroy(cc);
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+}
+
+static void test_cq_tls_cache_empty(void) {
+  grpc_completion_queue *cc;
+  grpc_cq_polling_type polling_types[] = {
+      GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
+  grpc_completion_queue_attributes attr;
+  grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_exec_ctx exec_ctx;
+  void *res_tag;
+  int ok;
+
+  LOG_TEST("test_cq_tls_cache_empty");
+
+  attr.version = 1;
+  attr.cq_completion_type = GRPC_CQ_NEXT;
+  for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
+    exec_ctx = init_exec_ctx;  // Reset exec_ctx
+    attr.cq_polling_type = polling_types[i];
+    cc = grpc_completion_queue_create(
+        grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
+
+    GPR_ASSERT(
+        grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
+    grpc_completion_queue_thread_local_cache_init(cc);
+    GPR_ASSERT(
+        grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
+    shutdown_and_destroy(cc);
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+}
+
 static void test_shutdown_then_next_polling(void) {
   grpc_cq_polling_type polling_types[] = {
       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
@@ -300,6 +374,8 @@
   test_cq_end_op();
   test_pluck();
   test_pluck_after_shutdown();
+  test_cq_tls_cache_full();
+  test_cq_tls_cache_empty();
   grpc_shutdown();
   return 0;
 }
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 2a33e8a..b7634d0 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -99,7 +99,7 @@
 
 class Verifier {
  public:
-  explicit Verifier(bool spin) : spin_(spin) {}
+  explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {}
   // Expect sets the expected ok value for a specific tag
   Verifier& Expect(int i, bool expect_ok) {
     return ExpectUnless(i, expect_ok, false);
@@ -142,6 +142,18 @@
     return detag(got_tag);
   }
 
+  template <typename T>
+  CompletionQueue::NextStatus DoOnceThenAsyncNext(
+      CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
+      std::function<void(void)> lambda) {
+    if (lambda_run_) {
+      return cq->AsyncNext(got_tag, ok, deadline);
+    } else {
+      lambda_run_ = true;
+      return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
+    }
+  }
+
   // Verify keeps calling Next until all currently set
   // expected tags are complete
   void Verify(CompletionQueue* cq) { Verify(cq, false); }
@@ -154,6 +166,7 @@
       Next(cq, ignore_ok);
     }
   }
+
   // This version of Verify stops after a certain deadline
   void Verify(CompletionQueue* cq,
               std::chrono::system_clock::time_point deadline) {
@@ -193,6 +206,47 @@
     }
   }
 
+  // This version of Verify stops after a certain deadline, and uses the
+  // DoThenAsyncNext API to run the lambda (at most once per Verifier)
+  // before polling the queue
+  void Verify(CompletionQueue* cq,
+              std::chrono::system_clock::time_point deadline,
+              std::function<void(void)> lambda) {
+    if (expectations_.empty()) {
+      bool ok;
+      void* got_tag;
+      if (spin_) {
+        while (std::chrono::system_clock::now() < deadline) {
+          EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+                    CompletionQueue::TIMEOUT);
+        }
+      } else {
+        EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+                  CompletionQueue::TIMEOUT);
+      }
+    } else {
+      while (!expectations_.empty()) {
+        bool ok;
+        void* got_tag;
+        if (spin_) {
+          for (;;) {
+            GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+            auto r = DoOnceThenAsyncNext(
+                cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
+            if (r == CompletionQueue::TIMEOUT) continue;
+            if (r == CompletionQueue::GOT_EVENT) break;
+            gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+            abort();
+          }
+        } else {
+          EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+                    CompletionQueue::GOT_EVENT);
+        }
+        GotTag(got_tag, ok, false);
+      }
+    }
+  }
+
  private:
   void GotTag(void* got_tag, bool ok, bool ignore_ok) {
     auto it = expectations_.find(got_tag);
@@ -226,6 +280,7 @@
   std::map<void*, bool> expectations_;
   std::map<void*, MaybeExpect> maybe_expectations_;
   bool spin_;
+  bool lambda_run_;
 };
 
 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
@@ -490,6 +545,60 @@
   EXPECT_TRUE(recv_status.ok());
 }
 
+// Test a simple RPC using the DoThenAsyncNext API
+TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
+  ResetStub();
+
+  EchoRequest send_request;
+  EchoRequest recv_request;
+  EchoResponse send_response;
+  EchoResponse recv_response;
+  Status recv_status;
+
+  ClientContext cli_ctx;
+  ServerContext srv_ctx;
+  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+  send_request.set_message(GetParam().message_content);
+  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+      stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+
+  std::chrono::system_clock::time_point time_now(
+      std::chrono::system_clock::now());
+  std::chrono::system_clock::time_point time_limit(
+      std::chrono::system_clock::now() + std::chrono::seconds(10));
+  Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+  Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+
+  auto resp_writer_ptr = &response_writer;
+  auto lambda_2 = [&, this, resp_writer_ptr]() {
+    gpr_log(GPR_ERROR, "CALLED");
+    service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
+                          cq_.get(), tag(2));
+  };
+
+  Verifier(GetParam().disable_blocking)
+      .Expect(2, true)
+      .Verify(cq_.get(), time_limit, lambda_2);
+  EXPECT_EQ(send_request.message(), recv_request.message());
+
+  auto recv_resp_ptr = &recv_response;
+  auto status_ptr = &recv_status;
+  send_response.set_message(recv_request.message());
+  auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
+    resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
+  };
+  response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
+  Verifier(GetParam().disable_blocking)
+      .Expect(3, true)
+      .Expect(4, true)
+      .Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
+              lambda_3);
+
+  EXPECT_EQ(send_response.message(), recv_response.message());
+  EXPECT_TRUE(recv_status.ok());
+}
+
 // Two pings and a final pong.
 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
   ResetStub();
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index a5049e5..82c6361 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -230,8 +230,6 @@
   }
 
   virtual void DestroyMultithreading() = 0;
-  virtual void InitThreadFunc(size_t thread_idx) = 0;
-  virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
 
   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
     // Set up the load distribution based on the number of threads
@@ -279,7 +277,6 @@
                         : std::bind(&Client::NextIssueTime, this, thread_idx);
   }
 
- private:
   class Thread {
    public:
     Thread(Client* client, size_t idx)
@@ -299,6 +296,16 @@
       MergeStatusHistogram(statuses_, s);
     }
 
+    void UpdateHistogram(HistogramEntry* entry) {
+      std::lock_guard<std::mutex> g(mu_);
+      if (entry->value_used()) {
+        histogram_.Add(entry->value());
+      }
+      if (entry->status_used()) {
+        statuses_[entry->status()]++;
+      }
+    }
+
    private:
     Thread(const Thread&);
     Thread& operator=(const Thread&);
@@ -314,29 +321,8 @@
         wait_loop++;
       }
 
-      client_->InitThreadFunc(idx_);
-
-      for (;;) {
-        // run the loop body
-        HistogramEntry entry;
-        const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
-        // lock, update histogram if needed and see if we're done
-        std::lock_guard<std::mutex> g(mu_);
-        if (entry.value_used()) {
-          histogram_.Add(entry.value());
-        }
-        if (entry.status_used()) {
-          statuses_[entry.status()]++;
-        }
-        if (!thread_still_ok) {
-          gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
-        }
-        if (!thread_still_ok ||
-            static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
-          client_->CompleteThread();
-          return;
-        }
-      }
+      client_->ThreadFunc(idx_, this);
+      client_->CompleteThread();
     }
 
     std::mutex mu_;
@@ -347,6 +333,12 @@
     std::thread impl_;
   };
 
+  bool ThreadCompleted() {
+    return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+  }
+
+  virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
+
   std::vector<std::unique_ptr<Thread>> threads_;
   std::unique_ptr<UsageTimer> timer_;
 
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed4e0b..b5c7208 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,33 +236,47 @@
     this->EndThreads();  // this needed for resolution
   }
 
-  void InitThreadFunc(size_t thread_idx) override final {}
-  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final {
+  void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
     void* got_tag;
     bool ok;
 
-    if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+    HistogramEntry entry;
+    HistogramEntry* entry_ptr = &entry;
+    if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+      return;
+    }
+    ClientRpcContext* ctx;
+    std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
+    do {
+      t->UpdateHistogram(entry_ptr);
       // Got a regular event, so process it
-      ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+      ctx = ClientRpcContext::detag(got_tag);
       // Proceed while holding a lock to make sure that
       // this thread isn't supposed to shut down
-      std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+      shutdown_mu->lock();
       if (shutdown_state_[thread_idx]->shutdown) {
         ctx->TryCancel();
         delete ctx;
-        return true;
+        while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+          ctx = ClientRpcContext::detag(got_tag);
+          ctx->TryCancel();
+          delete ctx;
+        }
+        shutdown_mu->unlock();
+        return;
       }
-      if (!ctx->RunNextState(ok, entry)) {
-        // The RPC and callback are done, so clone the ctx
-        // and kickstart the new one
-        ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
-        delete ctx;
-      }
-      return true;
-    } else {
-      // queue is shutting down, so we must be  done
-      return true;
-    }
+    } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+        [&, ctx, ok, entry_ptr, shutdown_mu]() {
+          bool next_ok = ok;
+          if (!ctx->RunNextState(next_ok, entry_ptr)) {
+            // The RPC and callback are done, so clone the ctx
+            // and kickstart the new one
+            ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+            delete ctx;
+          }
+          shutdown_mu->unlock();
+        },
+        &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
   }
 
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 94554a4..9f20b14 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -62,6 +62,25 @@
 
   virtual ~SynchronousClient(){};
 
+  virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+  virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
+
+  void ThreadFunc(size_t thread_idx, Thread* t) override {
+    InitThreadFuncImpl(thread_idx);
+    for (;;) {
+      // run the loop body
+      HistogramEntry entry;
+      const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
+      t->UpdateHistogram(&entry);
+      if (!thread_still_ok) {
+        gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+      }
+      if (!thread_still_ok || ThreadCompleted()) {
+        return;
+      }
+    }
+  }
+
  protected:
   // WaitToIssue returns false if we realize that we need to break out
   bool WaitToIssue(int thread_idx) {
@@ -103,9 +122,9 @@
   }
   ~SynchronousUnaryClient() {}
 
-  void InitThreadFunc(size_t thread_idx) override {}
+  void InitThreadFuncImpl(size_t thread_idx) override {}
 
-  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+  bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     if (!WaitToIssue(thread_idx)) {
       return true;
     }
@@ -192,13 +211,13 @@
     }
   }
 
-  void InitThreadFunc(size_t thread_idx) override {
+  void InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
     messages_issued_[thread_idx] = 0;
   }
 
-  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+  bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     if (!WaitToIssue(thread_idx)) {
       return true;
     }
@@ -246,14 +265,14 @@
     }
   }
 
-  void InitThreadFunc(size_t thread_idx) override {
+  void InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
                                                     &responses_[thread_idx]);
     last_issue_[thread_idx] = UsageTimer::Now();
   }
 
-  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+  bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     // Figure out how to make histogram sensible if this is rate-paced
     if (!WaitToIssue(thread_idx)) {
       return true;
@@ -282,13 +301,13 @@
  public:
   SynchronousStreamingFromServerClient(const ClientConfig& config)
       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
-  void InitThreadFunc(size_t thread_idx) override {
+  void InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     stream_[thread_idx] =
         stub->StreamingFromServer(&context_[thread_idx], request_);
     last_recv_[thread_idx] = UsageTimer::Now();
   }
-  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+  bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
       double now = UsageTimer::Now();
@@ -328,11 +347,11 @@
     }
   }
 
-  void InitThreadFunc(size_t thread_idx) override {
+  void InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
     stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
   }
-  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+  bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     // TODO (vjpai): Do this
     return true;
   }
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 776371a..4576be5 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -202,23 +202,32 @@
     // Wait until work is available or we are shutting down
     bool ok;
     void *got_tag;
-    while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
-      ServerRpcContext *ctx = detag(got_tag);
+    if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+      return;
+    }
+    ServerRpcContext *ctx;
+    std::mutex *mu_ptr;
+    do {
+      ctx = detag(got_tag);
       // The tag is a pointer to an RPC context to invoke
       // Proceed while holding a lock to make sure that
       // this thread isn't supposed to shut down
-      std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+      mu_ptr = &shutdown_state_[thread_idx]->mutex;
+      mu_ptr->lock();
       if (shutdown_state_[thread_idx]->shutdown) {
+        mu_ptr->unlock();
         return;
       }
-      std::lock_guard<ServerRpcContext> l2(*ctx);
-      const bool still_going = ctx->RunNextState(ok);
-      // if this RPC context is done, refresh it
-      if (!still_going) {
-        ctx->Reset();
-      }
-    }
-    return;
+    } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+        [&, ctx, ok, mu_ptr]() {
+          ctx->lock();
+          if (!ctx->RunNextState(ok)) {
+            ctx->Reset();
+          }
+          ctx->unlock();
+          mu_ptr->unlock();
+        },
+        &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
   }
 
   class ServerRpcContext {
diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc
index 69a6876..16a00fb 100644
--- a/test/cpp/util/error_details_test.cc
+++ b/test/cpp/util/error_details_test.cc
@@ -82,7 +82,7 @@
 
 TEST(SetTest, OutOfScopeErrorCode) {
   google::rpc::Status expected;
-  expected.set_code(20);  // Out of scope (DATA_LOSS is 15).
+  expected.set_code(17);  // Out of scope (UNAUTHENTICATED is 16).
   expected.set_message("I am an error message");
   testing::EchoRequest expected_details;
   expected_details.set_message(grpc::string(100, '\0'));
@@ -96,6 +96,24 @@
   EXPECT_EQ(expected.SerializeAsString(), to.error_details());
 }
 
+TEST(SetTest, ValidScopeErrorCode) {
+  for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) {
+    google::rpc::Status expected;
+    expected.set_code(c);
+    expected.set_message("I am an error message");
+    testing::EchoRequest expected_details;
+    expected_details.set_message(grpc::string(100, '\0'));
+    expected.add_details()->PackFrom(expected_details);
+
+    Status to;
+    Status s = SetErrorDetails(expected, &to);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(c, to.error_code());
+    EXPECT_EQ(expected.message(), to.error_message());
+    EXPECT_EQ(expected.SerializeAsString(), to.error_details());
+  }
+}
+
 }  // namespace
 }  // namespace grpc
 
diff --git a/tools/internal_ci/linux/grpc_sanity.cfg b/tools/internal_ci/linux/grpc_sanity.cfg
index 24e7984..e06a2f4 100644
--- a/tools/internal_ci/linux/grpc_sanity.cfg
+++ b/tools/internal_ci/linux/grpc_sanity.cfg
@@ -16,7 +16,7 @@
 
 # Location of the continuous shell script in repository.
 build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh"
-timeout_mins: 20
+timeout_mins: 40
 action {
   define_artifacts {
     regex: "**/*sponge_log.xml"
diff --git a/tools/interop_matrix/run_interop_matrix_tests.py b/tools/interop_matrix/run_interop_matrix_tests.py
index d037e13..bb7a864 100755
--- a/tools/interop_matrix/run_interop_matrix_tests.py
+++ b/tools/interop_matrix/run_interop_matrix_tests.py
@@ -122,15 +122,13 @@
   return images
 
 # caches test cases (list of JobSpec) loaded from file.  Keyed by lang and runtime.
-_loaded_testcases = {}
 def find_test_cases(lang, release, suite_name):
   """Returns the list of test cases from testcase files per lang/release."""
   file_tmpl = os.path.join(os.path.dirname(__file__), 'testcases/%s__%s')
+  testcase_release = release
   if not os.path.exists(file_tmpl % (lang, release)):
-    release = 'master'
-  testcases = file_tmpl % (lang, release)
-  if lang in _loaded_testcases.keys() and release in _loaded_testcases[lang].keys():
-    return _loaded_testcases[lang][release]
+    testcase_release = 'master'
+  testcases = file_tmpl % (lang, testcase_release)
 
   job_spec_list=[]
   try:
@@ -155,9 +153,6 @@
                      do_newline=True)
   except IOError as err:
     jobset.message('FAILED', err, do_newline=True)
-  if lang not in _loaded_testcases.keys():
-    _loaded_testcases[lang] = {}
-  _loaded_testcases[lang][release]=job_spec_list
   return job_spec_list
 
 _xml_report_tree = report_utils.new_junit_xml_tree()