Merge github.com:grpc/grpc into y12kdm3
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index d49102f..35338a4 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -67,14 +67,10 @@
   WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
 
   /// Clear all flags.
-  inline void Clear() {
-    flags_ = 0;
-  }
+  inline void Clear() { flags_ = 0; }
 
   /// Returns raw flags bitset.
-  inline gpr_uint32 flags() const {
-    return flags_;
-  }
+  inline gpr_uint32 flags() const { return flags_; }
 
   /// Sets flag for the disabling of compression for the next message write.
   ///
@@ -122,9 +118,7 @@
   /// not go out on the wire immediately.
   ///
   /// \sa GRPC_WRITE_BUFFER_HINT
-  inline bool get_buffer_hint() const {
-    return GetBit(GRPC_WRITE_BUFFER_HINT);
-  }
+  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
 
   WriteOptions& operator=(const WriteOptions& rhs) {
     flags_ = rhs.flags_;
@@ -132,17 +126,11 @@
   }
 
  private:
-  void SetBit(const gpr_int32 mask) {
-    flags_ |= mask;
-  }
+  void SetBit(const gpr_int32 mask) { flags_ |= mask; }
 
-  void ClearBit(const gpr_int32 mask) {
-    flags_ &= ~mask;
-  }
+  void ClearBit(const gpr_int32 mask) { flags_ &= ~mask; }
 
-  bool GetBit(const gpr_int32 mask) const {
-    return flags_ & mask;
-  }
+  bool GetBit(const gpr_int32 mask) const { return flags_ & mask; }
 
   gpr_uint32 flags_;
 };
@@ -553,8 +541,7 @@
 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
           class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
           class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
-class SneakyCallOpSet GRPC_FINAL
-    : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
+class SneakyCallOpSet : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> {
  public:
   bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
     typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base;
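Note on the GRPC_FINAL removal above: SneakyCallOpSet must now be subclassable because Server::UnimplementedAsyncResponse (added in src/cpp/server/server.cc later in this diff) derives from it to add self-deletion. A minimal sketch of the pattern this unblocks; every name except SneakyCallOpSet, the op types, and GRPC_OVERRIDE is illustrative:

  class SelfDeletingOpSet : public SneakyCallOpSet<CallOpSendInitialMetadata,
                                                   CallOpServerSendStatus> {
   public:
    bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
      typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
          Base;
      bool r = Base::FinalizeResult(tag, status);
      delete this;  // the op set owns itself once its ops complete
      return r;
    }
  };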
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 925801e..c02ebec 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -211,13 +211,19 @@
 // Handle unknown method by returning UNIMPLEMENTED error.
 class UnknownMethodHandler : public MethodHandler {
  public:
-  void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+  template <class T>
+  static void FillOps(ServerContext* context, T* ops) {
     Status status(StatusCode::UNIMPLEMENTED, "");
-    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
-    if (!param.server_context->sent_initial_metadata_) {
-      ops.SendInitialMetadata(param.server_context->initial_metadata_);
+    if (!context->sent_initial_metadata_) {
+      ops->SendInitialMetadata(context->initial_metadata_);
+      context->sent_initial_metadata_ = true;
     }
-    ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+    ops->ServerSendStatus(context->trailing_metadata_, status);
+  }
+
+  void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+    CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+    FillOps(param.server_context, &ops);
     param.call->PerformOps(&ops);
     param.call->cq()->Pluck(&ops);
   }
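Note: FillOps is templated on the op-set type so the same UNIMPLEMENTED reply can be staged on the synchronous CallOpSet used by RunHandler above and on the SneakyCallOpSet used by the async unimplemented-RPC path added in server.cc below. A sketch of the two call sites; the variable names are illustrative:

  // synchronous path, as in RunHandler:
  CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> sync_ops;
  UnknownMethodHandler::FillOps(param.server_context, &sync_ops);

  // asynchronous path, as in Server::UnimplementedAsyncResponse, where the
  // op set is "sneaky" so its completion never surfaces to user code:
  SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> async_ops;
  UnknownMethodHandler::FillOps(server_context, &async_ops);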
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 8755b4b..516266d 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -84,13 +84,14 @@
          int max_message_size);
   // Register a service. This call does not take ownership of the service.
   // The service must exist for the lifetime of the Server instance.
-  bool RegisterService(const grpc::string *host, RpcService* service);
-  bool RegisterAsyncService(const grpc::string *host, AsynchronousService* service);
+  bool RegisterService(const grpc::string* host, RpcService* service);
+  bool RegisterAsyncService(const grpc::string* host,
+                            AsynchronousService* service);
   void RegisterAsyncGenericService(AsyncGenericService* service);
   // Add a listening port. Can be called multiple times.
   int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
   // Start the server.
-  bool Start();
+  bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
 
   void HandleQueueClosed();
   void RunRpc();
@@ -102,7 +103,8 @@
    public:
     BaseAsyncRequest(Server* server, ServerContext* context,
                      ServerAsyncStreamingInterface* stream,
-                     CompletionQueue* call_cq, void* tag);
+                     CompletionQueue* call_cq, void* tag,
+                     bool delete_on_finalize);
     virtual ~BaseAsyncRequest();
 
     bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
@@ -113,6 +115,7 @@
     ServerAsyncStreamingInterface* const stream_;
     CompletionQueue* const call_cq_;
     void* const tag_;
+    const bool delete_on_finalize_;
     grpc_call* call_;
     grpc_metadata_array initial_metadata_array_;
   };
@@ -174,12 +177,13 @@
     Message* const request_;
   };
 
-  class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
+  class GenericAsyncRequest : public BaseAsyncRequest {
    public:
     GenericAsyncRequest(Server* server, GenericServerContext* context,
                         ServerAsyncStreamingInterface* stream,
                         CompletionQueue* call_cq,
-                        ServerCompletionQueue* notification_cq, void* tag);
+                        ServerCompletionQueue* notification_cq, void* tag,
+                        bool delete_on_finalize);
 
     bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
 
@@ -187,6 +191,10 @@
     grpc_call_details call_details_;
   };
 
+  class UnimplementedAsyncRequestContext;
+  class UnimplementedAsyncRequest;
+  class UnimplementedAsyncResponse;
+
   template <class Message>
   void RequestAsyncCall(void* registered_method, ServerContext* context,
                         ServerAsyncStreamingInterface* stream,
@@ -211,7 +219,7 @@
                                ServerCompletionQueue* notification_cq,
                                void* tag) {
     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
-                            tag);
+                            tag, true);
   }
 
   const int max_message_size_;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 44ee00e..8666252 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -76,15 +76,14 @@
   // The service must exist for the lifetime of the Server instance returned by
   // BuildAndStart().
   // Only matches requests with :authority \a host
-  void RegisterService(const grpc::string& host, 
-                       SynchronousService* service);
+  void RegisterService(const grpc::string& host, SynchronousService* service);
 
   // Register an asynchronous service.
   // This call does not take ownership of the service or completion queue.
  // The service and completion queue must exist for the lifetime of the Server
   // instance returned by BuildAndStart().
   // Only matches requests with :authority \a host
-  void RegisterAsyncService(const grpc::string& host, 
+  void RegisterAsyncService(const grpc::string& host,
                             AsynchronousService* service);
 
   // Set max message size in bytes.
@@ -102,8 +101,8 @@
   void SetThreadPool(ThreadPoolInterface* thread_pool);
 
   // Add a completion queue for handling asynchronous services
-  // Caller is required to keep this completion queue live until calling
-  // BuildAndStart()
+  // Caller is required to keep this completion queue live until
+  // the server is destroyed.
   std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
 
   // Return a running server which is ready for processing rpcs.
@@ -117,9 +116,10 @@
   };
 
   typedef std::unique_ptr<grpc::string> HostString;
-  template <class T> struct NamedService {
+  template <class T>
+  struct NamedService {
     explicit NamedService(T* s) : service(s) {}
-    NamedService(const grpc::string& h, T *s)
+    NamedService(const grpc::string& h, T* s)
         : host(new grpc::string(h)), service(s) {}
     HostString host;
     T* service;
@@ -127,7 +127,8 @@
 
   int max_message_size_;
   std::vector<std::unique_ptr<NamedService<RpcService>>> services_;
-  std::vector<std::unique_ptr<NamedService<AsynchronousService>>> async_services_;
+  std::vector<std::unique_ptr<NamedService<AsynchronousService>>>
+      async_services_;
   std::vector<Port> ports_;
   std::vector<ServerCompletionQueue*> cqs_;
   std::shared_ptr<ServerCredentials> creds_;
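Note: the AddCompletionQueue comment change above is a contract change, not a wording tweak: Start() now posts catch-all unimplemented-method requests against every registered queue (see server.cc below), so the queue must outlive the server rather than just the BuildAndStart() call. A sketch of the resulting usage, assuming the usual builder flow; shutdown-drain details are elided:

  ServerBuilder builder;
  std::unique_ptr<ServerCompletionQueue> cq = builder.AddCompletionQueue();
  std::unique_ptr<Server> server = builder.BuildAndStart();
  // ... drive async services via cq->Next(...) ...
  server->Shutdown();  // stop accepting work first
  cq->Shutdown();      // only then shut down and drain the queue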
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index bc0c3c0..4bffaff 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -85,9 +85,7 @@
   // Returns false when the stream has been closed.
   virtual bool Write(const W& msg, const WriteOptions& options) = 0;
 
-  inline bool Write(const W& msg) {
-    return Write(msg, WriteOptions());
-  }
+  inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
 };
 
 template <class R>
@@ -640,9 +638,8 @@
     }
     // The response is dropped if the status is not OK.
     if (status.ok()) {
-      finish_ops_.ServerSendStatus(
-          ctx_->trailing_metadata_,
-          finish_ops_.SendMessage(msg));
+      finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
+                                   finish_ops_.SendMessage(msg));
     } else {
       finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     }
@@ -764,6 +761,8 @@
   }
 
  private:
+  friend class ::grpc::Server;
+
   void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
 
   Call call_;
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index c474e4d..337596c 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -74,10 +74,9 @@
    grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
    not be released by grpc_pollset_work AFTER worker has been destroyed.
 
-   Returns true if some work has been done, and false if the deadline
-   expired. */
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline);
+   Tries not to block past deadline. */
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline);
 
 /* Break one polling thread out of polling work for this pollset.
    If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers.
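Note: grpc_pollset_work no longer reports whether the deadline expired, so each caller checks the deadline itself and passes the current time in. The completion-queue changes below adopt the following loop shape, where the first_loop flag guarantees at least one poll even when the deadline is already past; that is what keeps zero-timeout polling (exercised by the test changes at the end of this diff) delivering already-queued events. A minimal sketch with locking and result handling elided; pollset and deadline stand for the caller's state:

  grpc_pollset_worker worker;
  int first_loop = 1;
  for (;;) {
    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
      break; /* deadline handling is now the caller's job */
    }
    first_loop = 0;
    grpc_pollset_work(pollset, &worker, now, deadline);
  }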
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 1320c64..4d41db0 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -181,7 +181,7 @@
   pfds[1].events = POLLIN;
   pfds[1].revents = 0;
 
-  poll_rv = poll(pfds, 2, timeout_ms);
+  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
 
   if (poll_rv < 0) {
     if (errno != EINTR) {
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index b5b2d75..388b2d2 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -144,7 +144,7 @@
                                         POLLOUT, &watchers[i]);
   }
 
-  r = poll(pfds, pfd_count, timeout);
+  r = grpc_poll_function(pfds, pfd_count, timeout);
 
   for (i = 1; i < pfd_count; i++) {
     grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN,
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index d3a9193..6bd1b61 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -38,7 +38,6 @@
 #include "src/core/iomgr/pollset_posix.h"
 
 #include <errno.h>
-#include <poll.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -57,6 +56,8 @@
 GPR_TLS_DECL(g_current_thread_poller);
 GPR_TLS_DECL(g_current_thread_worker);
 
+grpc_poll_function_type grpc_poll_function = poll;
+
 static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
   worker->prev->next = worker->next;
   worker->next->prev = worker->prev;
@@ -89,6 +90,7 @@
 }
 
 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+  /* pollset->mu already held */
   if (specific_worker != NULL) {
     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
       for (specific_worker = p->root_worker.next;
@@ -140,10 +142,10 @@
 void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
   gpr_mu_lock(&pollset->mu);
   pollset->vtable->add_fd(pollset, fd, 1);
-  /* the following (enabled only in debug) will reacquire and then release
-     our lock - meaning that if the unlocking flag passed to del_fd above is
-     not respected, the code will deadlock (in a way that we have a chance of
-     debugging) */
+/* the following (enabled only in debug) will reacquire and then release
+   our lock - meaning that if the unlocking flag passed to del_fd above is
+   not respected, the code will deadlock (in a way that we have a chance of
+   debugging) */
 #ifndef NDEBUG
   gpr_mu_lock(&pollset->mu);
   gpr_mu_unlock(&pollset->mu);
@@ -153,10 +155,10 @@
 void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
   gpr_mu_lock(&pollset->mu);
   pollset->vtable->del_fd(pollset, fd, 1);
-  /* the following (enabled only in debug) will reacquire and then release
-     our lock - meaning that if the unlocking flag passed to del_fd above is
-     not respected, the code will deadlock (in a way that we have a chance of
-     debugging) */
+/* the following (enabled only in debug) will reacquire and then release
+   our lock - meaning that if the unlocking flag passed to del_fd above is
+   not respected, the code will deadlock (in a way that we have a chance of
+   debugging) */
 #ifndef NDEBUG
   gpr_mu_lock(&pollset->mu);
   gpr_mu_unlock(&pollset->mu);
@@ -168,14 +170,10 @@
   pollset->shutdown_done_cb(pollset->shutdown_done_arg);
 }
 
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
-                      gpr_timespec deadline) {
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   /* pollset->mu already held */
-  gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
   int added_worker = 0;
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0;
-  }
   /* this must happen before we (potentially) drop pollset->mu */
   worker->next = worker->prev = NULL;
   /* TODO(ctiller): pool these */
@@ -217,7 +215,6 @@
       gpr_mu_lock(&pollset->mu);
     }
   }
-  return 1;
 }
 
 void grpc_pollset_shutdown(grpc_pollset *pollset,
@@ -456,7 +453,7 @@
 
   /* poll fd count (argument 2) is shortened by one if we have no events
      to poll on - such that it only includes the kicker */
-  r = poll(pfd, nfds, timeout);
+  r = grpc_poll_function(pfd, nfds, timeout);
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
   if (fd) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 1c1b736..69bd9cc 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -34,6 +34,8 @@
 #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H
 
+#include <poll.h>
+
 #include <grpc/support/sync.h>
 #include "src/core/iomgr/wakeup_fd_posix.h"
 
@@ -102,7 +104,8 @@
    - longer than a millisecond polls are rounded up to the next nearest
      millisecond to avoid spinning
    - infinite timeouts are converted to -1 */
-int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
+                                         gpr_timespec now);
 
 /* turn a pollset into a multipoller: platform specific */
 typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
@@ -117,4 +120,8 @@
  * be locked) */
 int grpc_pollset_has_workers(grpc_pollset *pollset);
 
+/* override to allow tests to hook poll() usage */
+typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
+extern grpc_poll_function_type grpc_poll_function;
+
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
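Note: grpc_poll_function is a test seam: every poll() in the iomgr now goes through this pointer, so a test can substitute its own wrapper. The async_end2end_test.cc changes below use exactly this to assert that spin-polling never blocks. A minimal sketch (POSIX, test-only; restoring the previous value is the caller's responsibility):

  static int assert_non_blocking_poll(struct pollfd *pfds, nfds_t nfds,
                                      int timeout) {
    GPR_ASSERT(timeout == 0); /* fail loudly if anything would block */
    return poll(pfds, nfds, timeout);
  }

  grpc_poll_function_type prev = grpc_poll_function;
  grpc_poll_function = assert_non_blocking_poll; /* install */
  /* ... run the test ... */
  grpc_poll_function = prev;                     /* restore */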
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 22dc589..1078fa5 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -100,13 +100,9 @@
   gpr_mu_destroy(&pollset->mu);
 }
 
-int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) {
-  gpr_timespec now;
+void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
+                       gpr_timespec now, gpr_timespec deadline) {
   int added_worker = 0;
-  now = gpr_now(GPR_CLOCK_MONOTONIC);
-  if (gpr_time_cmp(now, deadline) > 0) {
-    return 0 /* GPR_FALSE */;
-  }
   worker->next = worker->prev = NULL;
   gpr_cv_init(&worker->cv);
   if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) {
@@ -127,7 +123,6 @@
   if (added_worker) {
     remove_worker(pollset, worker);
   }
-  return 1 /* GPR_TRUE */;
 }
 
 void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index d1f2286..347b0da 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -115,7 +115,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset));
   while (!detector.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&detector.pollset, &worker,
+    grpc_pollset_work(&detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 378b3f7..b58115a 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -167,10 +167,12 @@
 }
 
 grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
-                                      gpr_timespec deadline,
-                                      void *reserved) {
+                                      gpr_timespec deadline, void *reserved) {
   grpc_event ret;
   grpc_pollset_worker worker;
+  int first_loop = 1;
+  gpr_timespec now;
+
   GPR_ASSERT(!reserved);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -197,12 +199,15 @@
       ret.type = GRPC_QUEUE_SHUTDOWN;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
   }
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(cc, "next");
@@ -240,6 +245,9 @@
   grpc_cq_completion *c;
   grpc_cq_completion *prev;
   grpc_pollset_worker worker;
+  gpr_timespec now;
+  int first_loop = 1;
+
   GPR_ASSERT(!reserved);
 
   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -272,8 +280,9 @@
       break;
     }
     if (!add_plucker(cc, tag, &worker)) {
-      gpr_log(GPR_DEBUG, 
-              "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d",
+      gpr_log(GPR_DEBUG,
+              "Too many outstanding grpc_completion_queue_pluck calls: maximum "
+              "is %d",
               GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
@@ -281,13 +290,16 @@
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
-    if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) {
+    now = gpr_now(GPR_CLOCK_MONOTONIC);
+    if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
       del_plucker(cc, tag, &worker);
       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
       memset(&ret, 0, sizeof(ret));
       ret.type = GRPC_QUEUE_TIMEOUT;
       break;
     }
+    first_loop = 0;
+    grpc_pollset_work(&cc->pollset, &worker, now, deadline);
     del_plucker(cc, tag, &worker);
   }
 done:
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index a70b555..dbe60e5 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -50,6 +50,52 @@
 
 namespace grpc {
 
+class Server::UnimplementedAsyncRequestContext {
+ protected:
+  UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
+
+  GenericServerContext server_context_;
+  GenericServerAsyncReaderWriter generic_stream_;
+};
+
+class Server::UnimplementedAsyncRequest GRPC_FINAL
+    : public UnimplementedAsyncRequestContext,
+      public GenericAsyncRequest {
+ public:
+  UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
+      : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
+                            NULL, false),
+        server_(server),
+        cq_(cq) {}
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
+
+  ServerContext* context() { return &server_context_; }
+  GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
+
+ private:
+  Server* const server_;
+  ServerCompletionQueue* const cq_;
+};
+
+typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+    UnimplementedAsyncResponseOp;
+class Server::UnimplementedAsyncResponse GRPC_FINAL
+    : public UnimplementedAsyncResponseOp {
+ public:
+  UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
+  ~UnimplementedAsyncResponse() { delete request_; }
+
+  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
+    bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
+    delete this;
+    return r;
+  }
+
+ private:
+  UnimplementedAsyncRequest* const request_;
+};
+
 class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
  public:
   bool FinalizeResult(void** tag, bool* status) {
@@ -230,11 +276,11 @@
   delete sync_methods_;
 }
 
-bool Server::RegisterService(const grpc::string *host, RpcService* service) {
+bool Server::RegisterService(const grpc::string* host, RpcService* service) {
   for (int i = 0; i < service->GetMethodCount(); ++i) {
     RpcServiceMethod* method = service->GetMethod(i);
-    void* tag = grpc_server_register_method(
-        server_, method->name(), host ? host->c_str() : nullptr);
+    void* tag = grpc_server_register_method(server_, method->name(),
+                                            host ? host->c_str() : nullptr);
     if (!tag) {
       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
               method->name());
@@ -277,27 +323,15 @@
   return creds->AddPortToServer(addr, server_);
 }
 
-bool Server::Start() {
+bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
   GPR_ASSERT(!started_);
   started_ = true;
   grpc_server_start(server_);
 
   if (!has_generic_service_) {
-    unknown_method_.reset(new RpcServiceMethod(
-        "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
-    // Use of emplace_back with just constructor arguments is not accepted here
-    // by gcc-4.4 because it can't match the anonymous nullptr with a proper
-    // constructor implicitly. Construct the object and use push_back.
-    sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
-  }
-  // Start processing rpcs.
-  if (!sync_methods_->empty()) {
-    for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
-      m->SetupRequest();
-      m->Request(server_, cq_.cq());
+    for (size_t i = 0; i < num_cqs; i++) {
+      new UnimplementedAsyncRequest(this, cqs[i]);
     }
-
-    ScheduleCallback();
   }
 
   return true;
@@ -335,12 +369,14 @@
 
 Server::BaseAsyncRequest::BaseAsyncRequest(
     Server* server, ServerContext* context,
-    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+    ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
+    bool delete_on_finalize)
     : server_(server),
       context_(context),
       stream_(stream),
       call_cq_(call_cq),
       tag_(tag),
+      delete_on_finalize_(delete_on_finalize),
       call_(nullptr) {
   memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
 }
@@ -367,14 +403,16 @@
   // just the pointers inside call are copied here
   stream_->BindCall(&call);
   *tag = tag_;
-  delete this;
+  if (delete_on_finalize_) {
+    delete this;
+  }
   return true;
 }
 
 Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
     Server* server, ServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+    : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
 
 void Server::RegisteredAsyncRequest::IssueRequest(
     void* registered_method, grpc_byte_buffer** payload,
@@ -388,8 +426,9 @@
 Server::GenericAsyncRequest::GenericAsyncRequest(
     Server* server, GenericServerContext* context,
     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
-    ServerCompletionQueue* notification_cq, void* tag)
-    : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+    ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
+    : BaseAsyncRequest(server, context, stream, call_cq, tag,
+                       delete_on_finalize) {
   grpc_call_details_init(&call_details_);
   GPR_ASSERT(notification_cq);
   GPR_ASSERT(call_cq);
@@ -410,6 +449,25 @@
   return BaseAsyncRequest::FinalizeResult(tag, status);
 }
 
+bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
+                                                       bool* status) {
+  if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
+    new UnimplementedAsyncRequest(server_, cq_);
+    new UnimplementedAsyncResponse(this);
+  } else {
+    delete this;
+  }
+  return false;
+}
+
+Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
+    UnimplementedAsyncRequest* request)
+    : request_(request) {
+  Status status(StatusCode::UNIMPLEMENTED, "");
+  UnknownMethodHandler::FillOps(request_->context(), this);
+  request_->stream()->call_.PerformOps(this);
+}
+
 void Server::ScheduleCallback() {
   {
     grpc::unique_lock<grpc::mutex> lock(mu_);
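Note: the unimplemented-RPC machinery above is self-sustaining: Start() seeds one UnimplementedAsyncRequest per completion queue, and consuming a request immediately re-arms a replacement, so every queue always has a catch-all pending. Restated with comments (this is the FinalizeResult from this diff, annotated, not new behavior):

  bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
                                                         bool* status) {
    if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
      new UnimplementedAsyncRequest(server_, cq_);  // re-arm for the next RPC
      new UnimplementedAsyncResponse(this);  // sends UNIMPLEMENTED, then frees
                                             // the response and this request
    } else {
      delete this;  // shutdown path: no call arrived
    }
    return false;  // never surfaced to user code as an event
  }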
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 0911887..a13269a 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -59,14 +59,16 @@
   async_services_.emplace_back(new NamedService<AsynchronousService>(service));
 }
 
-void ServerBuilder::RegisterService(
-    const grpc::string& addr, SynchronousService* service) {
-  services_.emplace_back(new NamedService<RpcService>(addr, service->service()));
+void ServerBuilder::RegisterService(const grpc::string& addr,
+                                    SynchronousService* service) {
+  services_.emplace_back(
+      new NamedService<RpcService>(addr, service->service()));
 }
 
-void ServerBuilder::RegisterAsyncService(
-    const grpc::string& addr, AsynchronousService* service) {
-  async_services_.emplace_back(new NamedService<AsynchronousService>(addr, service));
+void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
+                                         AsynchronousService* service) {
+  async_services_.emplace_back(
+      new NamedService<AsynchronousService>(addr, service));
 }
 
 void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -101,12 +103,6 @@
     thread_pool_ = CreateDefaultThreadPool();
     thread_pool_owned = true;
   }
-  // Async services only, create a thread pool to handle requests to unknown
-  // services.
-  if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
-    thread_pool_ = new FixedSizeThreadPool(1);
-    thread_pool_owned = true;
-  }
   std::unique_ptr<Server> server(
       new Server(thread_pool_, thread_pool_owned, max_message_size_));
   for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
@@ -119,9 +115,10 @@
       return nullptr;
     }
   }
-  for (auto service = async_services_.begin();
-       service != async_services_.end(); service++) {
-    if (!server->RegisterAsyncService((*service)->host.get(), (*service)->service)) {
+  for (auto service = async_services_.begin(); service != async_services_.end();
+       service++) {
+    if (!server->RegisterAsyncService((*service)->host.get(),
+                                      (*service)->service)) {
       return nullptr;
     }
   }
@@ -135,7 +132,7 @@
       *port->selected_port = r;
     }
   }
-  if (!server->Start()) {
+  if (!server->Start(&cqs_[0], cqs_.size())) {
     return nullptr;
   }
   return server;
diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c
index 8dddfbe..42b2661 100644
--- a/test/core/httpcli/httpcli_test.c
+++ b/test/core/httpcli/httpcli_test.c
@@ -88,7 +88,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!g_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      n_seconds_time(20));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   gpr_free(host);
@@ -114,7 +115,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!g_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, n_seconds_time(20));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      n_seconds_time(20));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   gpr_free(host);
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 8186c96..6ef8e9c 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -256,7 +256,8 @@
   while (!state.read_done || !state.write_done) {
     grpc_pollset_worker worker;
     GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
-    grpc_pollset_work(g_pollset, &worker, deadline);
+    grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
 
@@ -353,7 +354,8 @@
         while (!write_st.done) {
           grpc_pollset_worker worker;
           GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-          grpc_pollset_work(g_pollset, &worker, deadline);
+          grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                            deadline);
         }
         gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
         grpc_endpoint_destroy(write_st.ep);
@@ -361,7 +363,8 @@
         while (!read_st.done) {
           grpc_pollset_worker worker;
           GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
-          grpc_pollset_work(g_pollset, &worker, deadline);
+          grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                            deadline);
         }
         gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
         gpr_free(slices);
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index adcbcaf..8bba87d 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -250,7 +250,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!sv->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -358,7 +359,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!cl->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -448,7 +450,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (a.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   GPR_ASSERT(a.cb_that_ran == first_read_callback);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -467,7 +470,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (b.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   /* Except now we verify that second_read_callback ran instead */
   GPR_ASSERT(b.cb_that_ran == second_read_callback);
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index 07bbe1f..dea0b33 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -112,7 +112,8 @@
 
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
   }
 
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -142,7 +143,8 @@
   /* wait for the connection callback to finish */
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, test_deadline());
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      test_deadline());
   }
 
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -211,7 +213,8 @@
       GPR_ASSERT(g_connections_complete ==
                  connections_complete_before + is_after_deadline);
     }
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 17a85ce..6ad8322 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -187,7 +187,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -224,7 +225,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, deadline);
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      deadline);
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -285,7 +287,8 @@
   for (;;) {
     grpc_pollset_worker worker;
     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-    grpc_pollset_work(&g_pollset, &worker, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     do {
       bytes_read =
@@ -365,7 +368,8 @@
       if (state.write_done) {
         break;
       }
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   }
@@ -422,7 +426,8 @@
         if (state.write_done) {
           break;
         }
-        grpc_pollset_work(&g_pollset, &worker, deadline);
+        grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                          deadline);
       }
       gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
       break;
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index b82d7c0..29a20cb 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -137,7 +137,8 @@
     while (g_nconnects == nconnects_before &&
            gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
       grpc_pollset_worker worker;
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     gpr_log(GPR_DEBUG, "wait done");
 
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 74b7895..471d5b5 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -49,8 +49,7 @@
 static int g_number_of_reads = 0;
 static int g_number_of_bytes_read = 0;
 
-static void on_connect(void *arg, grpc_endpoint *udp) {
-}
+static void on_connect(void *arg, grpc_endpoint *udp) {}
 
 static void on_read(int fd, grpc_udp_server_cb new_transport_cb, void *cb_arg) {
   char read_buffer[512];
@@ -122,7 +121,8 @@
 
   memset(&addr, 0, sizeof(addr));
   addr.ss_family = AF_INET;
-  GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, addr_len, on_read));
+  GPR_ASSERT(
+      grpc_udp_server_add_port(s, (struct sockaddr *)&addr, addr_len, on_read));
 
   svrfd = grpc_udp_server_get_fd(s, 0);
   GPR_ASSERT(svrfd >= 0);
@@ -146,7 +146,8 @@
     while (g_number_of_reads == number_of_reads_before &&
            gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
       grpc_pollset_worker worker;
-      grpc_pollset_work(&g_pollset, &worker, deadline);
+      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                        deadline);
     }
     GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
     close(clifd);
diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c
index 990855a..7df6fad 100644
--- a/test/core/security/oauth2_utils.c
+++ b/test/core/security/oauth2_utils.c
@@ -85,7 +85,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset));
   while (!request.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&request.pollset, &worker,
+    grpc_pollset_work(&request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset));
diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c
index 7238efb..129b19b 100644
--- a/test/core/security/print_google_default_creds_token.c
+++ b/test/core/security/print_google_default_creds_token.c
@@ -97,8 +97,8 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
   while (!sync.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&sync.pollset, &worker,
-                      gpr_inf_future(GPR_CLOCK_REALTIME));
+    grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));
 
diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c
index 69bbc3c..8c401e4 100644
--- a/test/core/security/verify_jwt.c
+++ b/test/core/security/verify_jwt.c
@@ -111,7 +111,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset));
   while (!sync.is_done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&sync.pollset, &worker,
+    grpc_pollset_work(&sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset));
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 9bff18d..836e62a 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -66,9 +66,7 @@
   return 0;
 }
 
-static void free_chosen_ports() {
-  gpr_free(chosen_ports);
-}
+static void free_chosen_ports() { gpr_free(chosen_ports); }
 
 static void chose_port(int port) {
   if (chosen_ports == NULL) {
@@ -180,7 +178,7 @@
   gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
   while (pr.port == -1) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&pr.pollset, &worker,
+    grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
                       GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
@@ -206,7 +204,8 @@
 
   /* Type of port to first pick in next iteration */
   int is_tcp = 1;
-  int try = 0;
+  int try
+    = 0;
 
   char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
   if (env) {
@@ -219,7 +218,8 @@
 
   for (;;) {
     int port;
-    try++;
+    try
+      ++;
     if (try == 1) {
       port = getpid() % (65536 - 30000) + 30000;
     } else if (try <= NUM_RANDOM_PORTS_TO_PICK) {
diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c
index 2a21133..a06cb50 100644
--- a/test/core/util/reconnect_server.c
+++ b/test/core/util/reconnect_server.c
@@ -134,7 +134,8 @@
       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                    gpr_time_from_seconds(seconds, GPR_TIMESPAN));
   gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset));
-  grpc_pollset_work(&server->pollset, &worker, deadline);
+  grpc_pollset_work(&server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+                    deadline);
   gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset));
 }
 
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index f00d19e..9a9cca0 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -56,6 +56,10 @@
 #include <grpc/support/thd.h>
 #include <grpc/support/time.h>
 
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/pollset_posix.h"
+#endif
+
 using grpc::cpp::test::util::EchoRequest;
 using grpc::cpp::test::util::EchoResponse;
 using std::chrono::system_clock;
@@ -65,36 +69,102 @@
 
 namespace {
 
-void* tag(int i) { return (void*)(gpr_intptr) i; }
+void* tag(int i) { return (void*)(gpr_intptr)i; }
 
-class Verifier {
+#ifdef GPR_POSIX_SOCKET
+static int assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
+                                    int timeout) {
+  GPR_ASSERT(timeout == 0);
+  return poll(pfds, nfds, timeout);
+}
+
+class PollOverride {
  public:
+  PollOverride(grpc_poll_function_type f) {
+    prev_ = grpc_poll_function;
+    grpc_poll_function = f;
+  }
+
+  ~PollOverride() { grpc_poll_function = prev_; }
+
+ private:
+  grpc_poll_function_type prev_;
+};
+
+class PollingCheckRegion : public PollOverride {
+ public:
+  explicit PollingCheckRegion(bool allow_blocking)
+      : PollOverride(allow_blocking ? poll : assert_non_blocking_poll) {}
+};
+#else
+class PollingCheckRegion {
+ public:
+  explicit PollingCheckRegion(bool allow_blocking) {}
+};
+#endif
+
+class Verifier : public PollingCheckRegion {
+ public:
+  explicit Verifier(bool spin) : PollingCheckRegion(!spin), spin_(spin) {}
   Verifier& Expect(int i, bool expect_ok) {
     expectations_[tag(i)] = expect_ok;
     return *this;
   }
-  void Verify(CompletionQueue *cq) {
+  void Verify(CompletionQueue* cq) {
     GPR_ASSERT(!expectations_.empty());
     while (!expectations_.empty()) {
       bool ok;
       void* got_tag;
-      EXPECT_TRUE(cq->Next(&got_tag, &ok));
+      if (spin_) {
+        for (;;) {
+          auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+          if (r == CompletionQueue::TIMEOUT) continue;
+          if (r == CompletionQueue::GOT_EVENT) break;
+          gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+          abort();
+        }
+      } else {
+        EXPECT_TRUE(cq->Next(&got_tag, &ok));
+      }
       auto it = expectations_.find(got_tag);
       EXPECT_TRUE(it != expectations_.end());
       EXPECT_EQ(it->second, ok);
       expectations_.erase(it);
     }
   }
-  void Verify(CompletionQueue *cq, std::chrono::system_clock::time_point deadline) {
+  void Verify(CompletionQueue* cq,
+              std::chrono::system_clock::time_point deadline) {
     if (expectations_.empty()) {
       bool ok;
-      void *got_tag;
-      EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), CompletionQueue::TIMEOUT);
+      void* got_tag;
+      if (spin_) {
+        while (std::chrono::system_clock::now() < deadline) {
+          EXPECT_EQ(
+              cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
+              CompletionQueue::TIMEOUT);
+        }
+      } else {
+        EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
+                  CompletionQueue::TIMEOUT);
+      }
     } else {
       while (!expectations_.empty()) {
         bool ok;
-        void *got_tag;
-        EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), CompletionQueue::GOT_EVENT);
+        void* got_tag;
+        if (spin_) {
+          for (;;) {
+            GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+            auto r =
+                cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
+            if (r == CompletionQueue::TIMEOUT) continue;
+            if (r == CompletionQueue::GOT_EVENT) break;
+            gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+            abort();
+          }
+        } else {
+          EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
+                    CompletionQueue::GOT_EVENT);
+        }
         auto it = expectations_.find(got_tag);
         EXPECT_TRUE(it != expectations_.end());
         EXPECT_EQ(it->second, ok);
@@ -105,9 +175,10 @@
 
  private:
   std::map<void*, bool> expectations_;
+  bool spin_;
 };
 
-class AsyncEnd2endTest : public ::testing::Test {
+class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
  protected:
   AsyncEnd2endTest() {}
 
@@ -116,7 +187,8 @@
     server_address_ << "localhost:" << port;
     // Setup server
     ServerBuilder builder;
-    builder.AddListeningPort(server_address_.str(), grpc::InsecureServerCredentials());
+    builder.AddListeningPort(server_address_.str(),
+                             grpc::InsecureServerCredentials());
     builder.RegisterAsyncService(&service_);
     cq_ = builder.AddCompletionQueue();
     server_ = builder.BuildAndStart();
@@ -153,18 +225,18 @@
       std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
           stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
 
-      service_.RequestEcho(&srv_ctx, &recv_request, &response_writer,
-                           cq_.get(), cq_.get(), tag(2));
+      service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
+                           cq_.get(), tag(2));
 
-      Verifier().Expect(2, true).Verify(cq_.get());
+      Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
       EXPECT_EQ(send_request.message(), recv_request.message());
 
       send_response.set_message(recv_request.message());
       response_writer.Finish(send_response, Status::OK, tag(3));
-      Verifier().Expect(3, true).Verify(cq_.get());
+      Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
       response_reader->Finish(&recv_response, &recv_status, tag(4));
-      Verifier().Expect(4, true).Verify(cq_.get());
+      Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
       EXPECT_EQ(send_response.message(), recv_response.message());
       EXPECT_TRUE(recv_status.ok());
@@ -178,18 +250,18 @@
   std::ostringstream server_address_;
 };
 
-TEST_F(AsyncEnd2endTest, SimpleRpc) {
+TEST_P(AsyncEnd2endTest, SimpleRpc) {
   ResetStub();
   SendRpc(1);
 }
 
-TEST_F(AsyncEnd2endTest, SequentialRpcs) {
+TEST_P(AsyncEnd2endTest, SequentialRpcs) {
   ResetStub();
   SendRpc(10);
 }
 
 // Test a simple RPC using the async version of Next
-TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
+TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -210,28 +282,32 @@
       std::chrono::system_clock::now());
   std::chrono::system_clock::time_point time_limit(
       std::chrono::system_clock::now() + std::chrono::seconds(10));
-  Verifier().Verify(cq_.get(), time_now);
-  Verifier().Verify(cq_.get(), time_now);
+  Verifier(GetParam()).Verify(cq_.get(), time_now);
+  Verifier(GetParam()).Verify(cq_.get(), time_now);
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
 
-  Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max());
+  Verifier(GetParam())
+      .Expect(3, true)
+      .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get(), std::chrono::system_clock::time_point::max());
+  Verifier(GetParam())
+      .Expect(4, true)
+      .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
 // Two pings and a final pong.
-TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
   ResetStub();
 
   EchoRequest send_request;
@@ -247,44 +323,44 @@
   std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
       stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
 
-  service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(),
-                                cq_.get(), tag(2));
+  service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+                                tag(2));
 
-  Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
 
   cli_stream->Write(send_request, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   cli_stream->Write(send_request, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
 
   EXPECT_EQ(send_request.message(), recv_request.message());
   cli_stream->WritesDone(tag(7));
-  Verifier().Expect(7, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(8));
-  Verifier().Expect(8, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
 
   send_response.set_message(recv_request.message());
   srv_stream.Finish(send_response, Status::OK, tag(9));
-  Verifier().Expect(9, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
 
   cli_stream->Finish(&recv_status, tag(10));
-  Verifier().Expect(10, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
 // One ping, two pongs.
-TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
   ResetStub();
 
   EchoRequest send_request;
@@ -303,38 +379,38 @@
   service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
                                  cq_.get(), cq_.get(), tag(2));
 
-  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   srv_stream.Write(send_response, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
 
   srv_stream.Write(send_response, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
 
   srv_stream.Finish(Status::OK, tag(7));
-  Verifier().Expect(7, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(8));
-  Verifier().Expect(8, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
 
   cli_stream->Finish(&recv_status, tag(9));
-  Verifier().Expect(9, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
 
   EXPECT_TRUE(recv_status.ok());
 }
 
 // One ping, one pong.
-TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
+TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
   ResetStub();
 
   EchoRequest send_request;
@@ -350,43 +426,43 @@
   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
 
-  service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(),
-                             cq_.get(), tag(2));
+  service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
+                             tag(2));
 
-  Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
 
   cli_stream->Write(send_request, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   srv_stream.Write(send_response, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   cli_stream->Read(&recv_response, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
 
   cli_stream->WritesDone(tag(7));
-  Verifier().Expect(7, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
 
   srv_stream.Read(&recv_request, tag(8));
-  Verifier().Expect(8, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
 
   srv_stream.Finish(Status::OK, tag(9));
-  Verifier().Expect(9, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
 
   cli_stream->Finish(&recv_status, tag(10));
-  Verifier().Expect(10, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
 
   EXPECT_TRUE(recv_status.ok());
 }
 
 // Metadata tests
-TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -410,7 +486,7 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   auto client_initial_metadata = srv_ctx.client_metadata();
   EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@@ -420,16 +496,16 @@
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(3));
 
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
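
The ordering in this test matters: client metadata is added to the
ClientContext before the call object is created, because client initial
metadata travels with the request headers. Sketched:

// Add metadata first; it is sent with the request headers as soon as the
// async call is started, so it must be in place before AsyncEcho().
ClientContext cli_ctx;
cli_ctx.AddMetadata("key1", "val1");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > reader(
    stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));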
 
-TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -451,15 +527,15 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
   response_writer.SendInitialMetadata(tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   response_reader->ReadInitialMetadata(tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
   EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
   EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
@@ -467,16 +543,16 @@
 
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
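
Unlike the synchronous API, an async client sees server initial metadata
only after explicitly asking for it: GetServerInitialMetadata() is
meaningful once the ReadInitialMetadata tag (tag 4 above) has completed.

response_reader->ReadInitialMetadata(tag(4));
// ... wait for tag(4) on the completion queue ...
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();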
 
-TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
+TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -498,20 +574,20 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   response_writer.SendInitialMetadata(tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
 
   send_response.set_message(recv_request.message());
   srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
   srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
   response_writer.Finish(send_response, Status::OK, tag(4));
 
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(5));
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
   auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -520,7 +596,7 @@
   EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
 }
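
Trailing metadata rides with the status: the server must attach it before
Finish(), and on the client GetServerTrailingMetadata() is only populated
after the Finish tag (tag 5 above) completes. In outline:

// Server side: attach trailers, then finish.
srv_ctx.AddTrailingMetadata("key1", "val1");
response_writer.Finish(send_response, Status::OK, tag(4));
// Client side: trailers are readable only after Finish() has completed.
response_reader->Finish(&recv_response, &recv_status, tag(5));
// ... wait for tag(5) ...
auto trailers = cli_ctx.GetServerTrailingMetadata();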
 
-TEST_F(AsyncEnd2endTest, MetadataRpc) {
+TEST_P(AsyncEnd2endTest, MetadataRpc) {
   ResetStub();
 
   EchoRequest send_request;
@@ -537,18 +613,17 @@
   std::pair<grpc::string, grpc::string> meta1("key1", "val1");
   std::pair<grpc::string, grpc::string> meta2(
       "key2-bin",
-      grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc",
-		   13));
+      grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
   std::pair<grpc::string, grpc::string> meta3("key3", "val3");
   std::pair<grpc::string, grpc::string> meta6(
       "key4-bin",
       grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
-		   14));
+                   14));
   std::pair<grpc::string, grpc::string> meta5("key5", "val5");
   std::pair<grpc::string, grpc::string> meta4(
       "key6-bin",
-      grpc::string("\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee",
-		   15));
+      grpc::string(
+          "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
 
   cli_ctx.AddMetadata(meta1.first, meta1.second);
   cli_ctx.AddMetadata(meta2.first, meta2.second);
@@ -558,7 +633,7 @@
 
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
   auto client_initial_metadata = srv_ctx.client_metadata();
   EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
@@ -568,9 +643,9 @@
   srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
   srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
   response_writer.SendInitialMetadata(tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
   response_reader->ReadInitialMetadata(tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
   EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
   EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
@@ -581,10 +656,10 @@
   srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
   response_writer.Finish(send_response, Status::OK, tag(5));
 
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
 
   response_reader->Finish(&recv_response, &recv_status, tag(6));
-  Verifier().Expect(6, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
   auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -594,7 +669,7 @@
 }
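
The reformatted literals in this test are binary metadata values. gRPC
requires binary values to use keys ending in "-bin" (they are base64-coded
on the wire), and constructing them as grpc::string(ptr, len) with an
explicit length keeps embedded non-text bytes, including NUL, intact. A
small example with a hypothetical key:

// Explicit length: a strlen-based constructor would stop at the first '\0'.
std::pair<grpc::string, grpc::string> bin_meta(
    "key7-bin", grpc::string("\x00\x01\xff", 3));
cli_ctx.AddMetadata(bin_meta.first, bin_meta.second);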
 
 // Server uses AsyncNotifyWhenDone API to check for cancellation
-TEST_F(AsyncEnd2endTest, ServerCheckCancellation) {
+TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
   ResetStub();
 
   EchoRequest send_request;
@@ -615,21 +690,21 @@
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
 
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   cli_ctx.TryCancel();
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
   EXPECT_TRUE(srv_ctx.IsCancelled());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
 
   EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
 }
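
The tag(5) completion that follows TryCancel() comes from an
AsyncNotifyWhenDone registration made in lines elided from this hunk. The
pattern, sketched:

ServerContext srv_ctx;
srv_ctx.AsyncNotifyWhenDone(tag(5));  // register before handling the call
// ... serve the RPC ...
// tag(5) fires exactly once, when the RPC completes for any reason; only
// then is srv_ctx.IsCancelled() meaningful: true for a cancellation or a
// missed deadline, false for a normal finish (see ServerCheckDone below).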
 
 // Server uses AsyncNotifyWhenDone API to check for normal finish
-TEST_F(AsyncEnd2endTest, ServerCheckDone) {
+TEST_P(AsyncEnd2endTest, ServerCheckDone) {
   ResetStub();
 
   EchoRequest send_request;
@@ -650,23 +725,23 @@
   service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
                        cq_.get(), tag(2));
 
-  Verifier().Expect(2, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
   EXPECT_EQ(send_request.message(), recv_request.message());
 
   send_response.set_message(recv_request.message());
   response_writer.Finish(send_response, Status::OK, tag(3));
-  Verifier().Expect(3, true).Verify(cq_.get());
-  Verifier().Expect(5, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
   EXPECT_FALSE(srv_ctx.IsCancelled());
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, true).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
 
   EXPECT_EQ(send_response.message(), recv_response.message());
   EXPECT_TRUE(recv_status.ok());
 }
 
-TEST_F(AsyncEnd2endTest, UnimplementedRpc) {
+TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
   std::shared_ptr<ChannelInterface> channel = CreateChannel(
       server_address_.str(), InsecureCredentials(), ChannelArguments());
   std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
@@ -682,12 +757,15 @@
       stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
 
   response_reader->Finish(&recv_response, &recv_status, tag(4));
-  Verifier().Expect(4, false).Verify(cq_.get());
+  Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
 
   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
   EXPECT_EQ("", recv_status.error_message());
 }
 
+INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
+                        ::testing::Values(false, true));
+
 }  // namespace
 }  // namespace testing
 }  // namespace grpc
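
With INSTANTIATE_TEST_CASE_P in place, gtest runs every TEST_P body above
twice, once with GetParam() == false and once with true, reported as
AsyncEnd2end/AsyncEnd2endTest.Name/0 and /1. A self-contained sketch of the
same pattern with a hypothetical fixture:

#include <gtest/gtest.h>

// Hypothetical fixture; the real AsyncEnd2endTest derives from
// ::testing::TestWithParam<bool> the same way to make GetParam() available.
class SpinModeTest : public ::testing::TestWithParam<bool> {};

TEST_P(SpinModeTest, RunsOncePerValue) {
  bool spin = GetParam();  // false on the first instantiation, true on the next
  EXPECT_TRUE(spin || !spin);
}

INSTANTIATE_TEST_CASE_P(AllModes, SpinModeTest,
                        ::testing::Values(false, true));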