Merge branch 'master' into rpc_mgr
diff --git a/BUILD b/BUILD
index 86cbf0c..b298615 100644
--- a/BUILD
+++ b/BUILD
@@ -1333,6 +1333,7 @@
     "src/cpp/common/channel_filter.h",
     "src/cpp/server/dynamic_thread_pool.h",
     "src/cpp/server/thread_pool_interface.h",
+    "src/cpp/thread_manager/thread_manager.h",
     "src/cpp/client/insecure_credentials.cc",
     "src/cpp/client/secure_credentials.cc",
     "src/cpp/common/auth_property_iterator.cc",
@@ -1361,6 +1362,7 @@
     "src/cpp/server/server_context.cc",
     "src/cpp/server/server_credentials.cc",
     "src/cpp/server/server_posix.cc",
+    "src/cpp/thread_manager/thread_manager.cc",
     "src/cpp/util/byte_buffer_cc.cc",
     "src/cpp/util/slice_cc.cc",
     "src/cpp/util/status.cc",
@@ -1485,6 +1487,7 @@
     "src/cpp/common/channel_filter.h",
     "src/cpp/server/dynamic_thread_pool.h",
     "src/cpp/server/thread_pool_interface.h",
+    "src/cpp/thread_manager/thread_manager.h",
     "src/cpp/client/cronet_credentials.cc",
     "src/cpp/client/insecure_credentials.cc",
     "src/cpp/common/insecure_create_auth_context.cc",
@@ -1509,6 +1512,7 @@
     "src/cpp/server/server_context.cc",
     "src/cpp/server/server_credentials.cc",
     "src/cpp/server/server_posix.cc",
+    "src/cpp/thread_manager/thread_manager.cc",
     "src/cpp/util/byte_buffer_cc.cc",
     "src/cpp/util/slice_cc.cc",
     "src/cpp/util/status.cc",
@@ -1708,6 +1712,7 @@
     "src/cpp/common/channel_filter.h",
     "src/cpp/server/dynamic_thread_pool.h",
     "src/cpp/server/thread_pool_interface.h",
+    "src/cpp/thread_manager/thread_manager.h",
     "src/cpp/client/insecure_credentials.cc",
     "src/cpp/common/insecure_create_auth_context.cc",
     "src/cpp/server/insecure_server_credentials.cc",
@@ -1731,6 +1736,7 @@
     "src/cpp/server/server_context.cc",
     "src/cpp/server/server_credentials.cc",
     "src/cpp/server/server_posix.cc",
+    "src/cpp/thread_manager/thread_manager.cc",
     "src/cpp/util/byte_buffer_cc.cc",
     "src/cpp/util/slice_cc.cc",
     "src/cpp/util/status.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index af1bb59..2b9e678 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1070,6 +1070,7 @@
   src/cpp/server/server_context.cc
   src/cpp/server/server_credentials.cc
   src/cpp/server/server_posix.cc
+  src/cpp/thread_manager/thread_manager.cc
   src/cpp/util/byte_buffer_cc.cc
   src/cpp/util/slice_cc.cc
   src/cpp/util/status.cc
@@ -1233,6 +1234,7 @@
   src/cpp/server/server_context.cc
   src/cpp/server/server_credentials.cc
   src/cpp/server/server_posix.cc
+  src/cpp/thread_manager/thread_manager.cc
   src/cpp/util/byte_buffer_cc.cc
   src/cpp/util/slice_cc.cc
   src/cpp/util/status.cc
@@ -1487,6 +1489,7 @@
   src/cpp/server/server_context.cc
   src/cpp/server/server_credentials.cc
   src/cpp/server/server_posix.cc
+  src/cpp/thread_manager/thread_manager.cc
   src/cpp/util/byte_buffer_cc.cc
   src/cpp/util/slice_cc.cc
   src/cpp/util/status.cc
diff --git a/Makefile b/Makefile
index 89b9f05..d3fd184 100644
--- a/Makefile
+++ b/Makefile
@@ -1085,6 +1085,7 @@
 status_test: $(BINDIR)/$(CONFIG)/status_test
 streaming_throughput_test: $(BINDIR)/$(CONFIG)/streaming_throughput_test
 stress_test: $(BINDIR)/$(CONFIG)/stress_test
+thread_manager_test: $(BINDIR)/$(CONFIG)/thread_manager_test
 thread_stress_test: $(BINDIR)/$(CONFIG)/thread_stress_test
 public_headers_must_be_c89: $(BINDIR)/$(CONFIG)/public_headers_must_be_c89
 boringssl_aes_test: $(BINDIR)/$(CONFIG)/boringssl_aes_test
@@ -1461,6 +1462,7 @@
   $(BINDIR)/$(CONFIG)/status_test \
   $(BINDIR)/$(CONFIG)/streaming_throughput_test \
   $(BINDIR)/$(CONFIG)/stress_test \
+  $(BINDIR)/$(CONFIG)/thread_manager_test \
   $(BINDIR)/$(CONFIG)/thread_stress_test \
   $(BINDIR)/$(CONFIG)/boringssl_aes_test \
   $(BINDIR)/$(CONFIG)/boringssl_asn1_test \
@@ -1549,6 +1551,7 @@
   $(BINDIR)/$(CONFIG)/status_test \
   $(BINDIR)/$(CONFIG)/streaming_throughput_test \
   $(BINDIR)/$(CONFIG)/stress_test \
+  $(BINDIR)/$(CONFIG)/thread_manager_test \
   $(BINDIR)/$(CONFIG)/thread_stress_test \
 
 endif
@@ -1861,6 +1864,8 @@
 	$(Q) $(BINDIR)/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 )
 	$(E) "[RUN]     Testing streaming_throughput_test"
 	$(Q) $(BINDIR)/$(CONFIG)/streaming_throughput_test || ( echo test streaming_throughput_test failed ; exit 1 )
+	$(E) "[RUN]     Testing thread_manager_test"
+	$(Q) $(BINDIR)/$(CONFIG)/thread_manager_test || ( echo test thread_manager_test failed ; exit 1 )
 	$(E) "[RUN]     Testing thread_stress_test"
 	$(Q) $(BINDIR)/$(CONFIG)/thread_stress_test || ( echo test thread_stress_test failed ; exit 1 )
 
@@ -3702,6 +3707,7 @@
     src/cpp/server/server_context.cc \
     src/cpp/server/server_credentials.cc \
     src/cpp/server/server_posix.cc \
+    src/cpp/thread_manager/thread_manager.cc \
     src/cpp/util/byte_buffer_cc.cc \
     src/cpp/util/slice_cc.cc \
     src/cpp/util/status.cc \
@@ -3894,6 +3900,7 @@
     src/cpp/server/server_context.cc \
     src/cpp/server/server_credentials.cc \
     src/cpp/server/server_posix.cc \
+    src/cpp/thread_manager/thread_manager.cc \
     src/cpp/util/byte_buffer_cc.cc \
     src/cpp/util/slice_cc.cc \
     src/cpp/util/status.cc \
@@ -4473,6 +4480,7 @@
     src/cpp/server/server_context.cc \
     src/cpp/server/server_credentials.cc \
     src/cpp/server/server_posix.cc \
+    src/cpp/thread_manager/thread_manager.cc \
     src/cpp/util/byte_buffer_cc.cc \
     src/cpp/util/slice_cc.cc \
     src/cpp/util/status.cc \
@@ -13474,6 +13482,49 @@
 $(OBJDIR)/$(CONFIG)/test/cpp/util/metrics_server.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/metrics.pb.cc $(GENDIR)/src/proto/grpc/testing/metrics.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc
 
 
+THREAD_MANAGER_TEST_SRC = \
+    test/cpp/thread_manager/thread_manager_test.cc \
+
+THREAD_MANAGER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(THREAD_MANAGER_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/thread_manager_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/thread_manager_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/thread_manager_test: $(PROTOBUF_DEP) $(THREAD_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
+	$(E) "[LD]      Linking $@"
+	$(Q) mkdir -p `dirname $@`
+	$(Q) $(LDXX) $(LDFLAGS) $(THREAD_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/thread_manager_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/thread_manager/thread_manager_test.o:  $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
+
+deps_thread_manager_test: $(THREAD_MANAGER_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(THREAD_MANAGER_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
 THREAD_STRESS_TEST_SRC = \
     test/cpp/end2end/thread_stress_test.cc \
 
diff --git a/build.yaml b/build.yaml
index ed42dcd..8bc8729 100644
--- a/build.yaml
+++ b/build.yaml
@@ -738,6 +738,7 @@
   - src/cpp/common/channel_filter.h
   - src/cpp/server/dynamic_thread_pool.h
   - src/cpp/server/thread_pool_interface.h
+  - src/cpp/thread_manager/thread_manager.h
   src:
   - src/cpp/client/channel_cc.cc
   - src/cpp/client/client_context.cc
@@ -759,6 +760,7 @@
   - src/cpp/server/server_context.cc
   - src/cpp/server/server_credentials.cc
   - src/cpp/server/server_posix.cc
+  - src/cpp/thread_manager/thread_manager.cc
   - src/cpp/util/byte_buffer_cc.cc
   - src/cpp/util/slice_cc.cc
   - src/cpp/util/status.cc
@@ -3474,6 +3476,16 @@
   - gpr_test_util
   - gpr
   - grpc++_test_config
+- name: thread_manager_test
+  build: test
+  language: c++
+  src:
+  - test/cpp/thread_manager/thread_manager_test.cc
+  deps:
+  - grpc++
+  - grpc
+  - gpr
+  - grpc++_test_config
 - name: thread_stress_test
   gtest: true
   cpu_cost: 100
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index 4a00d7a..5c41ca5 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -126,12 +126,6 @@
   /// \return true on a successful shutdown.
   virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
 
-  /// Process one or more incoming calls.
-  virtual void RunRpc() = 0;
-
-  /// Schedule \a RunRpc to run in the threadpool.
-  virtual void ScheduleCallback() = 0;
-
   virtual void ShutdownInternal(gpr_timespec deadline) = 0;
 
   virtual int max_receive_message_size() const = 0;
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index f51a6c6..a6d70c7 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -105,18 +105,41 @@
   class AsyncRequest;
   class ShutdownRequest;
 
+  /// SyncRequestThreadManager is an implementation of ThreadManager. This class
+  /// is responsible for polling for incoming RPCs and calling the RPC handlers.
+  /// This is only used in case of a Sync server (i.e. a server exposing a
+  /// sync interface).
+  class SyncRequestThreadManager;
+
   class UnimplementedAsyncRequestContext;
   class UnimplementedAsyncRequest;
   class UnimplementedAsyncResponse;
 
   /// Server constructors. To be used by \a ServerBuilder only.
   ///
-  /// \param thread_pool The threadpool instance to use for call processing.
-  /// \param thread_pool_owned Does the server own the \a thread_pool instance?
-  /// \param max_receive_message_size Maximum message length that the channel
-  /// can receive.
-  Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
-         int max_receive_message_size, ChannelArguments* args);
+  /// \param max_message_size Maximum message length that the channel can
+  /// receive.
+  ///
+  /// \param args The channel args
+  ///
+  /// \param sync_server_cqs The completion queues to use if the server is a
+  /// synchronous server (or a hybrid server). The server polls for new RPCs on
+  /// these queues
+  ///
+  /// \param min_pollers The minimum number of polling threads per server
+  /// completion queue (in param sync_server_cqs) to use for listening to
+  /// incoming requests (used only in case of sync server)
+  ///
+  /// \param max_pollers The maximum number of polling threads per server
+  /// completion queue (in param sync_server_cqs) to use for listening to
+  /// incoming requests (used only in case of sync server)
+  ///
+  /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
+  /// server completion queues passed via sync_server_cqs param.
+  Server(int max_message_size, ChannelArguments* args,
+         std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+             sync_server_cqs,
+         int min_pollers, int max_pollers, int sync_cq_timeout_msec);
 
   /// Register a service. This call does not take ownership of the service.
   /// The service must exist for the lifetime of the Server instance.
@@ -151,12 +174,6 @@
   /// \return true on a successful shutdown.
   bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE;
 
-  /// Process one or more incoming calls.
-  void RunRpc() GRPC_OVERRIDE;
-
-  /// Schedule \a RunRpc to run in the threadpool.
-  void ScheduleCallback() GRPC_OVERRIDE;
-
   void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
 
   void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE;
@@ -171,34 +188,31 @@
 
   const int max_receive_message_size_;
 
-  // Completion queue.
-  CompletionQueue cq_;
+  /// The following completion queues are ONLY used in case of the Sync API,
+  /// i.e. if the server has any services with sync methods. The server uses
+  /// these completion queues to poll for new RPCs.
+  std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+      sync_server_cqs_;
+
+  /// List of ThreadManager instances (one for each cq in the sync_server_cqs)
+  std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
 
   // Sever status
   grpc::mutex mu_;
   bool started_;
   bool shutdown_;
-  bool shutdown_notified_;
-  // The number of threads which are running callbacks.
-  int num_running_cb_;
-  grpc::condition_variable callback_cv_;
+  bool shutdown_notified_;  // Was notify called on the shutdown_cv_
 
   grpc::condition_variable shutdown_cv_;
 
   std::shared_ptr<GlobalCallbacks> global_callbacks_;
 
-  std::list<SyncRequest>* sync_methods_;
   std::vector<grpc::string> services_;
-  std::unique_ptr<RpcServiceMethod> unknown_method_;
   bool has_generic_service_;
 
-  // Pointer to the c grpc server.
+  // Pointer to the wrapped grpc_server.
   grpc_server* server_;
 
-  ThreadPoolInterface* thread_pool_;
-  // Whether the thread pool is created and owned by the server.
-  bool thread_pool_owned_;
-
   std::unique_ptr<ServerInitializer> server_initializer_;
 };
 
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 37f1f8c..c6bcf8b 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -34,6 +34,7 @@
 #ifndef GRPCXX_SERVER_BUILDER_H
 #define GRPCXX_SERVER_BUILDER_H
 
+#include <climits>
 #include <map>
 #include <memory>
 #include <vector>
@@ -42,6 +43,8 @@
 #include <grpc++/impl/server_builder_plugin.h>
 #include <grpc++/support/config.h>
 #include <grpc/compression.h>
+#include <grpc/support/cpu.h>
+#include <grpc/support/useful.h>
 
 namespace grpc {
 
@@ -62,6 +65,8 @@
  public:
   ServerBuilder();
 
+  enum SyncServerOption { NUM_CQS, MIN_POLLERS, MAX_POLLERS, CQ_TIMEOUT_MSEC };
+
   /// Register a service. This call does not take ownership of the service.
   /// The service must exist for the lifetime of the \a Server instance returned
   /// by \a BuildAndStart().
@@ -115,6 +120,9 @@
 
   ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option);
 
+  /// Only useful if this is a Synchronous server.
+  ServerBuilder& SetSyncServerOption(SyncServerOption option, int value);
+
   /// Tries to bind \a server to the given \a addr.
   ///
   /// It can be invoked multiple times.
@@ -170,6 +178,28 @@
     int* selected_port;
   };
 
+  struct SyncServerSettings {
+    SyncServerSettings()
+        : num_cqs(GPR_MAX(gpr_cpu_num_cores(), 4)),
+          min_pollers(1),
+          max_pollers(INT_MAX),
+          cq_timeout_msec(1000) {}
+
+    // Number of server completion queues to create to listen to incoming RPCs.
+    int num_cqs;
+
+    // Minimum number of threads per completion queue that should be listening
+    // to incoming RPCs.
+    int min_pollers;
+
+    // Maximum number of threads per completion queue that can be listening to
+    // incoming RPCs.
+    int max_pollers;
+
+    // The timeout for server completion queue's AsyncNext call.
+    int cq_timeout_msec;
+  };
+
   typedef std::unique_ptr<grpc::string> HostString;
   struct NamedService {
     explicit NamedService(Service* s) : service(s) {}
@@ -184,7 +214,12 @@
   std::vector<std::unique_ptr<ServerBuilderOption>> options_;
   std::vector<std::unique_ptr<NamedService>> services_;
   std::vector<Port> ports_;
+
+  SyncServerSettings sync_server_settings_;
+
+  // List of completion queues added via AddCompletionQueue() method
   std::vector<ServerCompletionQueue*> cqs_;
+
   std::shared_ptr<ServerCredentials> creds_;
   std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_;
   AsyncGenericService* generic_service_;
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 2980b16..8ca29ee 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -35,6 +35,7 @@
 
 #include <grpc++/impl/service_type.h>
 #include <grpc++/server.h>
+#include <grpc/support/cpu.h>
 #include <grpc/support/log.h>
 #include <grpc/support/useful.h>
 
@@ -54,6 +55,7 @@
 ServerBuilder::ServerBuilder()
     : max_receive_message_size_(-1),
       max_send_message_size_(-1),
+      sync_server_settings_(SyncServerSettings()),
       generic_service_(nullptr) {
   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
   for (auto it = g_plugin_factory_list->begin();
@@ -61,6 +63,7 @@
     auto& factory = *it;
     plugins_.emplace_back(factory());
   }
+
   // all compression algorithms enabled by default.
   enabled_compression_algorithms_bitset_ =
       (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
@@ -94,7 +97,7 @@
     gpr_log(GPR_ERROR,
             "Adding multiple AsyncGenericService is unsupported for now. "
             "Dropping the service %p",
-            service);
+            (void*)service);
   } else {
     generic_service_ = service;
   }
@@ -107,6 +110,25 @@
   return *this;
 }
 
+ServerBuilder& ServerBuilder::SetSyncServerOption(
+    ServerBuilder::SyncServerOption option, int val) {
+  switch (option) {
+    case NUM_CQS:
+      sync_server_settings_.num_cqs = val;
+      break;
+    case MIN_POLLERS:
+      sync_server_settings_.min_pollers = val;
+      break;
+    case MAX_POLLERS:
+      sync_server_settings_.max_pollers = val;
+      break;
+    case CQ_TIMEOUT_MSEC:
+      sync_server_settings_.cq_timeout_msec = val;
+      break;
+  }
+  return *this;
+}
+
 ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
     grpc_compression_algorithm algorithm, bool enabled) {
   if (enabled) {
@@ -139,35 +161,24 @@
 }
 
 std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
-  std::unique_ptr<ThreadPoolInterface> thread_pool;
-  bool has_sync_methods = false;
-  for (auto it = services_.begin(); it != services_.end(); ++it) {
-    if ((*it)->service->has_synchronous_methods()) {
-      if (!thread_pool) {
-        thread_pool.reset(CreateDefaultThreadPool());
-        has_sync_methods = true;
-        break;
-      }
-    }
-  }
   ChannelArguments args;
   for (auto option = options_.begin(); option != options_.end(); ++option) {
     (*option)->UpdateArguments(&args);
     (*option)->UpdatePlugins(&plugins_);
   }
+
   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
-    if (!thread_pool && (*plugin)->has_sync_methods()) {
-      thread_pool.reset(CreateDefaultThreadPool());
-      has_sync_methods = true;
-    }
     (*plugin)->UpdateChannelArguments(&args);
   }
+
   if (max_receive_message_size_ >= 0) {
     args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
   }
+
   if (max_send_message_size_ >= 0) {
     args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
   }
+
   args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
               enabled_compression_algorithms_bitset_);
   if (maybe_default_compression_level_.is_set) {
@@ -178,27 +189,84 @@
     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
                 maybe_default_compression_algorithm_.algorithm);
   }
-  std::unique_ptr<Server> server(new Server(thread_pool.release(), true,
-                                            max_receive_message_size_, &args));
+
+  // == Determine if the server has any synchronous methods ==
+  bool has_sync_methods = false;
+  for (auto it = services_.begin(); it != services_.end(); ++it) {
+    if ((*it)->service->has_synchronous_methods()) {
+      has_sync_methods = true;
+      break;
+    }
+  }
+
+  if (!has_sync_methods) {
+    for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+      if ((*plugin)->has_sync_methods()) {
+        has_sync_methods = true;
+        break;
+      }
+    }
+  }
+
+  // If this is a Sync server, i.e. a server exposing a sync API, then the
+  // server needs to create some completion queues to listen for incoming
+  // requests. 'sync_server_cqs' are those internal completion queues.
+  //
+  // This is different from the completion queues added to the server via
+  // ServerBuilder's AddCompletionQueue() method (those completion queues
+  // are in 'cqs_' member variable of ServerBuilder object)
+  std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+      sync_server_cqs(std::make_shared<
+                      std::vector<std::unique_ptr<ServerCompletionQueue>>>());
+
+  if (has_sync_methods) {
+    // This is a Sync server
+    gpr_log(GPR_INFO,
+            "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+            "%d, CQ timeout (msec): %d",
+            sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+            sync_server_settings_.max_pollers,
+            sync_server_settings_.cq_timeout_msec);
+
+    // Create completion queues to listen to incoming rpc requests
+    for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
+      sync_server_cqs->emplace_back(new ServerCompletionQueue());
+    }
+  }
+
+  std::unique_ptr<Server> server(new Server(
+      max_receive_message_size_, &args, sync_server_cqs,
+      sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+      sync_server_settings_.cq_timeout_msec));
+
   ServerInitializer* initializer = server->initializer();
 
-  // If the server has atleast one sync methods, we know that this is a Sync
-  // server or a Hybrid server and the completion queue (server->cq_) would be
-  // frequently polled.
-  int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
+  // Register all the completion queues with the server. i.e
+  //  1. sync_server_cqs: internal completion queues created IF this is a sync
+  //     server
+  //  2. cqs_: Completion queues added via AddCompletionQueue() call
 
-  for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
-    // A completion queue that is not polled frequently (by calling Next() or
-    // AsyncNext()) is not safe to use for listening to incoming channels.
-    // Register all such completion queues as non-listening completion queues
-    // with the GRPC core library.
-    if ((*cq)->IsFrequentlyPolled()) {
-      grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+  // All sync cqs (if any) are frequently polled by ThreadManager
+  int num_frequently_polled_cqs = sync_server_cqs->size();
+
+  for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+    grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+                                          nullptr);
+  }
+
+  // cqs_ contains the completion queues added by calling the ServerBuilder's
+  // AddCompletionQueue() API. Some of them may not be frequently polled (i.e.
+  // by calling Next() or AsyncNext()) and hence are not safe to be used for
+  // listening to incoming channels. Such completion queues must be registered
+  // as non-listening queues.
+  for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+    if ((*it)->IsFrequentlyPolled()) {
+      grpc_server_register_completion_queue(server->server_, (*it)->cq(),
                                             nullptr);
       num_frequently_polled_cqs++;
     } else {
       grpc_server_register_non_listening_completion_queue(server->server_,
-                                                          (*cq)->cq(), nullptr);
+                                                          (*it)->cq(), nullptr);
     }
   }
 
@@ -214,9 +282,11 @@
       return nullptr;
     }
   }
+
   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
     (*plugin)->InitServer(initializer);
   }
+
   if (generic_service_) {
     server->RegisterAsyncGenericService(generic_service_);
   } else {
@@ -229,6 +299,7 @@
       }
     }
   }
+
   for (auto port = ports_.begin(); port != ports_.end(); port++) {
     int r = server->AddListeningPort(port->addr, port->creds.get());
     if (!r) return nullptr;
@@ -236,13 +307,16 @@
       *port->selected_port = r;
     }
   }
+
   auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
   if (!server->Start(cqs_data, cqs_.size())) {
     return nullptr;
   }
+
   for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
     (*plugin)->Finish(initializer);
   }
+
   return server;
 }
 
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 3f89275..d46942d 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -1,5 +1,4 @@
 /*
- *
  * Copyright 2015, Google Inc.
  * All rights reserved.
  *
@@ -52,7 +51,7 @@
 #include <grpc/support/log.h>
 
 #include "src/core/lib/profiling/timers.h"
-#include "src/cpp/server/thread_pool_interface.h"
+#include "src/cpp/thread_manager/thread_manager.h"
 
 namespace grpc {
 
@@ -118,12 +117,9 @@
   UnimplementedAsyncRequest* const request_;
 };
 
-class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
+class ShutdownTag : public CompletionQueueTag {
  public:
-  bool FinalizeResult(void** tag, bool* status) {
-    delete this;
-    return false;
-  }
+  bool FinalizeResult(void** tag, bool* status) { return false; }
 };
 
 class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
@@ -147,36 +143,6 @@
     grpc_metadata_array_destroy(&request_metadata_);
   }
 
-  static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
-    void* tag = nullptr;
-    *ok = false;
-    if (!cq->Next(&tag, ok)) {
-      return nullptr;
-    }
-    auto* mrd = static_cast<SyncRequest*>(tag);
-    GPR_ASSERT(mrd->in_flight_);
-    return mrd;
-  }
-
-  static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
-                        gpr_timespec deadline) {
-    void* tag = nullptr;
-    *ok = false;
-    switch (cq->AsyncNext(&tag, ok, deadline)) {
-      case CompletionQueue::TIMEOUT:
-        *req = nullptr;
-        return true;
-      case CompletionQueue::SHUTDOWN:
-        *req = nullptr;
-        return false;
-      case CompletionQueue::GOT_EVENT:
-        *req = static_cast<SyncRequest*>(tag);
-        GPR_ASSERT((*req)->in_flight_);
-        return true;
-    }
-    GPR_UNREACHABLE_CODE(return false);
-  }
-
   void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
 
   void TeardownRequest() {
@@ -266,7 +232,6 @@
   void* const tag_;
   bool in_flight_;
   const bool has_request_payload_;
-  uint32_t incoming_flags_;
   grpc_call* call_;
   grpc_call_details* call_details_;
   gpr_timespec deadline_;
@@ -275,33 +240,141 @@
   grpc_completion_queue* cq_;
 };
 
+// Implementation of ThreadManager. Each instance of SyncRequestThreadManager
+// manages a pool of threads that poll for incoming Sync RPCs and call the
+// appropriate RPC handlers
+class Server::SyncRequestThreadManager : public ThreadManager {
+ public:
+  SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
+                           std::shared_ptr<GlobalCallbacks> global_callbacks,
+                           int min_pollers, int max_pollers,
+                           int cq_timeout_msec)
+      : ThreadManager(min_pollers, max_pollers),
+        server_(server),
+        server_cq_(server_cq),
+        cq_timeout_msec_(cq_timeout_msec),
+        global_callbacks_(global_callbacks) {}
+
+  WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
+    *tag = nullptr;
+    gpr_timespec deadline =
+        gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN);
+
+    switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+      case CompletionQueue::TIMEOUT:
+        return TIMEOUT;
+      case CompletionQueue::SHUTDOWN:
+        return SHUTDOWN;
+      case CompletionQueue::GOT_EVENT:
+        return WORK_FOUND;
+    }
+
+    GPR_UNREACHABLE_CODE(return TIMEOUT);
+  }
+
+  void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
+    SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+
+    if (!sync_req) {
+      // No tag. Nothing to work on. This is an unlikely scenario and possibly a
+      // bug in the ThreadManager implementation.
+      gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
+      return;
+    }
+
+    if (ok) {
+      // Calldata takes ownership of the completion queue inside sync_req
+      SyncRequest::CallData cd(server_, sync_req);
+      {
+        // Prepare for the next request
+        if (!IsShutdown()) {
+          sync_req->SetupRequest();  // Create new completion queue for sync_req
+          sync_req->Request(server_->c_server(), server_cq_->cq());
+        }
+      }
+
+      GPR_TIMER_SCOPE("cd.Run()", 0);
+      cd.Run(global_callbacks_);
+    }
+    // TODO (sreek) If ok is false here (which it isn't in case of
+    // grpc_request_registered_call), we should still re-queue the request
+    // object
+  }
+
+  void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+    sync_requests_.emplace_back(new SyncRequest(method, tag));
+  }
+
+  void AddUnknownSyncMethod() {
+    if (!sync_requests_.empty()) {
+      unknown_method_.reset(new RpcServiceMethod(
+          "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+      sync_requests_.emplace_back(
+          new SyncRequest(unknown_method_.get(), nullptr));
+    }
+  }
+
+  void ShutdownAndDrainCompletionQueue() {
+    server_cq_->Shutdown();
+
+    // Drain any pending items from the queue
+    void* tag;
+    bool ok;
+    while (server_cq_->Next(&tag, &ok)) {
+      // Nothing to be done here
+    }
+  }
+
+  void Start() {
+    if (!sync_requests_.empty()) {
+      for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) {
+        (*m)->SetupRequest();
+        (*m)->Request(server_->c_server(), server_cq_->cq());
+      }
+
+      Initialize();  // ThreadManager's Initialize()
+    }
+  }
+
+ private:
+  Server* server_;
+  CompletionQueue* server_cq_;
+  int cq_timeout_msec_;
+  std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
+  std::unique_ptr<RpcServiceMethod> unknown_method_;
+  std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
 static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
-               int max_receive_message_size, ChannelArguments* args)
+Server::Server(
+    int max_receive_message_size, ChannelArguments* args,
+    std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+        sync_server_cqs,
+    int min_pollers, int max_pollers, int sync_cq_timeout_msec)
     : max_receive_message_size_(max_receive_message_size),
+      sync_server_cqs_(sync_server_cqs),
       started_(false),
       shutdown_(false),
       shutdown_notified_(false),
-      num_running_cb_(0),
-      sync_methods_(new std::list<SyncRequest>),
       has_generic_service_(false),
       server_(nullptr),
-      thread_pool_(thread_pool),
-      thread_pool_owned_(thread_pool_owned),
       server_initializer_(new ServerInitializer(this)) {
   g_gli_initializer.summon();
   gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
   global_callbacks_ = g_callbacks;
   global_callbacks_->UpdateArguments(args);
+
+  for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+       it++) {
+    sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
+        this, (*it).get(), global_callbacks_, min_pollers, max_pollers,
+        sync_cq_timeout_msec));
+  }
+
   grpc_channel_args channel_args;
   args->SetChannelArgs(&channel_args);
+
   server_ = grpc_server_create(&channel_args, nullptr);
-  if (thread_pool_ == nullptr) {
-    grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
-                                                        nullptr);
-  } else {
-    grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
-  }
 }
 
 Server::~Server() {
@@ -311,17 +384,14 @@
       lock.unlock();
       Shutdown();
     } else if (!started_) {
-      cq_.Shutdown();
+      // Shutdown the completion queues
+      for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+        (*it)->ShutdownAndDrainCompletionQueue();
+      }
     }
   }
-  void* got_tag;
-  bool ok;
-  GPR_ASSERT(!cq_.Next(&got_tag, &ok));
+
   grpc_server_destroy(server_);
-  if (thread_pool_owned_) {
-    delete thread_pool_;
-  }
-  delete sync_methods_;
 }
 
 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -352,12 +422,14 @@
                "Can only register an asynchronous service against one server.");
     service->server_ = this;
   }
+
   const char* method_name = nullptr;
   for (auto it = service->methods_.begin(); it != service->methods_.end();
        ++it) {
     if (it->get() == nullptr) {  // Handled by generic service if any.
       continue;
     }
+
     RpcServiceMethod* method = it->get();
     void* tag = grpc_server_register_method(
         server_, method->name(), host ? host->c_str() : nullptr,
@@ -367,11 +439,15 @@
               method->name());
       return false;
     }
-    if (method->handler() == nullptr) {
+
+    if (method->handler() == nullptr) {  // Async method
       method->set_server_tag(tag);
     } else {
-      sync_methods_->emplace_back(method, tag);
+      for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+        (*it)->AddSyncMethod(method, tag);
+      }
     }
+
     method_name = method->name();
   }
 
@@ -406,28 +482,19 @@
   grpc_server_start(server_);
 
   if (!has_generic_service_) {
-    if (!sync_methods_->empty()) {
-      unknown_method_.reset(new RpcServiceMethod(
-          "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
-      // Use of emplace_back with just constructor arguments is not accepted
-      // here by gcc-4.4 because it can't match the anonymous nullptr with a
-      // proper constructor implicitly. Construct the object and use push_back.
-      sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+    for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+      (*it)->AddUnknownSyncMethod();
     }
+
     for (size_t i = 0; i < num_cqs; i++) {
       if (cqs[i]->IsFrequentlyPolled()) {
         new UnimplementedAsyncRequest(this, cqs[i]);
       }
     }
   }
-  // Start processing rpcs.
-  if (!sync_methods_->empty()) {
-    for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
-      m->SetupRequest();
-      m->Request(server_, cq_.cq());
-    }
 
-    ScheduleCallback();
+  for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+    (*it)->Start();
   }
 
   return true;
@@ -437,29 +504,43 @@
   grpc::unique_lock<grpc::mutex> lock(mu_);
   if (started_ && !shutdown_) {
     shutdown_ = true;
-    grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
-    cq_.Shutdown();
-    lock.unlock();
-    // Spin, eating requests until the completion queue is completely shutdown.
-    // If the deadline expires then cancel anything that's pending and keep
-    // spinning forever until the work is actually drained.
-    // Since nothing else needs to touch state guarded by mu_, holding it
-    // through this loop is fine.
-    SyncRequest* request;
-    bool ok;
-    while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
-      if (request == NULL) {  // deadline expired
-        grpc_server_cancel_all_calls(server_);
-        deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
-      } else if (ok) {
-        SyncRequest::CallData call_data(this, request);
-      }
-    }
-    lock.lock();
 
-    // Wait for running callbacks to finish.
-    while (num_running_cb_ != 0) {
-      callback_cv_.wait(lock);
+    /// The completion queue to use for server shutdown completion notification
+    CompletionQueue shutdown_cq;
+    ShutdownTag shutdown_tag;  // Dummy shutdown tag
+    grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+
+    // Shutdown all ThreadManagers. This will try to gracefully stop all the
+    // threads in the ThreadManagers (once they process any inflight requests)
+    for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+      (*it)->Shutdown();  // ThreadManager's Shutdown()
+    }
+
+    shutdown_cq.Shutdown();
+
+    void* tag;
+    bool ok;
+    CompletionQueue::NextStatus status =
+        shutdown_cq.AsyncNext(&tag, &ok, deadline);
+
+    // If this timed out, it means we are done with the grace period for a clean
+    // shutdown. We should force a shutdown now by cancelling all inflight calls
+    if (status == CompletionQueue::NextStatus::TIMEOUT) {
+      grpc_server_cancel_all_calls(server_);
+    }
+    // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+    // successfully shutdown
+
+    // Wait for threads in all ThreadManagers to terminate
+    for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+      (*it)->Wait();
+      (*it)->ShutdownAndDrainCompletionQueue();
+    }
+
+    // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+    // and we didn't remove the tag from the queue yet)
+    while (shutdown_cq.Next(&tag, &ok)) {
+      // Nothing to be done here. Just ignore ok and tag values
     }
 
     shutdown_notified_ = true;
@@ -585,47 +666,6 @@
   request_->stream()->call_.PerformOps(this);
 }
 
-void Server::ScheduleCallback() {
-  {
-    grpc::unique_lock<grpc::mutex> lock(mu_);
-    num_running_cb_++;
-  }
-  thread_pool_->Add(std::bind(&Server::RunRpc, this));
-}
-
-void Server::RunRpc() {
-  // Wait for one more incoming rpc.
-  bool ok;
-  GPR_TIMER_SCOPE("Server::RunRpc", 0);
-  auto* mrd = SyncRequest::Wait(&cq_, &ok);
-  if (mrd) {
-    ScheduleCallback();
-    if (ok) {
-      SyncRequest::CallData cd(this, mrd);
-      {
-        mrd->SetupRequest();
-        grpc::unique_lock<grpc::mutex> lock(mu_);
-        if (!shutdown_) {
-          mrd->Request(server_, cq_.cq());
-        } else {
-          // destroy the structure that was created
-          mrd->TeardownRequest();
-        }
-      }
-      GPR_TIMER_SCOPE("cd.Run()", 0);
-      cd.Run(global_callbacks_);
-    }
-  }
-
-  {
-    grpc::unique_lock<grpc::mutex> lock(mu_);
-    num_running_cb_--;
-    if (shutdown_) {
-      callback_cv_.notify_all();
-    }
-  }
-}
-
 ServerInitializer* Server::initializer() { return server_initializer_.get(); }
 
 }  // namespace grpc
diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc
new file mode 100644
index 0000000..caae4c4
--- /dev/null
+++ b/src/cpp/thread_manager/thread_manager.cc
@@ -0,0 +1,181 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc/support/log.h>
+#include <climits>
+
+#include "src/cpp/thread_manager/thread_manager.h"
+
+namespace grpc {
+
+ThreadManager::WorkerThread::WorkerThread(ThreadManager* thd_mgr)
+    : thd_mgr_(thd_mgr), thd_(&ThreadManager::WorkerThread::Run, this) {}
+
+void ThreadManager::WorkerThread::Run() {
+  thd_mgr_->MainWorkLoop();
+  thd_mgr_->MarkAsCompleted(this);
+}
+
+ThreadManager::WorkerThread::~WorkerThread() { thd_.join(); }
+
+ThreadManager::ThreadManager(int min_pollers, int max_pollers)
+    : shutdown_(false),
+      num_pollers_(0),
+      min_pollers_(min_pollers),
+      max_pollers_(max_pollers == -1 ? INT_MAX : max_pollers),
+      num_threads_(0) {}
+
+ThreadManager::~ThreadManager() {
+  {
+    std::unique_lock<grpc::mutex> lock(mu_);
+    GPR_ASSERT(num_threads_ == 0);
+  }
+
+  CleanupCompletedThreads();
+}
+
+void ThreadManager::Wait() {
+  std::unique_lock<grpc::mutex> lock(mu_);
+  while (num_threads_ != 0) {
+    shutdown_cv_.wait(lock);
+  }
+}
+
+void ThreadManager::Shutdown() {
+  std::unique_lock<grpc::mutex> lock(mu_);
+  shutdown_ = true;
+}
+
+bool ThreadManager::IsShutdown() {
+  std::unique_lock<grpc::mutex> lock(mu_);
+  return shutdown_;
+}
+
+void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
+  {
+    std::unique_lock<grpc::mutex> list_lock(list_mu_);
+    completed_threads_.push_back(thd);
+  }
+
+  grpc::unique_lock<grpc::mutex> lock(mu_);
+  num_threads_--;
+  if (num_threads_ == 0) {
+    shutdown_cv_.notify_one();
+  }
+}
+
+void ThreadManager::CleanupCompletedThreads() {
+  std::unique_lock<grpc::mutex> lock(list_mu_);
+  for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
+       thd = completed_threads_.erase(thd)) {
+    delete *thd;
+  }
+}
+
+void ThreadManager::Initialize() {
+  for (int i = 0; i < min_pollers_; i++) {
+    MaybeCreatePoller();
+  }
+}
+
+// Returns true (and counts the calling thread as a poller again) only if the
+// ThreadManager is not shut down and the number of pollers is strictly less
+// than max_pollers_; otherwise returns false so the calling thread can exit
+bool ThreadManager::MaybeContinueAsPoller() {
+  std::unique_lock<grpc::mutex> lock(mu_);
+  // '>=' (not '>'): admitting a poller at == max_pollers_ would exceed the cap
+  if (shutdown_ || num_pollers_ >= max_pollers_) {
+    return false;
+  }
+  num_pollers_++;
+  return true;
+}
+
+// Create a new poller thread if the ThreadManager is not shut down and the
+// current number of pollers, num_pollers_ (i.e. threads currently blocked in
+// PollForWork()), is below the minimum threshold min_pollers_
+void ThreadManager::MaybeCreatePoller() {
+  grpc::unique_lock<grpc::mutex> lock(mu_);
+  if (!shutdown_ && num_pollers_ < min_pollers_) {
+    num_pollers_++;
+    num_threads_++;
+
+    // Create a new thread (which ends up calling the MainWorkLoop() function)
+    new WorkerThread(this);
+  }
+}
+
+void ThreadManager::MainWorkLoop() {
+  void* tag;
+  bool ok;
+
+  /*
+   1. Poll for work (i.e PollForWork())
+   2. After returning from PollForWork, reduce the number of pollers by 1. If
+      PollForWork() returned a TIMEOUT, then it may indicate that we have more
+      polling threads than needed. Check if the number of pollers is greater
+      than min_pollers and if so, terminate the thread.
+   3. Since we are short of one poller now, see if a new poller has to be
+      created (i.e see MaybeCreatePoller() for more details)
+   4. Do the actual work (DoWork())
+   5. After doing the work, see if this thread can resume polling work (i.e
+      see MaybeContinueAsPoller() for more details) */
+  do {
+    WorkStatus work_status = PollForWork(&tag, &ok);
+
+    {
+      grpc::unique_lock<grpc::mutex> lock(mu_);
+      num_pollers_--;
+
+      if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
+        break;
+      }
+    }
+
+    // Note that MaybeCreatePoller does check for shutdown and creates a new
+    // thread only if ThreadManager is not shutdown
+    if (work_status == WORK_FOUND) {
+      MaybeCreatePoller();
+      DoWork(tag, ok);
+    }
+  } while (MaybeContinueAsPoller());
+
+  CleanupCompletedThreads();
+
+  // If we are here, either ThreadManager is shutting down or it already has
+  // enough threads.
+}
+
+}  // namespace grpc
diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h
new file mode 100644
index 0000000..9cfdb8a
--- /dev/null
+++ b/src/cpp/thread_manager/thread_manager.h
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H
+#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H
+
+#include <list>
+#include <memory>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc++/support/config.h>
+
+namespace grpc {
+
+class ThreadManager {
+ public:
+  explicit ThreadManager(int min_pollers, int max_pollers);
+  virtual ~ThreadManager();
+
+  // Initializes and starts the ThreadManager threads
+  void Initialize();
+
+  // The return type of PollForWork() function
+  enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
+
+  // "Polls" for new work.
+  // If the return value is WORK_FOUND:
+  //  - The implementation of PollForWork() MAY set some opaque identifier
+  //    (to identify the work item found) via the '*tag' parameter
+  //  - The implementation MUST set the value of 'ok' to 'true' or 'false'. A
+  //    value of 'false' indicates some implementation specific error (that is
+  //    neither SHUTDOWN nor TIMEOUT)
+  //  - ThreadManager does not interpret the values of 'tag' and 'ok'
+  //  - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
+  //    DoWork()
+  //
+  // If the return value is SHUTDOWN:
+  //  - ThreadManager WILL NOT call DoWork() and terminates the thread
+  //
+  // If the return value is TIMEOUT:
+  //  - ThreadManager WILL NOT call DoWork()
+  //  - ThreadManager MAY terminate the thread depending on the current number
+  //    of active poller threads and min_pollers/max_pollers settings
+  //  - Also, the value of timeout is specific to the derived class
+  //    implementation
+  virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
+
+  // The implementation of DoWork() is supposed to perform the work found by
+  // PollForWork(). The tag and ok parameters are the same as returned by
+  // PollForWork()
+  //
+  // The implementation of DoWork() should also do any setup needed to ensure
+  // that the next call to PollForWork() (not necessarily by the current thread)
+  // actually finds some work
+  virtual void DoWork(void* tag, bool ok) = 0;
+
+  // Mark the ThreadManager as shutdown and begin draining the work. This is a
+  // non-blocking call and the caller should call Wait(), a blocking call which
+  // returns only once the shutdown is complete
+  void Shutdown();
+
+  // Has Shutdown() been called
+  bool IsShutdown();
+
+  // A blocking call that returns only after the ThreadManager has shutdown and
+  // all the threads have drained all the outstanding work
+  void Wait();
+
+ private:
+  // Helper wrapper class around std::thread. This takes a ThreadManager object
+  // and starts a new std::thread to call the Run() function.
+  //
+  // The Run() function calls ThreadManager::MainWorkLoop() function and once
+  // that completes, it marks the WorkerThread completed by calling
+  // ThreadManager::MarkAsCompleted()
+  class WorkerThread {
+   public:
+    WorkerThread(ThreadManager* thd_mgr);
+    ~WorkerThread();
+
+   private:
+    // Calls thd_mgr_->MainWorkLoop() and once that completes, calls
+    // thd_mgr_->MarkAsCompleted(this) to mark the thread as completed
+    void Run();
+
+    ThreadManager* thd_mgr_;
+    grpc::thread thd_;
+  };
+
+  // The main function in ThreadManager
+  void MainWorkLoop();
+
+  // Create a new poller if the number of current pollers is less than the
+  // minimum number of pollers needed (i.e min_pollers).
+  void MaybeCreatePoller();
+
+  // Returns true if the current thread can resume as a poller. i.e if the
+  // current number of pollers is less than the max_pollers.
+  bool MaybeContinueAsPoller();
+
+  void MarkAsCompleted(WorkerThread* thd);
+  void CleanupCompletedThreads();
+
+  // Protects shutdown_, num_pollers_ and num_threads_
+  // TODO: sreek - Change num_pollers and num_threads_ to atomics
+  grpc::mutex mu_;
+
+  bool shutdown_;
+  grpc::condition_variable shutdown_cv_;
+
+  // Number of threads doing polling
+  int num_pollers_;
+
+  // The minimum and maximum number of threads that should be doing polling
+  int min_pollers_;
+  int max_pollers_;
+
+  // The total number of threads (includes the threads that are currently
+  // polling i.e num_pollers_)
+  int num_threads_;
+
+  grpc::mutex list_mu_;  // Protects completed_threads_
+  std::list<WorkerThread*> completed_threads_;
+};
+
+}  // namespace grpc
+
+#endif  // GRPC_INTERNAL_CPP_THREAD_MANAGER_H
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 7af0fb9..954cf2d 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -250,6 +250,11 @@
     builder.SetMaxMessageSize(
         kMaxMessageSize_);  // For testing max message size.
     builder.RegisterService(&dup_pkg_service_);
+
+    builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
+    builder.SetSyncServerOption(
+        ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
+
     server_ = builder.BuildAndStart();
     is_server_started_ = true;
   }
@@ -279,6 +284,11 @@
       ServerBuilder builder;
       builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
       builder.RegisterService(proxy_service_.get());
+
+      builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
+      builder.SetSyncServerOption(
+          ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
+
       proxy_server_ = builder.BuildAndStart();
 
       channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
new file mode 100644
index 0000000..5c70103
--- /dev/null
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -0,0 +1,136 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <grpc++/grpc++.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/thread_manager/thread_manager.h"
+#include "test/cpp/util/test_config.h"
+
+namespace grpc {
+class ThreadManagerTest GRPC_FINAL : public grpc::ThreadManager {
+ public:
+  ThreadManagerTest()
+      : ThreadManager(kMinPollers, kMaxPollers),
+        num_do_work_(0),
+        num_poll_for_work_(0),
+        num_work_found_(0) {}
+
+  grpc::ThreadManager::WorkStatus PollForWork(void **tag,
+                                              bool *ok) GRPC_OVERRIDE;
+  void DoWork(void *tag, bool ok) GRPC_OVERRIDE;
+  void PerformTest();
+
+ private:
+  void SleepForMs(int sleep_time_ms);
+
+  static const int kMinPollers = 2;
+  static const int kMaxPollers = 10;
+
+  static const int kPollingTimeoutMsec = 10;
+  static const int kDoWorkDurationMsec = 1;
+
+  // PollForWork will return SHUTDOWN after these many number of invocations
+  static const int kMaxNumPollForWork = 50;
+
+  gpr_atm num_do_work_;        // Number of calls to DoWork
+  gpr_atm num_poll_for_work_;  // Number of calls to PollForWork
+  gpr_atm num_work_found_;     // Number of times WORK_FOUND was returned
+};
+
+void ThreadManagerTest::SleepForMs(int duration_ms) {
+  gpr_timespec sleep_time =
+      gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+                   gpr_time_from_millis(duration_ms, GPR_TIMESPAN));
+  gpr_sleep_until(sleep_time);
+}
+
+grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void **tag,
+                                                               bool *ok) {
+  int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1);
+
+  if (call_num >= kMaxNumPollForWork) {
+    Shutdown();
+    return SHUTDOWN;
+  }
+
+  // Simulate "polling for work" by sleeping for sometime
+  SleepForMs(kPollingTimeoutMsec);
+
+  *tag = nullptr;
+  *ok = true;
+
+  // Return timeout roughly 1 out of every 3 calls
+  if (call_num % 3 == 0) {
+    return TIMEOUT;
+  } else {
+    gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
+    return WORK_FOUND;
+  }
+}
+
+void ThreadManagerTest::DoWork(void *tag, bool ok) {
+  gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
+  SleepForMs(kDoWorkDurationMsec);  // Simulate doing work by sleeping
+}
+
+// Runs the test: starts the manager, waits for every worker thread to exit,
+// then checks that DoWork() ran exactly once per WORK_FOUND result
+void ThreadManagerTest::PerformTest() {
+  Initialize();  // Starts the ThreadManager's polling threads
+  Wait();        // Blocks until all the threads have terminated
+
+  // gpr_atm is not guaranteed to be 'long' on every supported platform; cast
+  // so that the argument always matches the %ld conversion
+  gpr_log(GPR_DEBUG, "DoWork() called %ld times",
+          static_cast<long>(gpr_atm_no_barrier_load(&num_do_work_)));
+  // Every WORK_FOUND returned by PollForWork() must have led to one DoWork()
+  GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) ==
+             gpr_atm_no_barrier_load(&num_work_found_));
+}
+}  // namespace grpc
+
+int main(int argc, char **argv) {
+  std::srand(std::time(NULL));
+
+  grpc::testing::InitTest(&argc, &argv, true);
+  grpc::ThreadManagerTest test_rpc_manager;
+  test_rpc_manager.PerformTest();
+
+  return 0;
+}
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index e266e15..7a95526 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -863,6 +863,7 @@
 src/cpp/common/channel_filter.h \
 src/cpp/server/dynamic_thread_pool.h \
 src/cpp/server/thread_pool_interface.h \
+src/cpp/thread_manager/thread_manager.h \
 src/cpp/client/insecure_credentials.cc \
 src/cpp/client/secure_credentials.cc \
 src/cpp/common/auth_property_iterator.cc \
@@ -891,6 +892,7 @@
 src/cpp/server/server_context.cc \
 src/cpp/server/server_credentials.cc \
 src/cpp/server/server_posix.cc \
+src/cpp/thread_manager/thread_manager.cc \
 src/cpp/util/byte_buffer_cc.cc \
 src/cpp/util/slice_cc.cc \
 src/cpp/util/status.cc \
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index d7d6911..eafba13 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -3186,6 +3186,23 @@
   {
     "deps": [
       "gpr", 
+      "grpc", 
+      "grpc++", 
+      "grpc++_test_config"
+    ], 
+    "headers": [], 
+    "is_filegroup": false, 
+    "language": "c++", 
+    "name": "thread_manager_test", 
+    "src": [
+      "test/cpp/thread_manager/thread_manager_test.cc"
+    ], 
+    "third_party": false, 
+    "type": "target"
+  }, 
+  {
+    "deps": [
+      "gpr", 
       "gpr_test_util", 
       "grpc", 
       "grpc++", 
@@ -7426,7 +7443,8 @@
       "src/cpp/client/create_channel_internal.h", 
       "src/cpp/common/channel_filter.h", 
       "src/cpp/server/dynamic_thread_pool.h", 
-      "src/cpp/server/thread_pool_interface.h"
+      "src/cpp/server/thread_pool_interface.h", 
+      "src/cpp/thread_manager/thread_manager.h"
     ], 
     "is_filegroup": true, 
     "language": "c++", 
@@ -7503,6 +7521,8 @@
       "src/cpp/server/server_credentials.cc", 
       "src/cpp/server/server_posix.cc", 
       "src/cpp/server/thread_pool_interface.h", 
+      "src/cpp/thread_manager/thread_manager.cc", 
+      "src/cpp/thread_manager/thread_manager.h", 
       "src/cpp/util/byte_buffer_cc.cc", 
       "src/cpp/util/slice_cc.cc", 
       "src/cpp/util/status.cc", 
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index e5b406d..8aeefe3 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -2979,6 +2979,27 @@
       "posix", 
       "windows"
     ], 
+    "cpu_cost": 1.0, 
+    "exclude_configs": [], 
+    "flaky": false, 
+    "gtest": false, 
+    "language": "c++", 
+    "name": "thread_manager_test", 
+    "platforms": [
+      "linux", 
+      "mac", 
+      "posix", 
+      "windows"
+    ]
+  }, 
+  {
+    "args": [], 
+    "ci_platforms": [
+      "linux", 
+      "mac", 
+      "posix", 
+      "windows"
+    ], 
     "cpu_cost": 100, 
     "exclude_configs": [], 
     "exclude_iomgrs": [], 
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
index 6bb9a61..ad217da 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
@@ -363,6 +363,7 @@
     <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" />
     <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
     <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
+    <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h" />
   </ItemGroup>
   <ItemGroup>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc">
@@ -421,6 +422,8 @@
     </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc">
     </ClCompile>
+    <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc">
+    </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc">
     </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\util\slice_cc.cc">
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
index 761424c..d4ad8c4 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
@@ -85,6 +85,9 @@
     <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc">
       <Filter>src\cpp\server</Filter>
     </ClCompile>
+    <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc">
+      <Filter>src\cpp\thread_manager</Filter>
+    </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc">
       <Filter>src\cpp\util</Filter>
     </ClCompile>
@@ -416,6 +419,9 @@
     <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h">
       <Filter>src\cpp\server</Filter>
     </ClInclude>
+    <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h">
+      <Filter>src\cpp\thread_manager</Filter>
+    </ClInclude>
   </ItemGroup>
 
   <ItemGroup>
@@ -470,6 +476,9 @@
     <Filter Include="src\cpp\server">
       <UniqueIdentifier>{321b0980-74ad-e8ca-f23b-deffa5d6bb8f}</UniqueIdentifier>
     </Filter>
+    <Filter Include="src\cpp\thread_manager">
+      <UniqueIdentifier>{23f9df56-8604-52a0-e6a2-f01b8e68d0e7}</UniqueIdentifier>
+    </Filter>
     <Filter Include="src\cpp\util">
       <UniqueIdentifier>{f842537a-2bf1-1ec3-b495-7d62c64a1c06}</UniqueIdentifier>
     </Filter>
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
index 02e6592..01940c3 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -359,6 +359,7 @@
     <ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" />
     <ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
     <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
+    <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h" />
   </ItemGroup>
   <ItemGroup>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\client\insecure_credentials.cc">
@@ -407,6 +408,8 @@
     </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc">
     </ClCompile>
+    <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc">
+    </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc">
     </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\util\slice_cc.cc">
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index ff1a0e9..f261c04 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -70,6 +70,9 @@
     <ClCompile Include="$(SolutionDir)\..\src\cpp\server\server_posix.cc">
       <Filter>src\cpp\server</Filter>
     </ClCompile>
+    <ClCompile Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.cc">
+      <Filter>src\cpp\thread_manager</Filter>
+    </ClCompile>
     <ClCompile Include="$(SolutionDir)\..\src\cpp\util\byte_buffer_cc.cc">
       <Filter>src\cpp\util</Filter>
     </ClCompile>
@@ -389,6 +392,9 @@
     <ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h">
       <Filter>src\cpp\server</Filter>
     </ClInclude>
+    <ClInclude Include="$(SolutionDir)\..\src\cpp\thread_manager\thread_manager.h">
+      <Filter>src\cpp\thread_manager</Filter>
+    </ClInclude>
   </ItemGroup>
 
   <ItemGroup>
@@ -443,6 +449,9 @@
     <Filter Include="src\cpp\server">
       <UniqueIdentifier>{8a54a279-d14b-4237-0df3-1ffe1ef5a7af}</UniqueIdentifier>
     </Filter>
+    <Filter Include="src\cpp\thread_manager">
+      <UniqueIdentifier>{e5b55f25-d99f-b8e5-9981-7da7fa7ba628}</UniqueIdentifier>
+    </Filter>
     <Filter Include="src\cpp\util">
       <UniqueIdentifier>{fb5d9a64-20ca-5119-ed38-04a3cf94923d}</UniqueIdentifier>
     </Filter>
diff --git a/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj
new file mode 100644
index 0000000..2c35a03
--- /dev/null
+++ b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj
@@ -0,0 +1,201 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+  <ItemGroup Label="ProjectConfigurations">
+    <ProjectConfiguration Include="Debug|Win32">
+      <Configuration>Debug</Configuration>
+      <Platform>Win32</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Debug|x64">
+      <Configuration>Debug</Configuration>
+      <Platform>x64</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Release|Win32">
+      <Configuration>Release</Configuration>
+      <Platform>Win32</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Release|x64">
+      <Configuration>Release</Configuration>
+      <Platform>x64</Platform>
+    </ProjectConfiguration>
+  </ItemGroup>
+  <PropertyGroup Label="Globals">
+    <ProjectGuid>{08C611E4-7F87-73BE-76CE-C158A4CC05A3}</ProjectGuid>
+    <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+    <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+  </PropertyGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+    <PlatformToolset>v100</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+    <PlatformToolset>v110</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+    <PlatformToolset>v120</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+    <PlatformToolset>v140</PlatformToolset>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+    <ConfigurationType>Application</ConfigurationType>
+    <UseDebugLibraries>true</UseDebugLibraries>
+    <CharacterSet>Unicode</CharacterSet>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+    <ConfigurationType>Application</ConfigurationType>
+    <UseDebugLibraries>false</UseDebugLibraries>
+    <WholeProgramOptimization>true</WholeProgramOptimization>
+    <CharacterSet>Unicode</CharacterSet>
+  </PropertyGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+  <ImportGroup Label="ExtensionSettings">
+  </ImportGroup>
+  <ImportGroup Label="PropertySheets">
+    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+    <Import Project="$(SolutionDir)\..\vsprojects\cpptest.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\protobuf.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+    <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+  </ImportGroup>
+  <PropertyGroup Label="UserMacros" />
+  <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+    <TargetName>thread_manager_test</TargetName>
+    <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+    <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+    <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+    <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)'=='Release'">
+    <TargetName>thread_manager_test</TargetName>
+    <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+    <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+    <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+    <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+  </PropertyGroup>
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>Disabled</Optimization>
+      <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+    </Link>
+  </ItemDefinitionGroup>
+
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>Disabled</Optimization>
+      <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+    </Link>
+  </ItemDefinitionGroup>
+
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>MaxSpeed</Optimization>
+      <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <FunctionLevelLinking>true</FunctionLevelLinking>
+      <IntrinsicFunctions>true</IntrinsicFunctions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+      <EnableCOMDATFolding>true</EnableCOMDATFolding>
+      <OptimizeReferences>true</OptimizeReferences>
+    </Link>
+  </ItemDefinitionGroup>
+
+    <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+    <ClCompile>
+      <PrecompiledHeader>NotUsing</PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>MaxSpeed</Optimization>
+      <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <FunctionLevelLinking>true</FunctionLevelLinking>
+      <IntrinsicFunctions>true</IntrinsicFunctions>
+      <SDLCheck>true</SDLCheck>
+      <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+      <TreatWarningAsError>true</TreatWarningAsError>
+      <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+      <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+      <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+      <EnableCOMDATFolding>true</EnableCOMDATFolding>
+      <OptimizeReferences>true</OptimizeReferences>
+    </Link>
+  </ItemDefinitionGroup>
+
+  <ItemGroup>
+    <ClCompile Include="$(SolutionDir)\..\test\cpp\thread_manager\thread_manager_test.cc">
+    </ClCompile>
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++\grpc++.vcxproj">
+      <Project>{C187A093-A0FE-489D-A40A-6E33DE0F9FEB}</Project>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+      <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+      <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+    </ProjectReference>
+    <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++_test_config\grpc++_test_config.vcxproj">
+      <Project>{3F7D093D-11F9-C4BC-BEB7-18EB28E3F290}</Project>
+    </ProjectReference>
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="packages.config" />
+  </ItemGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+  <ImportGroup Label="ExtensionTargets">
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+  <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+  </ImportGroup>
+  <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+    <PropertyGroup>
+      <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them.  For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+    </PropertyGroup>
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props'))" />
+    <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets'))" />
+  </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj.filters b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj.filters
new file mode 100644
index 0000000..e1741f8
--- /dev/null
+++ b/vsprojects/vcxproj/test/thread_manager_test/thread_manager_test.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <ItemGroup>
+    <ClCompile Include="$(SolutionDir)\..\test\cpp\thread_manager\thread_manager_test.cc">
+      <Filter>test\cpp\thread_manager</Filter>
+    </ClCompile>
+  </ItemGroup>
+
+  <ItemGroup>
+    <Filter Include="test">
+      <UniqueIdentifier>{e9e471cd-7f7e-9abc-af13-ec58851849ac}</UniqueIdentifier>
+    </Filter>
+    <Filter Include="test\cpp">
+      <UniqueIdentifier>{b350f72c-af76-7272-4342-1b0fc7a458ee}</UniqueIdentifier>
+    </Filter>
+    <Filter Include="test\cpp\thread_manager">
+      <UniqueIdentifier>{6b09ea8d-fbc6-e6fe-f884-b3d3dfcbfc12}</UniqueIdentifier>
+    </Filter>
+  </ItemGroup>
+</Project>
+