Merge branch 'master' into server_channel_affinity
diff --git a/grpc.def b/grpc.def
index 61948ed..3477bd9 100644
--- a/grpc.def
+++ b/grpc.def
@@ -77,6 +77,7 @@
     grpc_server_request_registered_call
     grpc_server_create
     grpc_server_register_completion_queue
+    grpc_server_register_non_listening_completion_queue
     grpc_server_add_insecure_http2_port
     grpc_server_start
     grpc_server_shutdown_and_notify
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index 56864d6..1b84b44 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -222,9 +222,18 @@
 /// A specific type of completion queue used by the processing of notifications
 /// by servers. Instantiated by \a ServerBuilder.
 class ServerCompletionQueue : public CompletionQueue {
+ public:
+  bool IsFrequentlyPolled() { return is_frequently_polled_; }
+
  private:
+  bool is_frequently_polled_;
   friend class ServerBuilder;
-  ServerCompletionQueue() {}
+  /// \param is_frequently_polled Informs the gRPC library about whether the
+  /// server completion queue will be actively polled (by calling Next() or
+  /// AsyncNext()). By default, all server completion queues are assumed to be
+  /// frequently polled.
+  ServerCompletionQueue(bool is_frequently_polled = true)
+      : is_frequently_polled_(is_frequently_polled) {}
 };
 
 }  // namespace grpc
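
A brief, hedged sketch of how this flag is consumed (the helper below is hypothetical; the real logic lives in ServerBuilder::BuildAndStart(), shown later in this change). The constructor is private, so the flag can only be set by ServerBuilder, which later reads it back via IsFrequentlyPolled() when registering queues with the core library:

    // Hypothetical builder-side fragment, for illustration only.
    void RegisterQueueSketch(grpc_server* server, grpc::ServerCompletionQueue* cq) {
      if (cq->IsFrequentlyPolled()) {
        grpc_server_register_completion_queue(server, cq->cq(), nullptr);
      } else {
        grpc_server_register_non_listening_completion_queue(server, cq->cq(),
                                                            nullptr);
      }
    }
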
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 86c7fec..5275bd3 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -102,7 +102,15 @@
   /// Add a completion queue for handling asynchronous services
   /// Caller is required to keep this completion queue live until
   /// the server is destroyed.
-  std::unique_ptr<ServerCompletionQueue> AddCompletionQueue();
+  ///
+  /// \param is_frequently_polled This is an optional parameter to inform the
+  /// gRPC library about whether this completion queue will be frequently
+  /// polled (i.e. by calling Next() or AsyncNext()). The default value is
+  /// 'true' and is the recommended setting. Setting this to 'false' (i.e.
+  /// not polling the completion queue frequently) has a significant negative
+  /// performance impact and hence should not be used in production use cases.
+  std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(
+      bool is_frequently_polled = true);
 
   /// Return a running server which is ready for processing calls.
   std::unique_ptr<Server> BuildAndStart();
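
A hedged usage sketch of the new parameter (port, service object, and names are placeholders, not part of this change). Note that, per the comment above, every completion queue returned here must outlive the server:

    #include <grpc++/security/server_credentials.h>
    #include <grpc++/server.h>
    #include <grpc++/server_builder.h>

    grpc::ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
    builder.RegisterService(&my_sync_service);  // hypothetical sync service
    // Drained only occasionally, so mark it as not frequently polled; the
    // library then keeps it off the listening path.
    auto rare_cq = builder.AddCompletionQueue(false);
    // Polled in a tight Next()/AsyncNext() loop by a dedicated thread.
    auto busy_cq = builder.AddCompletionQueue();  // is_frequently_polled = true
    std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
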
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 0ca28c0..6f7a67b 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -334,6 +334,15 @@
                                                    grpc_completion_queue *cq,
                                                    void *reserved);
 
+/** Register a non-listening completion queue with the server. This API is
+    similar to grpc_server_register_completion_queue except that the server
+    will not use this completion queue to listen for incoming channels.
+
+    Registering a non-listening completion queue has a negative performance
+    impact and hence this API is not recommended for production use cases. */
+GRPCAPI void grpc_server_register_non_listening_completion_queue(
+    grpc_server *server, grpc_completion_queue *q, void *reserved);
+
 /** Add a HTTP2 over plaintext over tcp listener.
     Returns bound port number on success, 0 on failure.
     REQUIRES: server not started */
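
A hedged sketch of the equivalent C-level call sequence (the port and the roles assigned to the two queues are illustrative):

    #include <grpc/grpc.h>

    grpc_server *server = grpc_server_create(NULL, NULL);
    grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
    grpc_completion_queue *notify_cq = grpc_completion_queue_create(NULL);
    /* cq is polled in a grpc_completion_queue_next() loop, so it may also be
       used for listening. */
    grpc_server_register_completion_queue(server, cq, NULL);
    /* notify_cq is drained only occasionally, so keep it off the listening
       path. */
    grpc_server_register_non_listening_completion_queue(server, notify_cq, NULL);
    grpc_server_add_insecure_http2_port(server, "0.0.0.0:50051");
    grpc_server_start(server);
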
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 3c8127e..5800b37 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -126,6 +126,9 @@
   grpc_closure *on_done_closure;
 
   grpc_iomgr_object iomgr_object;
+
+  /* The pollset that last noticed and notified that the fd is readable */
+  grpc_pollset *read_notifier_pollset;
 };
 
 /* Begin polling on an fd.
@@ -147,7 +150,8 @@
    if got_read or got_write are 1, also does the become_{readable,writable} as
    appropriate. */
 static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
-                        int got_read, int got_write);
+                        int got_read, int got_write,
+                        grpc_pollset *read_notifier_pollset);
 
 /* Return 1 if this fd is orphaned, 0 otherwise */
 static bool fd_is_orphaned(grpc_fd *fd);
@@ -342,6 +346,7 @@
   r->on_done_closure = NULL;
   r->closed = 0;
   r->released = 0;
+  r->read_notifier_pollset = NULL;
   gpr_mu_unlock(&r->mu);
   return r;
 }
@@ -545,6 +550,11 @@
   }
 }
 
+static void set_read_notifier_pollset_locked(
+    grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) {
+  fd->read_notifier_pollset = read_notifier_pollset;
+}
+
 static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
   gpr_mu_lock(&fd->mu);
   GPR_ASSERT(!fd->shutdown);
@@ -568,6 +578,18 @@
   gpr_mu_unlock(&fd->mu);
 }
 
+/* Return the read-notifier pollset */
+static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+                                                  grpc_fd *fd) {
+  grpc_pollset *notifier = NULL;
+
+  gpr_mu_lock(&fd->mu);
+  notifier = fd->read_notifier_pollset;
+  gpr_mu_unlock(&fd->mu);
+
+  return notifier;
+}
+
 static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
                               grpc_pollset_worker *worker, uint32_t read_mask,
                               uint32_t write_mask, grpc_fd_watcher *watcher) {
@@ -620,7 +642,8 @@
 }
 
 static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
-                        int got_read, int got_write) {
+                        int got_read, int got_write,
+                        grpc_pollset *read_notifier_pollset) {
   int was_polling = 0;
   int kick = 0;
   grpc_fd *fd = watcher->fd;
@@ -656,6 +679,10 @@
     if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
       kick = 1;
     }
+
+    if (read_notifier_pollset != NULL) {
+      set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+    }
   }
   if (got_write) {
     if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
@@ -756,9 +783,14 @@
     specific_worker = pop_front_worker(p);
     if (specific_worker != NULL) {
       if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+        /* Prefer not to kick self. Push the worker to the end of the list and
+         * pop the one from the front */
         GPR_TIMER_MARK("kick_anonymous_not_self", 0);
         push_back_worker(p, specific_worker);
         specific_worker = pop_front_worker(p);
+        /* If there was only one worker on the pollset, we would get back the
+         * same worker we pushed (the one stored in the current thread's TLS).
+         * If so, kick it only if the GRPC_POLLSET_CAN_KICK_SELF flag is set */
         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
             gpr_tls_get(&g_current_thread_worker) ==
                 (intptr_t)specific_worker) {
@@ -1203,11 +1235,11 @@
       gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
     }
     if (fd) {
-      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+      fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
     }
   } else if (r == 0) {
     if (fd) {
-      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+      fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
     }
   } else {
     if (pfd[0].revents & POLLIN_CHECK) {
@@ -1218,9 +1250,9 @@
     }
     if (nfds > 2) {
       fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
-                  pfd[2].revents & POLLOUT_CHECK);
+                  pfd[2].revents & POLLOUT_CHECK, pollset);
     } else if (fd) {
-      fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+      fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL);
     }
   }
 
@@ -1356,11 +1388,11 @@
       gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
     }
     for (i = 2; i < pfd_count; i++) {
-      fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+      fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
     }
   } else if (r == 0) {
     for (i = 2; i < pfd_count; i++) {
-      fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+      fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
     }
   } else {
     if (pfds[0].revents & POLLIN_CHECK) {
@@ -1371,11 +1403,11 @@
     }
     for (i = 2; i < pfd_count; i++) {
       if (watchers[i].fd == NULL) {
-        fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+        fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
         continue;
       }
       fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
-                  pfds[i].revents & POLLOUT_CHECK);
+                  pfds[i].revents & POLLOUT_CHECK, pollset);
     }
   }
 
@@ -1451,20 +1483,31 @@
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/support/block_annotate.h"
 
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st,
+                      grpc_pollset *read_notifier_pollset) {
   /* only one set_ready can be active at once (but there may be a racing
      notify_on) */
   gpr_mu_lock(&fd->mu);
   set_ready_locked(exec_ctx, fd, st);
+
+  /* A non-NULL read_notifier_pollset means that the fd is readable. */
+  if (read_notifier_pollset != NULL) {
+    /* Note: Since the fd might be part of multiple pollsets, this might be
+     * called multiple times (once for each time the fd becomes readable) and
+     * it is okay to set the fd's read-notifier pollset to any one of them */
+    set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset);
+  }
+
   gpr_mu_unlock(&fd->mu);
 }
 
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  set_ready(exec_ctx, fd, &fd->read_closure);
+static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+                               grpc_pollset *notifier_pollset) {
+  set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset);
 }
 
 static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  set_ready(exec_ctx, fd, &fd->write_closure);
+  set_ready(exec_ctx, fd, &fd->write_closure, NULL);
 }
 
 struct epoll_fd_list {
@@ -1556,7 +1599,7 @@
       }
     }
   }
-  fd_end_poll(exec_ctx, &watcher, 0, 0);
+  fd_end_poll(exec_ctx, &watcher, 0, 0, NULL);
 }
 
 static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1670,7 +1713,7 @@
               grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
             } else {
               if (read_ev || cancel) {
-                fd_become_readable(exec_ctx, fd);
+                fd_become_readable(exec_ctx, fd, pollset);
               }
               if (write_ev || cancel) {
                 fd_become_writable(exec_ctx, fd);
@@ -1899,6 +1942,7 @@
     .fd_shutdown = fd_shutdown,
     .fd_notify_on_read = fd_notify_on_read,
     .fd_notify_on_write = fd_notify_on_write,
+    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
 
     .pollset_init = pollset_init,
     .pollset_shutdown = pollset_shutdown,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 7df1751..1fa02e1 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -82,6 +82,11 @@
   g_event_engine->fd_notify_on_write(exec_ctx, fd, closure);
 }
 
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+                                                grpc_fd *fd) {
+  return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd);
+}
+
 size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; }
 
 void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1fa9f5e..344bf63 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -55,6 +55,8 @@
                             grpc_closure *closure);
   void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure);
+  grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
+                                                grpc_fd *fd);
 
   void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu);
   void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -137,6 +139,10 @@
 void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure);
 
+/* Return the read notifier pollset from the fd */
+grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
+                                                grpc_fd *fd);
+
 /* pollset_posix functions */
 
 /* Add an fd to a pollset */
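
A hedged sketch of the intended consumer pattern (it mirrors the tcp_server_posix.c change below; the callback name and accepted_fd argument are illustrative):

    static void on_accept_sketch(grpc_exec_ctx *exec_ctx, grpc_fd *listener_emfd,
                                 int accepted_fd) {
      /* Ask which pollset actually noticed the read event on the listener... */
      grpc_pollset *notifier =
          grpc_fd_get_read_notifier_pollset(exec_ctx, listener_emfd);
      grpc_fd *fdobj = grpc_fd_create(accepted_fd, "accepted-connection");
      if (notifier != NULL) {
        /* ...and hand the new connection to that pollset only, instead of to
           every pollset owned by the server. */
        grpc_pollset_add_fd(exec_ctx, notifier, fdobj);
      }
    }
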
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index aaeb384..97c945b 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -310,13 +310,15 @@
   grpc_tcp_listener *sp = arg;
   grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
                                        sp->fd_index};
+  grpc_pollset *read_notifier_pollset = NULL;
   grpc_fd *fdobj;
-  size_t i;
 
   if (!success) {
     goto error;
   }
 
+  read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd);
+
   /* loop until accept4 returns EAGAIN, and then re-arm notification */
   for (;;) {
     struct sockaddr_storage addr;
@@ -349,12 +351,14 @@
     }
 
     fdobj = grpc_fd_create(fd, name);
-    /* TODO(ctiller): revise this when we have server-side sharding
-       of channels -- we certainly should not be automatically adding every
-       incoming channel to every pollset owned by the server */
-    for (i = 0; i < sp->server->pollset_count; i++) {
-      grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj);
+
+    if (read_notifier_pollset == NULL) {
+      gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
+      goto error;
     }
+
+    grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
+
     sp->server->on_accept_cb(
         exec_ctx, sp->server->on_accept_cb_arg,
         grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 1f82c3b..ae78f8f 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -70,6 +70,8 @@
   int shutdown;
   int shutdown_called;
   int is_server_cq;
+  /** Set when this server cq should not listen for incoming channels */
+  int is_non_listening_server_cq;
   int num_pluckers;
   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
   grpc_closure pollset_shutdown_done;
@@ -149,6 +151,7 @@
   cc->shutdown = 0;
   cc->shutdown_called = 0;
   cc->is_server_cq = 0;
+  cc->is_non_listening_server_cq = 0;
   cc->num_pluckers = 0;
 #ifndef NDEBUG
   cc->outstanding_tag_count = 0;
@@ -511,6 +514,14 @@
   return POLLSET_FROM_CQ(cc);
 }
 
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+  cc->is_non_listening_server_cq = 1;
+}
+
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+  return (cc->is_non_listening_server_cq == 1);
+}
+
 void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
 
 int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index eef82cf..1528ca4 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -82,6 +82,8 @@
 
 grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
 
+void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc);
+bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
 void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
 int grpc_cq_is_server_cq(grpc_completion_queue *cc);
 
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 2db95b9..c9b458f 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -895,23 +895,45 @@
     "server",
 };
 
-void grpc_server_register_completion_queue(grpc_server *server,
-                                           grpc_completion_queue *cq,
-                                           void *reserved) {
+static void register_completion_queue(grpc_server *server,
+                                      grpc_completion_queue *cq,
+                                      bool is_non_listening, void *reserved) {
   size_t i, n;
-  GRPC_API_TRACE(
-      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
-      (server, cq, reserved));
   GPR_ASSERT(!reserved);
   for (i = 0; i < server->cq_count; i++) {
     if (server->cqs[i] == cq) return;
   }
-  GRPC_CQ_INTERNAL_REF(cq, "server");
+
   grpc_cq_mark_server_cq(cq);
-  n = server->cq_count++;
-  server->cqs = gpr_realloc(server->cqs,
-                            server->cq_count * sizeof(grpc_completion_queue *));
-  server->cqs[n] = cq;
+
+  /* Non-listening completion queues are not added to server->cqs */
+  if (is_non_listening) {
+    grpc_cq_mark_non_listening_server_cq(cq);
+  } else {
+    GRPC_CQ_INTERNAL_REF(cq, "server");
+    n = server->cq_count++;
+    server->cqs = gpr_realloc(
+        server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
+    server->cqs[n] = cq;
+  }
+}
+
+void grpc_server_register_completion_queue(grpc_server *server,
+                                           grpc_completion_queue *cq,
+                                           void *reserved) {
+  GRPC_API_TRACE(
+      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
+      (server, cq, reserved));
+  register_completion_queue(server, cq, false, reserved);
+}
+
+void grpc_server_register_non_listening_completion_queue(
+    grpc_server *server, grpc_completion_queue *cq, void *reserved) {
+  GRPC_API_TRACE(
+      "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, "
+      "reserved=%p)",
+      3, (server, cq, reserved));
+  register_completion_queue(server, cq, true, reserved);
 }
 
 grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
@@ -1018,7 +1040,6 @@
 void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
                                  grpc_transport *transport,
                                  const grpc_channel_args *args) {
-  size_t i;
   size_t num_registered_methods;
   size_t alloc;
   registered_method *rm;
@@ -1033,12 +1054,6 @@
   uint32_t max_probes = 0;
   grpc_transport_op op;
 
-  for (i = 0; i < s->cq_count; i++) {
-    memset(&op, 0, sizeof(op));
-    op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
-    grpc_transport_perform_op(exec_ctx, transport, &op);
-  }
-
   channel =
       grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport);
   chand = (channel_data *)grpc_channel_stack_element(
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 68cc382..9cd7cb2 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -1,6 +1,6 @@
 /*
  *
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -46,8 +46,9 @@
   grpc_compression_options_init(&compression_options_);
 }
 
-std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
-  ServerCompletionQueue* cq = new ServerCompletionQueue();
+std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
+    bool is_frequently_polled) {
+  ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled);
   cqs_.push_back(cq);
   return std::unique_ptr<ServerCompletionQueue>(cq);
 }
@@ -85,8 +86,11 @@
 
 std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
   std::unique_ptr<ThreadPoolInterface> thread_pool;
+  // Does this server have at least one sync method?
+  bool has_sync_methods = false;
   for (auto it = services_.begin(); it != services_.end(); ++it) {
     if ((*it)->service->has_synchronous_methods()) {
+      has_sync_methods = true;
       if (thread_pool == nullptr) {
         thread_pool.reset(CreateDefaultThreadPool());
         break;
@@ -104,10 +108,33 @@
               compression_options_.enabled_algorithms_bitset);
   std::unique_ptr<Server> server(
       new Server(thread_pool.release(), true, max_message_size_, &args));
+
+  // If the server has at least one sync method, we know that this is a Sync
+  // server or a Hybrid server and the completion queue (server->cq_) will be
+  // frequently polled.
+  int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
+
   for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
-    grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
-                                          nullptr);
+    // A completion queue that is not polled frequently (by calling Next() or
+    // AsyncNext()) is not safe to use for listening to incoming channels.
+    // Register all such completion queues as non-listening completion queues
+    // with the gRPC core library.
+    if ((*cq)->IsFrequentlyPolled()) {
+      grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+                                            nullptr);
+      num_frequently_polled_cqs++;
+    } else {
+      grpc_server_register_non_listening_completion_queue(server->server_,
+                                                          (*cq)->cq(), nullptr);
+    }
   }
+
+  if (num_frequently_polled_cqs == 0) {
+    gpr_log(GPR_ERROR,
+            "At least one of the completion queues must be frequently polled");
+    return nullptr;
+  }
+
   for (auto service = services_.begin(); service != services_.end();
        service++) {
     if (!server->RegisterService((*service)->host.get(), (*service)->service)) {
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index f0a40db..17a9004 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -115,6 +115,7 @@
 grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
 grpc_server_create_type grpc_server_create_import;
 grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
+grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
 grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
 grpc_server_start_type grpc_server_start_import;
 grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@@ -385,6 +386,7 @@
   grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
   grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
   grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
+  grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
   grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
   grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
   grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index d5e810b..53ebe49 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -295,6 +295,9 @@
 typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
 extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
 #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
+typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
+extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
+#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
 typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
 extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
 #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index bc43f9d..3b18e41 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -115,6 +115,7 @@
 grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
 grpc_server_create_type grpc_server_create_import;
 grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
+grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
 grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
 grpc_server_start_type grpc_server_start_import;
 grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@@ -381,6 +382,7 @@
   grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
   grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
   grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
+  grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
   grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
   grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
   grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b67361c..f2f118a 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -295,6 +295,9 @@
 typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
 extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
 #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
+typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
+extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
+#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
 typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
 extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
 #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c
index 87533a9..b730df7 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.c
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.c
@@ -50,6 +50,7 @@
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/support/env.h"
 #include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
 #include "src/core/lib/surface/server.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
@@ -60,6 +61,8 @@
 static void server_setup_transport(void *ts, grpc_transport *transport) {
   grpc_end2end_test_fixture *f = ts;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_endpoint_pair *sfd = f->fixture_data;
+  grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
   grpc_server_setup_transport(&exec_ctx, f->server, transport,
                               grpc_server_get_channel_args(f->server));
   grpc_exec_ctx_finish(&exec_ctx);
diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c
index f28147c..41fcc1d 100644
--- a/test/core/end2end/fixtures/h2_sockpair.c
+++ b/test/core/end2end/fixtures/h2_sockpair.c
@@ -49,6 +49,7 @@
 #include "src/core/lib/iomgr/endpoint_pair.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
 #include "src/core/lib/surface/server.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
@@ -59,6 +60,8 @@
 static void server_setup_transport(void *ts, grpc_transport *transport) {
   grpc_end2end_test_fixture *f = ts;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_endpoint_pair *sfd = f->fixture_data;
+  grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
   grpc_server_setup_transport(&exec_ctx, f->server, transport,
                               grpc_server_get_channel_args(f->server));
   grpc_exec_ctx_finish(&exec_ctx);
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c
index 302b16b..16ffb6e 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.c
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c
@@ -49,6 +49,7 @@
 #include "src/core/lib/iomgr/endpoint_pair.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
 #include "src/core/lib/surface/server.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
@@ -59,6 +60,8 @@
 static void server_setup_transport(void *ts, grpc_transport *transport) {
   grpc_end2end_test_fixture *f = ts;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_endpoint_pair *sfd = f->fixture_data;
+  grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
   grpc_server_setup_transport(&exec_ctx, f->server, transport,
                               grpc_server_get_channel_args(f->server));
   grpc_exec_ctx_finish(&exec_ctx);
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index f97f337..187720e 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -518,6 +518,134 @@
   grpc_pollset_destroy(p);
 }
 
+typedef struct read_notifier_test_fd_context {
+  grpc_fd *fd;
+  bool is_cb_called;
+} read_notifier_test_fd_context;
+
+static void read_notifier_test_callback(
+    grpc_exec_ctx *exec_ctx, void *arg /* (read_notifier_test_fd_context *) */,
+    bool success) {
+  read_notifier_test_fd_context *fd_context = arg;
+  grpc_fd *fd = fd_context->fd;
+
+  /* Verify that the read notifier pollset is set */
+  GPR_ASSERT(grpc_fd_get_read_notifier_pollset(exec_ctx, fd) != NULL);
+  fd_context->is_cb_called = true;
+}
+
+/* sv MUST be an array of size 2 */
+static void get_socket_pair(int sv[]) {
+  int flags = 0;
+  GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+  flags = fcntl(sv[0], F_GETFL, 0);
+  GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+  flags = fcntl(sv[1], F_GETFL, 0);
+  GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+}
+
+static grpc_pollset *create_grpc_pollset(gpr_mu **mu) {
+  grpc_pollset *pollset = gpr_malloc(grpc_pollset_size());
+  grpc_pollset_init(pollset, mu);
+  return pollset;
+}
+
+static void free_grpc_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+  grpc_closure destroyed;
+  grpc_closure_init(&destroyed, destroy_pollset, pollset);
+  grpc_pollset_shutdown(exec_ctx, pollset, &destroyed);
+  grpc_exec_ctx_flush(exec_ctx);
+  gpr_free(pollset);
+}
+
+/* This tests that the read_notifier_pollset field of a grpc_fd is properly
+   set when the grpc_fd becomes readable.
+   - This tests both basic and multi pollsets
+   - The parameter register_cb_after_read_event controls whether the on-read
+     callback registration (i.e. the one done by grpc_fd_notify_on_read()) is
+     done before or after the fd becomes readable
+ */
+static void test_grpc_fd_read_notifier_pollset(
+    bool register_cb_after_read_event) {
+  grpc_fd *em_fd[2];
+  int sv[2][2];
+  gpr_mu *mu[2];
+  grpc_pollset *pollset[2];
+  char data;
+  ssize_t result;
+  int i;
+  grpc_pollset_worker *worker;
+  read_notifier_test_fd_context fd_context;
+  grpc_closure on_read_closure;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+  for (i = 0; i < 2; i++) {
+    pollset[i] = create_grpc_pollset(&mu[i]);
+    get_socket_pair(sv[i]); /* sv[i][0] & sv[i][1] will have the socket pair */
+    em_fd[i] = grpc_fd_create(sv[i][0], "test_grpc_fd_read_notifier_pollset");
+    grpc_pollset_add_fd(&exec_ctx, pollset[i], em_fd[i]);
+  }
+
+  /* At this point pollset[0] has em_fd[0] and pollset[1] has em_fd[1] and both
+     are basic pollsets. Make pollset[1] a multi-pollset by adding em_fd[0] to
+     it */
+  grpc_pollset_add_fd(&exec_ctx, pollset[1], em_fd[0]);
+  grpc_exec_ctx_flush(&exec_ctx);
+
+  /* The following tests that the read_notifier_pollset is correctly set on the
+     grpc_fd structure in both basic pollset and multi pollset cases.
+      pollset[0] is a basic pollset containing just em_fd[0]
+      pollset[1] is a multi pollset containing em_fd[0] and em_fd[1] */
+
+  for (i = 0; i < 2; i++) {
+    on_read_closure.cb = read_notifier_test_callback;
+    fd_context.fd = em_fd[i];
+    fd_context.is_cb_called = false;
+    on_read_closure.cb_arg = &fd_context;
+
+    if (!register_cb_after_read_event) {
+      /* Registering the callback BEFORE the fd is readable */
+      grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure);
+    }
+
+    data = 0;
+    result = write(sv[i][1], &data, sizeof(data));
+    GPR_ASSERT(result == 1);
+
+    /* grpc_pollset_work requires the caller to hold the pollset mutex */
+    gpr_mu_lock(mu[i]);
+    worker = NULL;
+    grpc_pollset_work(&exec_ctx, pollset[i], &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
+                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(mu[i]);
+    grpc_exec_ctx_flush(&exec_ctx);
+
+    if (register_cb_after_read_event) {
+      /* Registering the callback after the fd is readable. In this case, the
+         callback should be executed right away. */
+      grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure);
+      grpc_exec_ctx_flush(&exec_ctx);
+    }
+
+    /* The callback should have been called by now */
+    GPR_ASSERT(fd_context.is_cb_called);
+
+    /* Drain the socket (Not really needed for the test) */
+    result = read(sv[i][0], &data, 1);
+    GPR_ASSERT(result == 1);
+  }
+
+  /* Clean up */
+  for (i = 0; i < 2; i++) {
+    grpc_fd_orphan(&exec_ctx, em_fd[i], NULL, NULL, "");
+    close(sv[i][1]);
+    free_grpc_pollset(&exec_ctx, pollset[i]);
+  }
+
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
 int main(int argc, char **argv) {
   grpc_closure destroyed;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -527,6 +655,8 @@
   grpc_pollset_init(g_pollset, &g_mu);
   test_grpc_fd();
   test_grpc_fd_change();
+  test_grpc_fd_read_notifier_pollset(false);
+  test_grpc_fd_read_notifier_pollset(true);
   grpc_closure_init(&destroyed, destroy_pollset, g_pollset);
   grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
   grpc_exec_ctx_finish(&exec_ctx);
diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc
index 02043a8..0423448 100644
--- a/test/cpp/end2end/hybrid_end2end_test.cc
+++ b/test/cpp/end2end/hybrid_end2end_test.cc
@@ -216,7 +216,7 @@
     }
     // Create a separate cq for each potential handler.
     for (int i = 0; i < 5; i++) {
-      cqs_.push_back(builder.AddCompletionQueue());
+      cqs_.push_back(builder.AddCompletionQueue(false));
     }
     server_ = builder.BuildAndStart();
   }