Merge github.com:grpc/grpc into y12kdm3
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index bc1db4c..35338a4 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -541,8 +541,7 @@
 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
           class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
           class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
-class SneakyCallOpSet GRPC_FINAL
-    : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
+class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
  public:
   bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
     typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 925801e..c02ebec 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -211,13 +211,19 @@
 // Handle unknown method by returning UNIMPLEMENTED error.
 class UnknownMethodHandler : public MethodHandler {
  public:
-  void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+  template <class T>
+  static void FillOps(ServerContext* context, T* ops) {
     Status status(StatusCode::UNIMPLEMENTED, "");
-    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
-    if (!param.server_context->sent_initial_metadata_) {
-      ops.SendInitialMetadata(param.server_context->initial_metadata_);
+    if (!context->sent_initial_metadata_) {
+      ops->SendInitialMetadata(context->initial_metadata_);
+      context->sent_initial_metadata_ = true;
     }
-    ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+    ops->ServerSendStatus(context->trailing_metadata_, status);
+  }
+
+  void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+    FillOps(param.server_context, &ops);
     param.call->PerformOps(&ops);
     param.call->cq()->Pluck(&ops);
   }
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index a2bc097..3cff07f 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -98,7 +98,7 @@
   // Add a listening port. Can be called multiple times.
   int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
   // Start the server.
-  bool Start();
+  bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
 
   void HandleQueueClosed();
   void RunRpc();
@@ -112,7 +112,8 @@
    public:
     BaseAsyncRequest(Server* server, ServerContext* context,
                      ServerAsyncStreamingInterface* stream,
-                     CompletionQueue* call_cq, void* tag);
+                     CompletionQueue* call_cq, void* tag,
+                     bool delete_on_finalize);
     virtual ~BaseAsyncRequest();
 
     bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@@ -123,6 +124,7 @@
     ServerAsyncStreamingInterface* const stream_;
     CompletionQueue* const call_cq_;
     void* const tag_;
+    const bool delete_on_finalize_;
     grpc_call* call_;
     grpc_metadata_array initial_metadata_array_;
   };
@@ -184,12 +186,13 @@
     Message* const request_;
   };
 
-  class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
+  class GenericAsyncRequest : public BaseAsyncRequest {
    public:
     GenericAsyncRequest(Server* server, GenericServerContext* context,
                         ServerAsyncStreamingInterface* stream,
                         CompletionQueue* call_cq,
-                        ServerCompletionQueue* notification_cq, void* tag);
+                        ServerCompletionQueue* notification_cq, void* tag,
+                        bool delete_on_finalize);
 
     bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
 
@@ -197,6 +200,10 @@
     grpc_call_details call_details_;
   };
 
+  class UnimplementedAsyncRequestContext;
+  class UnimplementedAsyncRequest;
+  class UnimplementedAsyncResponse;
+
   template <class Message>
   void RequestAsyncCall(void* registered_method, ServerContext* context,
                         ServerAsyncStreamingInterface* stream,
@@ -221,7 +228,7 @@
                                ServerCompletionQueue* notification_cq,
                                void* tag) {
     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
-                            tag);
+                            tag, true);
   }
 
   const int max_message_size_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 906daf1..8666252 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -101,8 +101,8 @@
   void SetThreadPool(ThreadPoolInterface* thread_pool);
 
   // Add a completion queue for handling asynchronous services
-  // Caller is required to keep this completion queue live until calling
-  // BuildAndStart()
+  // Caller is required to keep this completion queue live until
+  // the server is destroyed.
   std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
 
   // Return a running server which is ready for processing rpcs.
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index 45dafcd..4bffaff 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -761,6 +761,8 @@
   }
 
  private:
+  friend class ::grpc::Server;
+
   void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
 
   Call call_;
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c474e4d..337596c 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -74,10 +74,9 @@
    grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
    not be released by grpc_pollset_work AFTER worker has been destroyed.
 
-   Returns true if some work has been done, and false if the deadline
-   expired. */
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline);
+   now is the current monotonic time; tries not to block past deadline. */
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline);
 
 /* Break one polling thread out of polling work for this pollset.
    If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 5ea9dd2..fe66ebe 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -181,7 +181,7 @@
   pfds[1].events = POLLIN;
   pfds[1].revents = 0;
 
-  poll_rv = poll(pfds, 2, timeout_ms);
+  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
 
   if (poll_rv < 0) {
     if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 001fcec..30ee6e2 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -144,7 +144,7 @@
                                         POLLOUT, &watchers[i]);
   }
 
-  r = poll(pfds, pfd_count, timeout);
+  r = grpc_poll_function(pfds, pfd_count, timeout);
 
   for (i = 1; i < pfd_count; i++) {
     grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index a01f9ff..6bd1b61 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -38,7 +38,6 @@
 #include "src/core/iomgr/pollset_posix.h"
 
 #include <errno.h>
-#include <poll.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -57,6 +56,8 @@
 GPR_TLS_DECL(g_current_thread_poller);
 GPR_TLS_DECL(g_current_thread_worker);
 
+grpc_poll_function_type grpc_poll_function = poll;
+
 static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
   worker->prev->next = worker->next;
   worker->next->prev = worker->prev;
@@ -89,6 +90,7 @@
 }
 
 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+  /* pollset->mu already held */
   if (specific_worker != NULL) {
     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
       for (specific_worker = p->root_worker.next;
@@ -168,14 +170,10 @@
   pollset->shutdown_done_cb(pollset->shutdown_done_arg);
 }
 
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline) {
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   /* pollset->mu already held */
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
   int added_worker = 0;
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0;
-  }
   /* this must happen before we (potentially) drop pollset->mu */
   worker->next = worker->prev = NULL;
   /* TODO(ctiller): pool these */
@@ -217,7 +215,6 @@
       gpr_mu_lock(&pollset->mu);
     }
   }
-  return 1;
 }
 
 void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -456,7 +453,7 @@
 
   /* poll fd count (argument 2) is shortened by one if we have no events
      to poll on - such that it only includes the kicker */
-  r = poll(pfd, nfds, timeout);
+  r = grpc_poll_function(pfd, nfds, timeout);
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
   if (fd) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index a3ea353..69bd9cc 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -34,6 +34,8 @@
 #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 
+#include <poll.h>
+
 #include <grpc/support/sync.h>
 #include "src/core/iomgr/wakeup_fd_posix.h"
 
@@ -118,4 +120,8 @@
  * be locked) */
 int grpc_pollset_has_workers(grpc_pollset *pollset);
 
+/* override to allow tests to hook poll() usage */
+typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
+extern grpc_poll_function_type grpc_poll_function;
+
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 8710395..07522c8 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -99,14 +99,9 @@
   gpr_mu_destroy(&pollset->mu);
 }
 
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline) {
-  gpr_timespec now;
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, 
+                       gpr_timespec now, gpr_timespec deadline) {
   int added_worker = 0;
-  now = gpr_now(GPR_CLOCK_MONOTONIC);
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0 /* GPR_FALSE */;
-  }
   worker->next = worker->prev = NULL;
   gpr_cv_init(&worker->cv);
   if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@@ -127,7 +122,6 @@
   if (added_worker) {
     remove_worker(pollset, worker);
   }
-  return 1 /* GPR_TRUE */;
 }
 
 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d6092ec..3631de8 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -115,7 +115,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
   while (!detector.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&detector.pollset, &worker,
+    grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 77443a7..b58115a 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -170,6 +170,9 @@
                                       gpr_timespec deadline, void *reserved) {
   grpc_event ret;
   grpc_pollset_worker worker;
+  int first_loop = 1;
+  gpr_timespec now;
+
   GPR_ASSERT(!reserved);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -196,12 +199,15 @@
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
   }
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "next");
@@ -239,6 +245,9 @@
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker worker;
+  gpr_timespec now;
+  int first_loop = 1;
+
   GPR_ASSERT(!reserved);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -281,13 +290,16 @@
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       del_plucker(cc, tag, &worker);
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
     del_plucker(cc, tag, &worker);
   }
 done:
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index e039c07..1fe8f82 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -50,6 +50,52 @@
 
 namespace grpc {
 
+class Server::UnimplementedAsyncRequestContext {
+ protected:
+  UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+  GenericServerContext server_context_;
+  GenericServerAsyncReaderWriter generic_stream_;
+};
+
+class Server::UnimplementedAsyncRequest GRPC_FINAL
+    : public UnimplementedAsyncRequestContext,
+      public GenericAsyncRequest {
+ public:
+  UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+      : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+                            NULL, false),
+        server_(server),
+        cq_(cq) {}
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+  ServerContext* context() { return &server_context_; }
+  GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+  Server* const server_;
+  ServerCompletionQueue* const cq_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+    UnimplementedAsyncResponseOp;
+class Server::UnimplementedAsyncResponse GRPC_FINAL
+    : public UnimplementedAsyncResponseOp {
+ public:
+  UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+  ~UnimplementedAsyncResponse() { delete request_; }
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+    bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
+    delete this;
+    return r;
+  }
+
+ private:
+  UnimplementedAsyncRequest* const request_;
+};
+
 class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
  public:
   bool FinalizeResult(void** tag, bool* status) {
@@ -297,18 +343,23 @@
   return creds->AddPortToServer(addr, server_);
 }
 
-bool Server::Start() {
+bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
   GPR_ASSERT(!started_);
   started_ = true;
   grpc_server_start(server_);
 
   if (!has_generic_service_) {
-    unknown_method_.reset(new RpcServiceMethod(
-        "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
-    // Use of emplace_back with just constructor arguments is not accepted here
-    // by gcc-4.4 because it can't match the anonymous nullptr with a proper
-    // constructor implicitly. Construct the object and use push_back.
-    sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+    if (!sync_methods_->empty()) {
+      unknown_method_.reset(new RpcServiceMethod(
+          "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+      // Use of emplace_back with just constructor arguments is not accepted
+      // here by gcc-4.4 because it can't match the anonymous nullptr with a
+      // proper constructor implicitly. Construct the object and use push_back.
+      sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+    }
+    for (size_t i = 0; i < num_cqs; i++) {
+      new UnimplementedAsyncRequest(this, cqs[i]);
+    }
   }
   // Start processing rpcs.
   if (!sync_methods_->empty()) {
@@ -370,12 +421,14 @@
 
 Server::BaseAsyncRequest::BaseAsyncRequest(
     Server* server, ServerContext* context,
-    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+    bool delete_on_finalize)
     : server_(server),
       context_(context),
       stream_(stream),
       call_cq_(call_cq),
       tag_(tag),
+      delete_on_finalize_(delete_on_finalize),
       call_(nullptr) {
   memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
 }
@@ -402,14 +455,16 @@
   // just the pointers inside call are copied here
   stream_->BindCall(&call);
   *tag = tag_;
-  delete this;
+  if (delete_on_finalize_) {
+    delete this;
+  }
   return true;
 }
 
 Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
     Server* server, ServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+    : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
 
 void Server::RegisteredAsyncRequest::IssueRequest(
     void* registered_method, grpc_byte_buffer** payload,
@@ -423,8 +478,9 @@
 Server::GenericAsyncRequest::GenericAsyncRequest(
     Server* server, GenericServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
-    ServerCompletionQueue* notification_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+    ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+    : BaseAsyncRequest(server, context, stream, call_cq, tag,
+                       delete_on_finalize) {
   grpc_call_details_init(&call_details_);
   GPR_ASSERT(notification_cq);
   GPR_ASSERT(call_cq);
@@ -445,6 +501,29 @@
   return BaseAsyncRequest::FinalizeResult(tag, status);
 }
 
+// On a successfully received call: issue a fresh request on the same queue
+// and start an UnimplementedAsyncResponse, which takes ownership of this
+// request. Otherwise delete this request. Always returns false.
+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+                                                       bool* status) {
+  if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
+    new UnimplementedAsyncRequest(server_, cq_);
+    new UnimplementedAsyncResponse(this);
+  } else {
+    delete this;
+  }
+  return false;
+}
+
+// Fills the UNIMPLEMENTED reply via UnknownMethodHandler::FillOps (which
+// builds the status itself) and starts the ops on the request's call.
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+    UnimplementedAsyncRequest* request)
+    : request_(request) {
+  UnknownMethodHandler::FillOps(request_->context(), this);
+  request_->stream()->call_.PerformOps(this);
+}
+
 void Server::ScheduleCallback() {
   {
     grpc::unique_lock<grpc::mutex> lock(mu_);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 0b11d86..a13269a 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -103,12 +103,6 @@
     thread_pool_ = CreateDefaultThreadPool();
     thread_pool_owned = true;
   }
-  // Async services only, create a thread pool to handle requests to unknown
-  // services.
-  if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
-    thread_pool_ = new FixedSizeThreadPool(1);
-    thread_pool_owned = true;
-  }
   std::unique_ptr<Server> server(
       new Server(thread_pool_, thread_pool_owned, max_message_size_));
   for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@@ -138,7 +132,8 @@
       *port->selected_port = r;
     }
   }
-  if (!server->Start()) {
+  // cqs_ may be empty; never index element 0 of an empty container.
+  if (!server->Start(cqs_.empty() ? nullptr : &cqs_[0], cqs_.size())) {
     return nullptr;
   }
   return server;
diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c
index 8dddfbe..42b2661 100644
--- a/test/core/httpcli/httpcli_test.c
+++ b/test/core/httpcli/httpcli_test.c
@@ -88,7 +88,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!g_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      n_seconds_time(20));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   gpr_free(host);
@@ -114,7 +115,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!g_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      n_seconds_time(20));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   gpr_free(host);
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 8186c96..6ef8e9c 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -256,7 +256,8 @@
   while (!state.read_done || !state.write_done) {
     grpc_pollset_worker worker;
     GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
-    grpc_pollset_work(g_pollset, &worker, deadline);
+    grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
 
@@ -353,7 +354,8 @@
         while (!write_st.done) {
           grpc_pollset_worker worker;
           GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-          grpc_pollset_work(g_pollset, &worker, deadline);
+          grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                            deadline);
         }
         gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
         grpc_endpoint_destroy(write_st.ep);
@@ -361,7 +363,8 @@
         while (!read_st.done) {
           grpc_pollset_worker worker;
           GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-          grpc_pollset_work(g_pollset, &worker, deadline);
+          grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                            deadline);
         }
         gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
         gpr_free(slices);
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index adcbcaf..8bba87d 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -250,7 +250,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!sv->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -358,7 +359,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!cl->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -448,7 +450,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (a.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   GPR_ASSERT(a.cb_that_ran == first_read_callback);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -467,7 +470,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (b.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   /* Except now we verify that second_read_callback ran instead */
   GPR_ASSERT(b.cb_that_ran == second_read_callback);
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index 07bbe1f..dea0b33 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -112,7 +112,8 @@
 
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
   }
 
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -142,7 +143,8 @@
   /* wait for the connection callback to finish */
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, test_deadline());
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      test_deadline());
   }
 
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -211,7 +213,8 @@
       GPR_ASSERT(g_connections_complete ==
                  connections_complete_before + is_after_deadline);
     }
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 17a85ce..6ad8322 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -187,7 +187,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -224,7 +225,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -285,7 +287,8 @@
   for (;;) {
     grpc_pollset_worker worker;
     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     do {
       bytes_read =
@@ -365,7 +368,8 @@
       if (state.write_done) {
         break;
       }
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   }
@@ -422,7 +426,8 @@
         if (state.write_done) {
           break;
         }
-        grpc_pollset_work(&g_pollset, &worker, deadline);
+        grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                          deadline);
       }
       gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
       break;
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index b82d7c0..29a20cb 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -137,7 +137,8 @@
     while (g_nconnects == nconnects_before &&
            gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
       grpc_pollset_worker worker;
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     gpr_log(GPR_DEBUG, "wait done");
 
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 5a5f99f..471d5b5 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -146,7 +146,8 @@
     while (g_number_of_reads == number_of_reads_before &&
            gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
       grpc_pollset_worker worker;
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
     close(clifd);
diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c
index 990855a..7df6fad 100644
--- a/test/core/security/oauth2_utils.c
+++ b/test/core/security/oauth2_utils.c
@@ -85,7 +85,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset));
   while (!request.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&request.pollset, &worker,
+    grpc_pollset_work(&request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset));
diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c
index b4323ab..753221c 100644
--- a/test/core/security/print_google_default_creds_token.c
+++ b/test/core/security/print_google_default_creds_token.c
@@ -96,8 +96,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
   while (!sync.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&sync.pollset, &worker,
-                      gpr_inf_future(GPR_CLOCK_REALTIME));
+    grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));
 
diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c
index 5ebde5f..f443266 100644
--- a/test/core/security/verify_jwt.c
+++ b/test/core/security/verify_jwt.c
@@ -111,7 +111,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
   while (!sync.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&sync.pollset, &worker,
+    grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index cec0eeb..836e62a 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -178,7 +178,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
   while (pr.port == -1) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&pr.pollset, &worker,
+    grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c
index 2a21133..a06cb50 100644
--- a/test/core/util/reconnect_server.c
+++ b/test/core/util/reconnect_server.c
@@ -134,7 +134,8 @@
       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                    gpr_time_from_seconds(seconds, GPR_TIMESPAN));
   gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset));
-  grpc_pollset_work(&server->pollset, &worker, deadline);
+  grpc_pollset_work(&server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                    deadline);
   gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset));
 }
 
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index a30c841..9a9cca0 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -56,6 +56,10 @@
 #include <grpc/support/thd.h>
 #include <grpc/support/time.h>
 
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/pollset_posix.h"
+#endif
+
 using grpc::cpp::test::util::EchoRequest;
 using grpc::cpp::test::util::EchoResponse;
 using std::chrono::system_clock;
@@ -67,8 +71,41 @@
 
 void* tag(int i) { return (void*)(gpr_intptr)i; }
 
-class Verifier {
+#ifdef GPR_POSIX_SOCKET
+static int assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
+                                    int timeout) {  // stand-in for poll()
+  GPR_ASSERT(timeout == 0);  // only a zero (non-blocking) timeout is accepted
+  return poll(pfds, nfds, timeout);  // forward unchanged to the real poll()
+}
+
+class PollOverride {  // scoped replacement of grpc_poll_function
  public:
+  explicit PollOverride(grpc_poll_function_type f)
+      : prev_(grpc_poll_function) {  // remember the hook being replaced
+    grpc_poll_function = f;
+  }
+  // Reinstates the hook that was active when this object was constructed.
+  ~PollOverride() { grpc_poll_function = prev_; }
+
+ private:
+  grpc_poll_function_type prev_;  // value of grpc_poll_function before override
+};
+
+class PollingCheckRegion : public PollOverride {  // selects the poll hook
+ public:
+  explicit PollingCheckRegion(bool allow_blocking)  // false => asserting hook
+      : PollOverride(allow_blocking ? poll : assert_non_blocking_poll) {}
+};
+#else
+class PollingCheckRegion {  // variant used when GPR_POSIX_SOCKET is not defined
+ public:
+  explicit PollingCheckRegion(bool allow_blocking) {}  // flag is ignored
+};
+#endif
+
+class Verifier : public PollingCheckRegion {
+ public:
+  explicit Verifier(bool spin) : PollingCheckRegion(!spin), spin_(spin) {}
   Verifier& Expect(int i, bool expect_ok) {
     expectations_[tag(i)] = expect_ok;
     return *this;
@@ -78,7 +115,17 @@
     while (!expectations_.empty()) {
       bool ok;
       void* got_tag;
-      EXPECT_TRUE(cq->Next(&got_tag, &ok));
+      if (spin_) {
+        for (;;) {
+          auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+          if (r == CompletionQueue::TIMEOUT) continue;
+          if (r == CompletionQueue::GOT_EVENT) break;
+          gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+          abort();
+        }
+      } else {
+        EXPECT_TRUE(cq->Next(&got_tag, &ok));
+      }
       auto it = expectations_.find(got_tag);
       EXPECT_TRUE(it != expectations_.end());
       EXPECT_EQ(it->second, ok);
@@ -90,14 +137,34 @@
     if (expectations_.empty()) {
       bool ok;
       void* got_tag;
-      EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
-                CompletionQueue::TIMEOUT);
+      if (spin_) {
+        while (std::chrono::system_clock::now() < deadline) {
+          EXPECT_EQ(
+              cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
+              CompletionQueue::TIMEOUT);
+        }
+      } else {
+        EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
+                  CompletionQueue::TIMEOUT);
+      }
     } else {
       while (!expectations_.empty()) {
         bool ok;
         void* got_tag;
-        EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
-                  CompletionQueue::GOT_EVENT);
+        if (spin_) {
+          for (;;) {
+            GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+            auto r =
+                cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+            if (r == CompletionQueue::TIMEOUT) continue;
+            if (r == CompletionQueue::GOT_EVENT) break;
+            gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+            abort();
+          }
+        } else {
+          EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
+                    CompletionQueue::GOT_EVENT);
+        }
         auto it = expectations_.find(got_tag);
         EXPECT_TRUE(it != expectations_.end());
         EXPECT_EQ(it->second, ok);
@@ -108,9 +175,10 @@
 
  private:
   std::map<void*, bool> expectations_;
+  bool spin_;
 };
 
-class AsyncEnd2endTest : public ::testing::Test {
+class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
  protected:
   AsyncEnd2endTest() {}
 
@@ -160,15 +228,15 @@
       service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                            cq_.get(), tag(2));
 
-      Verifier().Expect(2, true).Verify(cq_.get());
+      Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
       EXPECT_EQ(send_request.message(), recv_request.message());
 
       send_response.set_message(recv_request.message());
       response_writer.Finish(send_response, Status::OK, tag(3));
-      Verifier().Expect(3, true).Verify(cq_.get());
+      Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
       response_reader->Finish(&recv_response, &recv_status, tag(4));
-      Verifier().Expect(4, true).Verify(cq_.get());
+      Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
       EXPECT_EQ(send_response.message(), recv_response.message());
       EXPECT_TRUE(recv_status.ok());
@@ -182,18 +250,18 @@
   std::ostringstream server_address_;
 };
 
-TEST_F(AsyncEnd2endTest, SimpleRpc) {
+TEST_P(AsyncEnd2endTest, SimpleRpc) {
   ResetStub();
   SendRpc(1);
 }
 
-TEST_F(AsyncEnd2endTest, SequentialRpcs) {
+TEST_P(AsyncEnd2endTest, SequentialRpcs) {
   ResetStub();
   SendRpc(10);
 }
 
 // Test a simple RPC using the async version of Next
-TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
+TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -214,30 +282,32 @@
       std::chrono::system_clock::now());
   std::chrono::system_clock::time_point time_limit(
       std::chrono::system_clock::now() + std::chrono::seconds(10));
-  Verifier().Verify(cq_.get(), time_now);
-  Verifier().Verify(cq_.get(), time_now);
+  Verifier(GetParam()).Verify(cq_.get(), time_now);
+  Verifier(GetParam()).Verify(cq_.get(), time_now);
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
 
-  Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(3));
-  Verifier().Expect(3, true).Verify(
-      cq_.get(), std::chrono::system_clock::time_point::max());
+  Verifier(GetParam())
+      .Expect(3, true)
+      .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, true).Verify(
-      cq_.get(), std::chrono::system_clock::time_point::max());
+  Verifier(GetParam())
+      .Expect(4, true)
+      .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
 // Two pings and a final pong.
-TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
   ResetStub();
 
   EchoRequest send_request;
@@ -256,41 +326,41 @@
   service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                                 tag(2));
 
-  Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
 
   cli_stream->Write(send_request, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   cli_stream->Write(send_request, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
 
   EXPECT_EQ(send_request.message(), recv_request.message());
   cli_stream->WritesDone(tag(7));
-  Verifier().Expect(7, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(8));
-  Verifier().Expect(8, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
 
   send_response.set_message(recv_request.message());
   srv_stream.Finish(send_response, Status::OK, tag(9));
-  Verifier().Expect(9, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
 
   cli_stream->Finish(&recv_status, tag(10));
-  Verifier().Expect(10, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
 // One ping, two pongs.
-TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
   ResetStub();
 
   EchoRequest send_request;
@@ -309,38 +379,38 @@
   service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
                                  cq_.get(), cq_.get(), tag(2));
 
-  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   srv_stream.Write(send_response, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
 
   srv_stream.Write(send_response, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
 
   srv_stream.Finish(Status::OK, tag(7));
-  Verifier().Expect(7, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(8));
-  Verifier().Expect(8, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
 
   cli_stream->Finish(&recv_status, tag(9));
-  Verifier().Expect(9, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
 
   EXPECT_TRUE(recv_status.ok());
 }
 
 // One ping, one pong.
-TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
   ResetStub();
 
   EchoRequest send_request;
@@ -359,40 +429,40 @@
   service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
                              tag(2));
 
-  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
 
   cli_stream->Write(send_request, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   srv_stream.Write(send_response, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
 
   cli_stream->WritesDone(tag(7));
-  Verifier().Expect(7, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(8));
-  Verifier().Expect(8, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
 
   srv_stream.Finish(Status::OK, tag(9));
-  Verifier().Expect(9, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
 
   cli_stream->Finish(&recv_status, tag(10));
-  Verifier().Expect(10, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
 
   EXPECT_TRUE(recv_status.ok());
 }
 
 // Metadata tests
-TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -416,7 +486,7 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   auto client_initial_metadata = srv_ctx.client_metadata();
   EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@@ -426,16 +496,16 @@
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(3));
 
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
-TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -457,15 +527,15 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
   response_writer.SendInitialMetadata(tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   response_reader->ReadInitialMetadata(tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
   EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
   EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
@@ -473,16 +543,16 @@
 
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
-TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -504,20 +574,20 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   response_writer.SendInitialMetadata(tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   send_response.set_message(recv_request.message());
   srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
   srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
   response_writer.Finish(send_response, Status::OK, tag(4));
 
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
   auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -526,7 +596,7 @@
   EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
 }
 
-TEST_F(AsyncEnd2endTest, MetadataRpc) {
+TEST_P(AsyncEnd2endTest, MetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -563,7 +633,7 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   auto client_initial_metadata = srv_ctx.client_metadata();
   EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@@ -573,9 +643,9 @@
   srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
   srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
   response_writer.SendInitialMetadata(tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
   response_reader->ReadInitialMetadata(tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
   EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
   EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
@@ -586,10 +656,10 @@
   srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
   response_writer.Finish(send_response, Status::OK, tag(5));
 
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
   auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -599,7 +669,7 @@
 }
 
 // Server uses AsyncNotifyWhenDone API to check for cancellation
-TEST_F(AsyncEnd2endTest, ServerCheckCancellation) {
+TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
   ResetStub();
 
   EchoRequest send_request;
@@ -620,21 +690,21 @@
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
 
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   cli_ctx.TryCancel();
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
   EXPECT_TRUE(srv_ctx.IsCancelled());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
 
   EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
 }
 
 // Server uses AsyncNotifyWhenDone API to check for normal finish
-TEST_F(AsyncEnd2endTest, ServerCheckDone) {
+TEST_P(AsyncEnd2endTest, ServerCheckDone) {
   ResetStub();
 
   EchoRequest send_request;
@@ -655,23 +725,23 @@
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
 
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
   EXPECT_FALSE(srv_ctx.IsCancelled());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
-TEST_F(AsyncEnd2endTest, UnimplementedRpc) {
+TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
   std::shared_ptr<ChannelInterface> channel = CreateChannel(
       server_address_.str(), InsecureCredentials(), ChannelArguments());
   std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
@@ -687,12 +757,15 @@
       stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
 
   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
   EXPECT_EQ("", recv_status.error_message());
 }
 
+INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
+                        ::testing::Values(false, true));  // Verifier spin flag
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc