Merge branch 'master' into rpc_mgr
diff --git a/BUILD b/BUILD
index 65f2658..571e734 100644
--- a/BUILD
+++ b/BUILD
@@ -1264,6 +1264,7 @@
"src/cpp/server/secure_server_credentials.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/core/lib/channel/channel_args.h",
@@ -1368,6 +1369,7 @@
"src/cpp/common/completion_queue_cc.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/rpc_method.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.cc",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
@@ -1672,6 +1674,7 @@
srcs = [
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h",
"src/core/lib/channel/channel_args.h",
@@ -1771,6 +1774,7 @@
"src/cpp/common/completion_queue_cc.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/rpc_method.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.cc",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d3337b5..6dd650d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1022,6 +1022,7 @@
src/cpp/common/completion_queue_cc.cc
src/cpp/common/core_codegen.cc
src/cpp/common/rpc_method.cc
+ src/cpp/rpcmanager/grpc_rpc_manager.cc
src/cpp/server/async_generic_service.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
@@ -1375,6 +1376,7 @@
src/cpp/common/completion_queue_cc.cc
src/cpp/common/core_codegen.cc
src/cpp/common/rpc_method.cc
+ src/cpp/rpcmanager/grpc_rpc_manager.cc
src/cpp/server/async_generic_service.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
diff --git a/Makefile b/Makefile
index 4da80e0..f068986 100644
--- a/Makefile
+++ b/Makefile
@@ -1048,6 +1048,7 @@
grpc_node_plugin: $(BINDIR)/$(CONFIG)/grpc_node_plugin
grpc_objective_c_plugin: $(BINDIR)/$(CONFIG)/grpc_objective_c_plugin
grpc_python_plugin: $(BINDIR)/$(CONFIG)/grpc_python_plugin
+grpc_rpc_manager_test: $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test
grpc_ruby_plugin: $(BINDIR)/$(CONFIG)/grpc_ruby_plugin
grpc_tool_test: $(BINDIR)/$(CONFIG)/grpc_tool_test
grpclb_api_test: $(BINDIR)/$(CONFIG)/grpclb_api_test
@@ -1416,6 +1417,7 @@
$(BINDIR)/$(CONFIG)/generic_end2end_test \
$(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
+ $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \
$(BINDIR)/$(CONFIG)/grpclb_test \
@@ -1503,6 +1505,7 @@
$(BINDIR)/$(CONFIG)/generic_end2end_test \
$(BINDIR)/$(CONFIG)/golden_file_test \
$(BINDIR)/$(CONFIG)/grpc_cli \
+ $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test \
$(BINDIR)/$(CONFIG)/grpc_tool_test \
$(BINDIR)/$(CONFIG)/grpclb_api_test \
$(BINDIR)/$(CONFIG)/grpclb_test \
@@ -1801,6 +1804,8 @@
$(Q) $(BINDIR)/$(CONFIG)/generic_end2end_test || ( echo test generic_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing golden_file_test"
$(Q) $(BINDIR)/$(CONFIG)/golden_file_test || ( echo test golden_file_test failed ; exit 1 )
+ $(E) "[RUN] Testing grpc_rpc_manager_test"
+ $(Q) $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test || ( echo test grpc_rpc_manager_test failed ; exit 1 )
$(E) "[RUN] Testing grpc_tool_test"
$(Q) $(BINDIR)/$(CONFIG)/grpc_tool_test || ( echo test grpc_tool_test failed ; exit 1 )
$(E) "[RUN] Testing grpclb_api_test"
@@ -3587,6 +3592,7 @@
src/cpp/common/completion_queue_cc.cc \
src/cpp/common/core_codegen.cc \
src/cpp/common/rpc_method.cc \
+ src/cpp/rpcmanager/grpc_rpc_manager.cc \
src/cpp/server/async_generic_service.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
@@ -4215,6 +4221,7 @@
src/cpp/common/completion_queue_cc.cc \
src/cpp/common/core_codegen.cc \
src/cpp/common/rpc_method.cc \
+ src/cpp/rpcmanager/grpc_rpc_manager.cc \
src/cpp/server/async_generic_service.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
@@ -11919,6 +11926,49 @@
endif
+GRPC_RPC_MANAGER_TEST_SRC = \
+ test/cpp/rpcmanager/grpc_rpc_manager_test.cc \
+
+GRPC_RPC_MANAGER_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_RPC_MANAGER_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/grpc_rpc_manager_test: $(PROTOBUF_DEP) $(GRPC_RPC_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
+ $(E) "[LD] Linking $@"
+ $(Q) mkdir -p `dirname $@`
+ $(Q) $(LDXX) $(LDFLAGS) $(GRPC_RPC_MANAGER_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/grpc_rpc_manager_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/cpp/rpcmanager/grpc_rpc_manager_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a
+
+deps_grpc_rpc_manager_test: $(GRPC_RPC_MANAGER_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(GRPC_RPC_MANAGER_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
GRPC_RUBY_PLUGIN_SRC = \
src/compiler/ruby_plugin.cc \
diff --git a/build.yaml b/build.yaml
index 3701c0d..430761c 100644
--- a/build.yaml
+++ b/build.yaml
@@ -709,6 +709,7 @@
headers:
- src/cpp/client/create_channel_internal.h
- src/cpp/common/channel_filter.h
+ - src/cpp/rpcmanager/grpc_rpc_manager.h
- src/cpp/server/dynamic_thread_pool.h
- src/cpp/server/thread_pool_interface.h
src:
@@ -724,6 +725,7 @@
- src/cpp/common/completion_queue_cc.cc
- src/cpp/common/core_codegen.cc
- src/cpp/common/rpc_method.cc
+ - src/cpp/rpcmanager/grpc_rpc_manager.cc
- src/cpp/server/async_generic_service.cc
- src/cpp/server/create_default_thread_pool.cc
- src/cpp/server/dynamic_thread_pool.cc
@@ -2868,6 +2870,18 @@
secure: false
vs_config_type: Application
vs_project_guid: '{DF52D501-A6CF-4E6F-BA38-6EBE2E8DAFB2}'
+- name: grpc_rpc_manager_test
+ build: test
+ language: c++
+ headers:
+ - test/cpp/rpcmanager/grpc_rpc_manager_test.h
+ src:
+ - test/cpp/rpcmanager/grpc_rpc_manager_test.cc
+ deps:
+ - grpc++
+ - grpc
+ - gpr
+ - grpc++_test_config
- name: grpc_ruby_plugin
build: protoc
language: c++
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 6e15035..5b4cb6f 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -50,6 +50,8 @@
#include <grpc++/support/status.h>
#include <grpc/compression.h>
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
struct grpc_server;
namespace grpc {
@@ -96,9 +98,6 @@
// Returns a \em raw pointer to the underlying grpc_server instance.
grpc_server* c_server();
- // Returns a \em raw pointer to the underlying CompletionQueue.
- CompletionQueue* completion_queue();
-
private:
friend class AsyncGenericService;
friend class ServerBuilder;
@@ -108,18 +107,38 @@
class AsyncRequest;
class ShutdownRequest;
+ /// SyncRequestManager is an implementation of GrpcRpcManager. This class is
+ /// responsible for polling for incoming RPCs and calling the RPC handlers.
+ /// This is only used in case of a Sync server (i.e a server exposing a sync
+ /// interface)
+ class SyncRequestManager;
+
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
- /// \param thread_pool The threadpool instance to use for call processing.
- /// \param thread_pool_owned Does the server own the \a thread_pool instance?
- /// \param max_receive_message_size Maximum message length that the channel
- /// can receive.
- Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_receive_message_size, ChannelArguments* args);
+ /// \param sync_server_cqs The completion queues to use if the server is a
+ /// synchronous server (or a hybrid server). The server polls for new RPCs on
+ /// these queues
+ ///
+ /// \param max_message_size Maximum message length that the channel can
+ /// receive.
+ ///
+ /// \param args The channel args
+ ///
+ /// \param min_pollers The minimum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ ///
+ /// \param max_pollers The maximum number of polling threads per server
+ /// completion queue (in param sync_server_cqs) to use for listening to
+ /// incoming requests (used only in case of sync server)
+ Server(std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int max_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@@ -174,34 +193,36 @@
const int max_receive_message_size_;
- // Completion queue.
- CompletionQueue cq_;
+ /// The following completion queues are ONLY used in case of Sync API i.e if
+ /// the server has any services with sync methods. The server uses these
+ /// completion queues to poll for new RPCs
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs_;
+
+ /// List of GrpcRpcManager instances (one for each cq in the sync_server_cqs)
+ std::vector<std::unique_ptr<SyncRequestManager>> sync_req_mgrs_;
// Sever status
grpc::mutex mu_;
bool started_;
bool shutdown_;
bool shutdown_notified_;
+
+ // TODO (sreek) : Remove num_running_cb_ and callback_cv_;
// The number of threads which are running callbacks.
- int num_running_cb_;
- grpc::condition_variable callback_cv_;
+ // int num_running_cb_;
+ // grpc::condition_variable callback_cv_;
grpc::condition_variable shutdown_cv_;
std::shared_ptr<GlobalCallbacks> global_callbacks_;
- std::list<SyncRequest>* sync_methods_;
std::vector<grpc::string> services_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
- // Pointer to the c grpc server.
+ // Pointer to the c core's grpc server.
grpc_server* server_;
- ThreadPoolInterface* thread_pool_;
- // Whether the thread pool is created and owned by the server.
- bool thread_pool_owned_;
-
std::unique_ptr<ServerInitializer> server_initializer_;
};
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 37f1f8c..d9a6878 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -164,6 +164,12 @@
private:
friend class ::grpc::testing::ServerBuilderPluginTest;
+ // TODO (sreek) Make these configurable
+ // The default number of minimum and maximum number of polling threads needed
+ // per completion queue. These are only used in case of Sync server
+ const int kDefaultMinPollers = 1;
+ const int kDefaultMaxPollers = -1; // Unlimited
+
struct Port {
grpc::string addr;
std::shared_ptr<ServerCredentials> creds;
@@ -184,7 +190,10 @@
std::vector<std::unique_ptr<ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_;
+
+ /* List of completion queues added via AddCompletionQueue() method */
std::vector<ServerCompletionQueue*> cqs_;
+
std::shared_ptr<ServerCredentials> creds_;
std::vector<std::unique_ptr<ServerBuilderPlugin>> plugins_;
AsyncGenericService* generic_service_;
diff --git a/include/grpc/grpc_posix.h b/include/grpc/grpc_posix.h
index 5e89ae3..eaecbf3 100644
--- a/include/grpc/grpc_posix.h
+++ b/include/grpc/grpc_posix.h
@@ -57,11 +57,13 @@
/** Add the connected communication channel based on file descriptor 'fd' to the
'server'. The 'fd' must be an open file descriptor corresponding to a
- connected socket. The 'cq' is a completion queue that will be getting events
- from that descriptor. */
+ connected socket. Events from the file descriptor may come on any of the
+ server completion queues (i.e completion queues registered via the
+ grpc_server_register_completion_queue API).
+
+ The 'reserved' pointer MUST be NULL */
GRPCAPI void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
- grpc_completion_queue *cq,
- int fd);
+ void *reserved, int fd);
/** GRPC Core POSIX library may internally use signals to optimize some work.
The library uses (SIGRTMIN + 2) signal by default. Use this API to instruct
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index 4350543..bca589d 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -50,8 +50,10 @@
#include "src/core/lib/surface/server.h"
void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
- grpc_completion_queue *cq,
+ void *reserved,
int fd) {
+ GPR_ASSERT(reserved == NULL);
+
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
char *name;
@@ -65,7 +67,15 @@
const grpc_channel_args *server_args = grpc_server_get_channel_args(server);
grpc_transport *transport = grpc_create_chttp2_transport(
&exec_ctx, server_args, server_endpoint, 0 /* is_client */);
- grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq));
+
+ grpc_pollset **pollsets;
+ size_t num_pollsets = 0;
+ grpc_server_get_pollsets(server, &pollsets, &num_pollsets);
+
+ for (size_t i = 0; i < num_pollsets; i++) {
+ grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, pollsets[i]);
+ }
+
grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
@@ -74,7 +84,7 @@
#else // !GPR_SUPPORT_CHANNELS_FROM_FD
void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
- grpc_completion_queue *cq,
+ void *cq,
int fd) {
GPR_ASSERT(0);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 56fb80e..7300d79 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1106,6 +1106,12 @@
grpc_exec_ctx_finish(&exec_ctx);
}
+void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
+ size_t *pollset_count) {
+ *pollset_count = server->cq_count;
+ *pollsets = server->pollsets;
+}
+
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
grpc_pollset *accepting_pollset,
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index fb6e4d6..44d64a3 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -60,4 +60,7 @@
int grpc_server_has_open_connections(grpc_server *server);
+void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
+ size_t *pollset_count);
+
#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.cc b/src/cpp/rpcmanager/grpc_rpc_manager.cc
new file mode 100644
index 0000000..58b337d
--- /dev/null
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.cc
@@ -0,0 +1,185 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc/support/log.h>
+#include <climits>
+
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
+namespace grpc {
+
+GrpcRpcManager::GrpcRpcManagerThread::GrpcRpcManagerThread(
+ GrpcRpcManager* rpc_mgr)
+ : rpc_mgr_(rpc_mgr),
+ thd_(new std::thread(&GrpcRpcManager::GrpcRpcManagerThread::Run, this)) {}
+
+void GrpcRpcManager::GrpcRpcManagerThread::Run() {
+ rpc_mgr_->MainWorkLoop();
+ rpc_mgr_->MarkAsCompleted(this);
+}
+
+GrpcRpcManager::GrpcRpcManagerThread::~GrpcRpcManagerThread() {
+ thd_->join();
+ thd_.reset();
+}
+
+GrpcRpcManager::GrpcRpcManager(int min_pollers, int max_pollers)
+ : shutdown_(false),
+ num_pollers_(0),
+ min_pollers_(min_pollers),
+ max_pollers_(max_pollers == -1 ? INT_MAX: max_pollers),
+ num_threads_(0) {}
+
+GrpcRpcManager::~GrpcRpcManager() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ GPR_ASSERT(num_threads_ == 0);
+
+ CleanupCompletedThreads();
+}
+
+void GrpcRpcManager::Wait() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ while (num_threads_ != 0) {
+ shutdown_cv_.wait(lock);
+ }
+}
+
+void GrpcRpcManager::ShutdownRpcManager() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ shutdown_ = true;
+}
+
+bool GrpcRpcManager::IsShutdown() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ return shutdown_;
+}
+
+void GrpcRpcManager::MarkAsCompleted(GrpcRpcManagerThread* thd) {
+ std::unique_lock<grpc::mutex> lock(list_mu_);
+ completed_threads_.push_back(thd);
+}
+
+void GrpcRpcManager::CleanupCompletedThreads() {
+ std::unique_lock<grpc::mutex> lock(list_mu_);
+ for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
+ thd = completed_threads_.erase(thd)) {
+ delete *thd;
+ }
+}
+
+void GrpcRpcManager::Initialize() {
+ for (int i = 0; i < min_pollers_; i++) {
+ MaybeCreatePoller();
+ }
+}
+
+// If the number of pollers (i.e threads currently blocked in PollForWork()) is
+// less than max threshold (i.e max_pollers_) and the total number of threads is
+// below the maximum threshold, we can let the current thread continue as poller
+bool GrpcRpcManager::MaybeContinueAsPoller() {
+ std::unique_lock<grpc::mutex> lock(mu_);
+
+ if (shutdown_ || num_pollers_ > max_pollers_) {
+ return false;
+ }
+
+ num_pollers_++;
+ return true;
+}
+
+// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
+// threads currently blocked in PollForWork()) is below the threshold (i.e
+// min_pollers_) and the total number of threads is below the maximum threshold
+void GrpcRpcManager::MaybeCreatePoller() {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_ && num_pollers_ < min_pollers_) {
+ num_pollers_++;
+ num_threads_++;
+
+ // Create a new thread (which ends up calling the MainWorkLoop() function
+ new GrpcRpcManagerThread(this);
+ }
+}
+
+void GrpcRpcManager::MainWorkLoop() {
+ void* tag;
+ bool ok;
+
+ /*
+ 1. Poll for work (i.e PollForWork())
+ 2. After returning from PollForWork, reduce the number of pollers by 1. If
+ PollForWork() returned a TIMEOUT, then it may indicate that we have more
+ polling threads than needed. Check if the number of pollers is greater
+ than min_pollers and if so, terminate the thread.
+ 3. Since we are short of one poller now, see if a new poller has to be
+ created (i.e see MaybeCreatePoller() for more details)
+ 4. Do the actual work (DoWork())
+ 5. After doing the work, see it this thread can resume polling work (i.e
+ see MaybeContinueAsPoller() for more details) */
+ do {
+ WorkStatus work_status = PollForWork(&tag, &ok);
+
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_pollers_--;
+
+ if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
+ break;
+ }
+ }
+
+ // TODO (sreek) See if we need to check for shutdown here and quit
+ // Note that MaybeCreatePoller does check for shutdown and creates a new
+ // thread only if GrpcRpcManager is not shutdown
+ if (work_status == WORK_FOUND) {
+ MaybeCreatePoller();
+ DoWork(tag, ok);
+ }
+ } while (MaybeContinueAsPoller());
+
+ // If we are here, either GrpcRpcManager is shutting down or it already has
+ // enough threads. In both cases, current thread can be terminated
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_threads_--;
+ if (num_threads_ == 0) {
+ shutdown_cv_.notify_one();
+ }
+ }
+
+ CleanupCompletedThreads();
+}
+
+} // namespace grpc
diff --git a/src/cpp/rpcmanager/grpc_rpc_manager.h b/src/cpp/rpcmanager/grpc_rpc_manager.h
new file mode 100644
index 0000000..d00771b
--- /dev/null
+++ b/src/cpp/rpcmanager/grpc_rpc_manager.h
@@ -0,0 +1,157 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+#define GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
+
+#include <list>
+#include <memory>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+
+namespace grpc {
+
+class GrpcRpcManager {
+ public:
+ explicit GrpcRpcManager(int min_pollers, int max_pollers);
+ virtual ~GrpcRpcManager();
+
+ // This function MUST be called before using the object
+ void Initialize();
+
+ enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };
+
+ // "Polls" for new work.
+ // If the return value is WORK_FOUND:
+ // - The implementaion of PollForWork() MAY set some opaque identifier to
+ // (identify the work item found) via the '*tag' parameter
+ // - The implementaion MUST set the value of 'ok' to 'true' or 'false'. A
+ // value of 'false' indicates some implemenation specific error (that is
+ // neither SHUTDOWN nor TIMEOUT)
+ // - GrpcRpcManager does not interpret the values of 'tag' and 'ok'
+ // - GrpcRpcManager WILL call DoWork() and pass '*tag' and 'ok' as input to
+ // DoWork()
+ //
+ // If the return value is SHUTDOWN:,
+ // - GrpcManager WILL NOT call DoWork() and terminates the thead
+ //
+ // If the return value is TIMEOUT:,
+ // - GrpcManager WILL NOT call DoWork()
+ // - GrpcManager MAY terminate the thread depending on the current number of
+ // active poller threads and mix_pollers/max_pollers settings
+ // - Also, the value of timeout is specific to the derived class
+ // implementation
+ virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;
+
+ // The implementation of DoWork() is supposed to perform the work found by
+ // PollForWork(). The tag and ok parameters are the same as returned by
+ // PollForWork()
+ //
+ // The implementation of DoWork() should also do any setup needed to ensure
+ // that the next call to PollForWork() (not necessarily by the current thread)
+ // actually finds some work
+ virtual void DoWork(void* tag, bool ok) = 0;
+
+ // Mark the GrpcRpcManager as shutdown and begin draining the work.
+ // This is a non-blocking call and the caller should call Wait(), a blocking
+ // call which returns only once the shutdown is complete
+ void ShutdownRpcManager();
+
+ // Has ShutdownRpcManager() been called
+ bool IsShutdown();
+
+ // A blocking call that returns only after the GrpcRpcManager has shutdown and
+ // all the threads have drained all the outstanding work
+ void Wait();
+
+ private:
+ // Helper wrapper class around std::thread. This takes a GrpcRpcManager object
+ // and starts a new std::thread to calls the Run() function.
+ //
+ // The Run() function calls GrpcManager::MainWorkLoop() function and once that
+ // completes, it marks the GrpcRpcManagerThread completed by calling
+ // GrpcRpcManager::MarkAsCompleted()
+ class GrpcRpcManagerThread {
+ public:
+ GrpcRpcManagerThread(GrpcRpcManager* rpc_mgr);
+ ~GrpcRpcManagerThread();
+
+ private:
+ // Calls rpc_mgr_->MainWorkLoop() and once that completes, calls
+ // rpc_mgr_>MarkAsCompleted(this) to mark the thread as completed
+ void Run();
+
+ GrpcRpcManager* rpc_mgr_;
+ std::unique_ptr<grpc::thread> thd_;
+ };
+
+ // The main funtion in GrpcRpcManager
+ void MainWorkLoop();
+
+ // Create a new poller if the number of current pollers is less than the
+ // minimum number of pollers needed (i.e min_pollers).
+ void MaybeCreatePoller();
+
+ // Returns true if the current thread can resume as a poller. i.e if the
+ // current number of pollers is less than the max_pollers.
+ bool MaybeContinueAsPoller();
+
+ void MarkAsCompleted(GrpcRpcManagerThread* thd);
+ void CleanupCompletedThreads();
+
+ // Protects shutdown_, num_pollers_ and num_threads_
+ // TODO: sreek - Change num_pollers and num_threads_ to atomics
+ grpc::mutex mu_;
+
+ bool shutdown_;
+ grpc::condition_variable shutdown_cv_;
+
+ // Number of threads doing polling
+ int num_pollers_;
+
+ // The minimum and maximum number of threads that should be doing polling
+ int min_pollers_;
+ int max_pollers_;
+
+ // The total number of threads (includes threads includes the threads that are
+ // currently polling i.e num_pollers_)
+ int num_threads_;
+
+ grpc::mutex list_mu_;
+ std::list<GrpcRpcManagerThread*> completed_threads_;
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_GRPC_RPC_MANAGER_H
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 2980b16..eab57b4 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -35,6 +35,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@@ -94,7 +95,7 @@
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p",
- service);
+ (void*)service);
} else {
generic_service_ = service;
}
@@ -139,35 +140,24 @@
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- std::unique_ptr<ThreadPoolInterface> thread_pool;
- bool has_sync_methods = false;
- for (auto it = services_.begin(); it != services_.end(); ++it) {
- if ((*it)->service->has_synchronous_methods()) {
- if (!thread_pool) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- break;
- }
- }
- }
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
(*option)->UpdatePlugins(&plugins_);
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
- if (!thread_pool && (*plugin)->has_sync_methods()) {
- thread_pool.reset(CreateDefaultThreadPool());
- has_sync_methods = true;
- }
(*plugin)->UpdateChannelArguments(&args);
}
+
if (max_receive_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
}
+
if (max_send_message_size_ >= 0) {
args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
}
+
args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
enabled_compression_algorithms_bitset_);
if (maybe_default_compression_level_.is_set) {
@@ -178,27 +168,84 @@
args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
maybe_default_compression_algorithm_.algorithm);
}
- std::unique_ptr<Server> server(new Server(thread_pool.release(), true,
- max_receive_message_size_, &args));
+
+ // == Determine if the server has any syncrhonous methods ==
+ bool has_sync_methods = false;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+
+ if (!has_sync_methods) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin)->has_sync_methods()) {
+ has_sync_methods = true;
+ break;
+ }
+ }
+ }
+
+ // If this is a Sync server, i.e a server expositing sync API, then the server
+ // needs to create some completion queues to listen for incoming requests.
+ // 'sync_server_cqs' are those internal completion queues.
+ //
+ // This is different from the completion queues added to the server via
+ // ServerBuilder's AddCompletionQueue() method (those completion queues
+ // are in 'cqs_' member variable of ServerBuilder object)
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs(
+ new std::vector<std::unique_ptr<ServerCompletionQueue>>());
+
+ if (has_sync_methods) {
+ // If the server has synchronous methods, it will need completion queues to
+ // handle those methods. Create one cq per core (or create 4 if number of
+ // cores is less than 4 or unavailable)
+ //
+ // TODO (sreek) - The default number 4 is just a guess. Check if a lower or
+ // higher number makes sense
+ int num_cqs = gpr_cpu_num_cores();
+ num_cqs = GPR_MAX(num_cqs, 4);
+
+ for (int i = 0; i < num_cqs; i++) {
+ sync_server_cqs->emplace_back(new ServerCompletionQueue());
+ }
+ }
+
+ // TODO (sreek) Make the number of pollers configurable
+ std::unique_ptr<Server> server(
+ new Server(sync_server_cqs, max_receive_message_size_, &args,
+ kDefaultMinPollers, kDefaultMaxPollers));
+
ServerInitializer* initializer = server->initializer();
- // If the server has atleast one sync methods, we know that this is a Sync
- // server or a Hybrid server and the completion queue (server->cq_) would be
- // frequently polled.
- int num_frequently_polled_cqs = has_sync_methods ? 1 : 0;
+ // Register all the completion queues with the server. i.e
+ // 1. sync_server_cqs: internal completion queues created IF this is a sync
+ // server
+ // 2. cqs_: Completion queues added via AddCompletionQueue() call
- for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- // A completion queue that is not polled frequently (by calling Next() or
- // AsyncNext()) is not safe to use for listening to incoming channels.
- // Register all such completion queues as non-listening completion queues
- // with the GRPC core library.
- if ((*cq)->IsFrequentlyPolled()) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ // All sync cqs (if any) are frequently polled by the GrpcRpcManager
+ int num_frequently_polled_cqs = sync_server_cqs->size();
+
+ for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ }
+
+ // cqs_ contains the completion queue added by calling the ServerBuilder's
+ // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
+ // calling Next() or AsyncNext()) and hence are not safe to be used for
+ // listening to incoming channels. Such completion queues must be registered
+ // as non-listening queues
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
- (*cq)->cq(), nullptr);
+ (*it)->cq(), nullptr);
}
}
@@ -214,9 +261,11 @@
return nullptr;
}
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->InitServer(initializer);
}
+
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -229,6 +278,7 @@
}
}
}
+
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
@@ -236,13 +286,16 @@
*port->selected_port = r;
}
}
+
auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
if (!server->Start(cqs_data, cqs_.size())) {
return nullptr;
}
+
for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
(*plugin)->Finish(initializer);
}
+
return server;
}
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index a693ce9..e9f3c99 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -118,6 +118,7 @@
UnimplementedAsyncRequest* const request_;
};
+// TODO (sreek) - This might no longer be needed
class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
@@ -126,6 +127,11 @@
}
};
+class ShutdownTag : public CompletionQueueTag {
+ public:
+ bool FinalizeResult(void** tag, bool* status) { return false; }
+};
+
class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
SyncRequest(RpcServiceMethod* method, void* tag)
@@ -147,6 +153,7 @@
grpc_metadata_array_destroy(&request_metadata_);
}
+ // TODO (Sreek) This function is probably no longer needed
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
*ok = false;
@@ -158,6 +165,7 @@
return mrd;
}
+ // TODO (sreek) - This function is probably no longer needed
static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
gpr_timespec deadline) {
void* tag = nullptr;
@@ -177,6 +185,8 @@
GPR_UNREACHABLE_CODE(return false);
}
+ // TODO (sreek) - Refactor this SetupRequest/TeardownRequest and ResetRequest
+ // functions
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -184,6 +194,8 @@
cq_ = nullptr;
}
+ void ResetRequest() { in_flight_ = false; }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
@@ -275,53 +287,167 @@
grpc_completion_queue* cq_;
};
+class Server::SyncRequestManager : public GrpcRpcManager {
+ public:
+ SyncRequestManager(Server* server, CompletionQueue* server_cq,
+ std::shared_ptr<GlobalCallbacks> global_callbacks,
+ int min_pollers, int max_pollers)
+ : GrpcRpcManager(min_pollers, max_pollers),
+ server_(server),
+ server_cq_(server_cq),
+ global_callbacks_(global_callbacks) {}
+
+ static const int kRpcPollingTimeoutMsec = 3000;
+
+ WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
+ *tag = nullptr;
+ gpr_timespec deadline =
+ gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN);
+
+ switch (server_cq_->AsyncNext(tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ return TIMEOUT;
+ case CompletionQueue::SHUTDOWN:
+ return SHUTDOWN;
+ case CompletionQueue::GOT_EVENT:
+ return WORK_FOUND;
+ }
+
+ GPR_UNREACHABLE_CODE(return TIMEOUT);
+ }
+
+ void DoWork(void* tag, bool ok) GRPC_OVERRIDE {
+ SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
+
+ if (!sync_req) {
+ // No tag. Nothing to work on
+ // TODO (sreek) - Log a warning here since this is an unlikely case
+ return;
+ }
+
+ if (ok) {
+ SyncRequest::CallData cd(server_, sync_req);
+ {
+ sync_req->SetupRequest();
+ if (!IsShutdown()) {
+ sync_req->Request(server_->c_server(), server_cq_->cq());
+ } else {
+ sync_req->TeardownRequest();
+ }
+ }
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ } else {
+ sync_req->ResetRequest();
+ // ok is false. For some reason, the tag was returned but event was not
+ // successful. In this case, request again unless we are shutting down
+ if (!IsShutdown()) {
+ // TODO (sreek) Remove this
+ // sync_req->Request(server_->c_server(), server_cq_->cq());
+ }
+ }
+ }
+
+ void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ sync_methods_.emplace_back(method, tag);
+ }
+
+ void AddUnknownSyncMethod() {
+ // TODO (sreek) - Check if !sync_methods_.empty() is really needed here
+ if (!sync_methods_.empty()) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ // Use of emplace_back with just constructor arguments is not accepted
+ // here by gcc-4.4 because it can't match the anonymous nullptr with a
+ // proper constructor implicitly. Construct the object and use push_back.
+ sync_methods_.push_back(SyncRequest(unknown_method_.get(), nullptr));
+ }
+ }
+
+ void ShutdownAndDrainCompletionQueue() {
+ server_cq_->Shutdown();
+
+ // Drain any pending items from the queue
+ void* tag;
+ bool ok;
+ while (server_cq_->Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+ }
+
+ void Start() {
+ if (!sync_methods_.empty()) {
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->SetupRequest();
+ m->Request(server_->c_server(), server_cq_->cq());
+ }
+
+ GrpcRpcManager::Initialize();
+ }
+ }
+
+ private:
+ Server* server_;
+ CompletionQueue* server_cq_;
+ std::vector<SyncRequest> sync_methods_;
+ std::unique_ptr<RpcServiceMethod> unknown_method_;
+ std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
+};
+
static internal::GrpcLibraryInitializer g_gli_initializer;
-Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_receive_message_size, ChannelArguments* args)
+Server::Server(
+ std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
+ sync_server_cqs,
+ int max_receive_message_size, ChannelArguments* args, int min_pollers,
+ int max_pollers)
: max_receive_message_size_(max_receive_message_size),
+ sync_server_cqs_(sync_server_cqs),
started_(false),
shutdown_(false),
shutdown_notified_(false),
- num_running_cb_(0),
- sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(nullptr),
- thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned),
server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
global_callbacks_->UpdateArguments(args);
+
+ for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
+ it++) {
+ sync_req_mgrs_.emplace_back(new SyncRequestManager(
+ this, (*it).get(), global_callbacks_, min_pollers, max_pollers));
+ }
+
grpc_channel_args channel_args;
args->SetChannelArgs(&channel_args);
+
server_ = grpc_server_create(&channel_args, nullptr);
- if (thread_pool_ == nullptr) {
- grpc_server_register_non_listening_completion_queue(server_, cq_.cq(),
- nullptr);
- } else {
- grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
- }
}
Server::~Server() {
{
+ // TODO (sreek) Check if we can just call Shutdown() even in case where
+ // started_ == false. This will make things much simpler
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
} else if (!started_) {
- cq_.Shutdown();
+ // Shutdown the completion queues
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownAndDrainCompletionQueue();
+ }
}
}
+
+ // TODO(sreek) Do thisfor all cqs ?
+ /*
void* got_tag;
bool ok;
GPR_ASSERT(!cq_.Next(&got_tag, &ok));
+ */
grpc_server_destroy(server_);
- if (thread_pool_owned_) {
- delete thread_pool_;
- }
- delete sync_methods_;
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@@ -332,8 +458,6 @@
grpc_server* Server::c_server() { return server_; }
-CompletionQueue* Server::completion_queue() { return &cq_; }
-
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
RpcServiceMethod* method) {
switch (method->method_type()) {
@@ -354,6 +478,7 @@
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+
const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
@@ -372,7 +497,9 @@
if (method->handler() == nullptr) {
method->set_server_tag(tag);
} else {
- sync_methods_->emplace_back(method, tag);
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddSyncMethod(method, tag);
+ }
}
method_name = method->name();
}
@@ -408,20 +535,23 @@
grpc_server_start(server_);
if (!has_generic_service_) {
- if (!sync_methods_->empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
- // Use of emplace_back with just constructor arguments is not accepted
- // here by gcc-4.4 because it can't match the anonymous nullptr with a
- // proper constructor implicitly. Construct the object and use push_back.
- sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->AddUnknownSyncMethod();
}
+
for (size_t i = 0; i < num_cqs; i++) {
if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]);
}
}
}
+
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Start();
+ }
+
+ /* TODO (Sreek) - No longer needed (being done in (*it)->Start above) */
+ /*
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -429,24 +559,76 @@
m->Request(server_, cq_.cq());
}
- ScheduleCallback();
+ GrpcRpcManager::Initialize();
}
+ */
return true;
}
+/* TODO (sreek) check if started_ and shutdown_ are needed anymore */
void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
+
+ /// The completion queue to use for server shutdown completion notification
+ CompletionQueue shutdown_cq;
+ ShutdownTag shutdown_tag; // Dummy shutdown tag
+ grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
+
+ // Shutdown all RpcManagers. This will try to gracefully stop all the
+ // threads in the RpcManagers (once they process any inflight requests)
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->ShutdownRpcManager();
+ }
+
+ shutdown_cq.Shutdown();
+
+ void* tag;
+ bool ok;
+ CompletionQueue::NextStatus status =
+ shutdown_cq.AsyncNext(&tag, &ok, deadline);
+
+ // If this timed out, it means we are done with the grace period for a clean
+ // shutdown. We should force a shutdown now by cancelling all inflight calls
+ if (status == CompletionQueue::NextStatus::TIMEOUT) {
+ grpc_server_cancel_all_calls(server_);
+ }
+ // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
+ // successfully shutdown
+
+ // Wait for threads in all RpcManagers to terminate
+ for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
+ (*it)->Wait();
+ (*it)->ShutdownAndDrainCompletionQueue();
+ }
+
+ // Drain the shutdown queue (if the previous call to AsyncNext() timed out
+ // and we didn't remove the tag from the queue yet)
+ while(shutdown_cq.Next(&tag, &ok)) {
+ // Nothing to be done here
+ }
+
+ /*
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
lock.unlock();
+ */
+
+ // TODO (sreek) Delete this
+ /*
+ GrpcRpcManager::ShutdownRpcManager();
+ GrpcRpcManager::Wait();
+ */
+
// Spin, eating requests until the completion queue is completely shutdown.
// If the deadline expires then cancel anything that's pending and keep
// spinning forever until the work is actually drained.
// Since nothing else needs to touch state guarded by mu_, holding it
// through this loop is fine.
+ //
+ /*
SyncRequest* request;
bool ok;
while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
@@ -458,11 +640,15 @@
}
}
lock.lock();
+ */
+ /* TODO (sreek) - Remove this block */
// Wait for running callbacks to finish.
- while (num_running_cb_ != 0) {
- callback_cv_.wait(lock);
- }
+ /*
+ while (num_running_cb_ != 0) {
+ callback_cv_.wait(lock);
+ }
+ */
shutdown_notified_ = true;
shutdown_cv_.notify_all();
@@ -587,47 +773,87 @@
request_->stream()->call_.PerformOps(this);
}
+// TODO: sreek - Remove this function
void Server::ScheduleCallback() {
+ GPR_ASSERT(false);
+ /*
{
grpc::unique_lock<grpc::mutex> lock(mu_);
num_running_cb_++;
}
thread_pool_->Add(std::bind(&Server::RunRpc, this));
+ */
}
+// TODO: sreek - Remove this function
void Server::RunRpc() {
- // Wait for one more incoming rpc.
- bool ok;
- GPR_TIMER_SCOPE("Server::RunRpc", 0);
- auto* mrd = SyncRequest::Wait(&cq_, &ok);
- if (mrd) {
- ScheduleCallback();
- if (ok) {
- SyncRequest::CallData cd(this, mrd);
- {
- mrd->SetupRequest();
- grpc::unique_lock<grpc::mutex> lock(mu_);
- if (!shutdown_) {
- mrd->Request(server_, cq_.cq());
- } else {
- // destroy the structure that was created
- mrd->TeardownRequest();
+ GPR_ASSERT(false);
+ /*
+ // Wait for one more incoming rpc.
+ bool ok;
+ GPR_TIMER_SCOPE("Server::RunRpc", 0);
+ auto* mrd = SyncRequest::Wait(&cq_, &ok);
+ if (mrd) {
+ ScheduleCallback();
+ if (ok) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
}
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
}
- GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run(global_callbacks_);
}
- }
- {
- grpc::unique_lock<grpc::mutex> lock(mu_);
- num_running_cb_--;
- if (shutdown_) {
- callback_cv_.notify_all();
+ {
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ num_running_cb_--;
+ if (shutdown_) {
+ callback_cv_.notify_all();
+ }
}
+ */
+}
+
+/* TODO (sreek) Move this to SyncRequestManager */
+/*
+void Server::PollForWork(bool& is_work_found, void** tag) {
+ is_work_found = true;
+ *tag = nullptr;
+ auto* mrd = SyncRequest::Wait(&cq_, &is_work_found);
+ if (is_work_found) {
+ *tag = mrd;
}
}
+
+void Server::DoWork(void* tag) {
+ auto* mrd = static_cast<SyncRequest*>(tag);
+ if (mrd) {
+ SyncRequest::CallData cd(this, mrd);
+ {
+ mrd->SetupRequest();
+ grpc::unique_lock<grpc::mutex> lock(mu_);
+ if (!shutdown_) {
+ mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
+ }
+ }
+ GPR_TIMER_SCOPE("cd.Run()", 0);
+ cd.Run(global_callbacks_);
+ }
+}
+*/
+
ServerInitializer* Server::initializer() { return server_initializer_.get(); }
} // namespace grpc
diff --git a/src/cpp/server/server_posix.cc b/src/cpp/server/server_posix.cc
index c3aa2ad..33d42a8 100644
--- a/src/cpp/server/server_posix.cc
+++ b/src/cpp/server/server_posix.cc
@@ -40,8 +40,7 @@
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
void AddInsecureChannelFromFd(Server* server, int fd) {
- grpc_server_add_insecure_channel_from_fd(
- server->c_server(), server->completion_queue()->cq(), fd);
+ grpc_server_add_insecure_channel_from_fd(server->c_server(), NULL, fd);
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b87abfd..00a67b0 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -350,7 +350,7 @@
typedef grpc_channel *(*grpc_insecure_channel_create_from_fd_type)(const char *target, int fd, const grpc_channel_args *args);
extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
#define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import
-typedef void(*grpc_server_add_insecure_channel_from_fd_type)(grpc_server *server, grpc_completion_queue *cq, int fd);
+typedef void(*grpc_server_add_insecure_channel_from_fd_type)(grpc_server *server, void *reserved, int fd);
extern grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
#define grpc_server_add_insecure_channel_from_fd grpc_server_add_insecure_channel_from_fd_import
typedef void(*grpc_use_signal_type)(int signum);
diff --git a/test/core/end2end/fixtures/h2_fd.c b/test/core/end2end/fixtures/h2_fd.c
index 89fa025..8561fee 100644
--- a/test/core/end2end/fixtures/h2_fd.c
+++ b/test/core/end2end/fixtures/h2_fd.c
@@ -95,7 +95,7 @@
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- grpc_server_add_insecure_channel_from_fd(f->server, f->cq, sfd->fd_pair[1]);
+ grpc_server_add_insecure_channel_from_fd(f->server, NULL, sfd->fd_pair[1]);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.cc b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc
new file mode 100644
index 0000000..f48ac27
--- /dev/null
+++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.cc
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *is % allowed in string
+ */
+
+#include <chrono>
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <grpc++/grpc++.h>
+
+#include "test/cpp/rpcmanager/grpc_rpc_manager_test.h"
+#include "test/cpp/util/test_config.h"
+
+using grpc::testing::GrpcRpcManagerTest;
+
+// TODO: sreek - Rewrite this test. Find a better test case
+
+grpc::GrpcRpcManager::WorkStatus GrpcRpcManagerTest::PollForWork(void **tag,
+ bool *ok) {
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ std::cout << "Poll: " << std::this_thread::get_id() << std::endl;
+ }
+
+ WorkStatus work_status = WORK_FOUND;
+ *tag = nullptr;
+ *ok = true;
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ num_calls_++;
+ if (num_calls_ > 50) {
+ std::cout << "poll: False" << std::endl;
+ work_status = SHUTDOWN;
+ ShutdownRpcManager();
+ }
+ }
+
+ return work_status;
+}
+
+void GrpcRpcManagerTest::DoWork(void *tag, bool ok) {
+ {
+ std::unique_lock<grpc::mutex> lock(mu_);
+ std::cout << "Work: " << std::this_thread::get_id() << std::endl;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+}
+
+int main(int argc, char **argv) {
+ grpc::testing::InitTest(&argc, &argv, true);
+ GrpcRpcManagerTest test_rpc_manager(3, 15, 20);
+ test_rpc_manager.Initialize();
+ test_rpc_manager.Wait();
+
+ return 0;
+}
diff --git a/test/cpp/rpcmanager/grpc_rpc_manager_test.h b/test/cpp/rpcmanager/grpc_rpc_manager_test.h
new file mode 100644
index 0000000..186da81
--- /dev/null
+++ b/test/cpp/rpcmanager/grpc_rpc_manager_test.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *is % allowed in string
+ */
+#ifndef GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H
+#define GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H
+
+#include "src/cpp/rpcmanager/grpc_rpc_manager.h"
+
+namespace grpc {
+namespace testing {
+
+class GrpcRpcManagerTest GRPC_FINAL : public GrpcRpcManager {
+ public:
+ GrpcRpcManagerTest(int min_pollers, int max_pollers, int max_threads)
+ : GrpcRpcManager(min_pollers, max_pollers), num_calls_(0){};
+
+ grpc::GrpcRpcManager::WorkStatus PollForWork(void **tag,
+ bool *ok) GRPC_OVERRIDE;
+ void DoWork(void *tag, bool ok) GRPC_OVERRIDE;
+
+ private:
+ grpc::mutex mu_;
+ int num_calls_;
+};
+
+} // namespace testing
+} // namespace grpc
+
+#endif // GRPC_TEST_CPP_GRPC_RPC_MANAGER_TEST_H
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index fbe82c8..2a2be0d 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -868,6 +868,7 @@
src/cpp/server/secure_server_credentials.h \
src/cpp/client/create_channel_internal.h \
src/cpp/common/channel_filter.h \
+src/cpp/rpcmanager/grpc_rpc_manager.h \
src/cpp/server/dynamic_thread_pool.h \
src/cpp/server/thread_pool_interface.h \
src/core/lib/channel/channel_args.h \
@@ -972,6 +973,7 @@
src/cpp/common/completion_queue_cc.cc \
src/cpp/common/core_codegen.cc \
src/cpp/common/rpc_method.cc \
+src/cpp/rpcmanager/grpc_rpc_manager.cc \
src/cpp/server/async_generic_service.cc \
src/cpp/server/create_default_thread_pool.cc \
src/cpp/server/dynamic_thread_pool.cc \
diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py
index fa401fd..8166fbd 100644
--- a/tools/run_tests/performance/scenario_config.py
+++ b/tools/run_tests/performance/scenario_config.py
@@ -31,52 +31,49 @@
import math
-WARMUP_SECONDS=5
-JAVA_WARMUP_SECONDS=15 # Java needs more warmup time for JIT to kick in.
-BENCHMARK_SECONDS=30
+WARMUP_SECONDS = 5
+JAVA_WARMUP_SECONDS = 15 # Java needs more warmup time for JIT to kick in.
+BENCHMARK_SECONDS = 30
-SMOKETEST='smoketest'
-SCALABLE='scalable'
-SWEEP='sweep'
-DEFAULT_CATEGORIES=[SCALABLE, SMOKETEST]
+SMOKETEST = 'smoketest'
+SCALABLE = 'scalable'
+SWEEP = 'sweep'
+DEFAULT_CATEGORIES = [SCALABLE, SMOKETEST]
SECURE_SECARGS = {'use_test_ca': True,
'server_host_override': 'foo.test.google.fr'}
HISTOGRAM_PARAMS = {
- 'resolution': 0.01,
- 'max_possible': 60e9,
+ 'resolution': 0.01,
+ 'max_possible': 60e9,
}
EMPTY_GENERIC_PAYLOAD = {
- 'bytebuf_params': {
- 'req_size': 0,
- 'resp_size': 0,
- }
+ 'bytebuf_params': {
+ 'req_size': 0,
+ 'resp_size': 0,
+ }
}
EMPTY_PROTO_PAYLOAD = {
- 'simple_params': {
- 'req_size': 0,
- 'resp_size': 0,
- }
+ 'simple_params': {
+ 'req_size': 0,
+ 'resp_size': 0,
+ }
}
BIG_GENERIC_PAYLOAD = {
- 'bytebuf_params': {
- 'req_size': 65536,
- 'resp_size': 65536,
- }
+ 'bytebuf_params': {
+ 'req_size': 65536,
+ 'resp_size': 65536,
+ }
}
# target number of RPCs outstanding on across all client channels in
# non-ping-pong tests (since we can only specify per-channel numbers, the
# actual target will be slightly higher)
-OUTSTANDING_REQUESTS={
- 'async': 6400,
- 'sync': 1000
-}
+OUTSTANDING_REQUESTS = {'async': 6400, 'sync': 1000}
# wide is the number of client channels in multi-channel tests (1 otherwise)
-WIDE=64
+WIDE = 64
def _get_secargs(is_secure):
@@ -102,8 +99,10 @@
n *= step
-def _ping_pong_scenario(name, rpc_type,
- client_type, server_type,
+def _ping_pong_scenario(name,
+ rpc_type,
+ client_type,
+ server_type,
secure=True,
use_generic_payload=False,
unconstrained_client=None,
@@ -117,29 +116,29 @@
outstanding=None):
"""Creates a basic ping pong scenario."""
scenario = {
- 'name': name,
- 'num_servers': 1,
- 'num_clients': 1,
- 'client_config': {
- 'client_type': client_type,
- 'security_params': _get_secargs(secure),
- 'outstanding_rpcs_per_channel': 1,
- 'client_channels': 1,
- 'async_client_threads': 1,
- 'rpc_type': rpc_type,
- 'load_params': {
- 'closed_loop': {}
+ 'name': name,
+ 'num_servers': 1,
+ 'num_clients': 1,
+ 'client_config': {
+ 'client_type': client_type,
+ 'security_params': _get_secargs(secure),
+ 'outstanding_rpcs_per_channel': 1,
+ 'client_channels': 1,
+ 'async_client_threads': 1,
+ 'rpc_type': rpc_type,
+ 'load_params': {
+ 'closed_loop': {}
+ },
+ 'histogram_params': HISTOGRAM_PARAMS,
},
- 'histogram_params': HISTOGRAM_PARAMS,
- },
- 'server_config': {
- 'server_type': server_type,
- 'security_params': _get_secargs(secure),
- 'core_limit': server_core_limit,
- 'async_server_threads': async_server_threads,
- },
- 'warmup_seconds': warmup_seconds,
- 'benchmark_seconds': BENCHMARK_SECONDS
+ 'server_config': {
+ 'server_type': server_type,
+ 'security_params': _get_secargs(secure),
+ 'core_limit': server_core_limit,
+ 'async_server_threads': async_server_threads,
+ },
+ 'warmup_seconds': warmup_seconds,
+ 'benchmark_seconds': BENCHMARK_SECONDS
}
if use_generic_payload:
if server_type != 'ASYNC_GENERIC_SERVER':
@@ -151,7 +150,8 @@
scenario['client_config']['payload_config'] = EMPTY_PROTO_PAYLOAD
if unconstrained_client:
- outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[unconstrained_client]
+ outstanding_calls = outstanding if outstanding is not None else OUTSTANDING_REQUESTS[
+ unconstrained_client]
wide = channels if channels is not None else WIDE
deep = int(math.ceil(1.0 * outstanding_calls / wide))
@@ -197,7 +197,9 @@
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True, server_core_limit=1, async_server_threads=1,
+ use_generic_payload=True,
+ server_core_limit=1,
+ async_server_threads=1,
secure=secure,
categories=smoketest_categories)
@@ -206,49 +208,71 @@
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
+ unconstrained_client='async',
+ use_generic_payload=True,
secure=secure,
- categories=smoketest_categories+[SCALABLE])
+ categories=smoketest_categories + [SCALABLE])
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_one_server_core_%s' % secstr,
rpc_type='STREAMING',
client_type='ASYNC_CLIENT',
server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
- server_core_limit=1, async_server_threads=1,
+ unconstrained_client='async',
+ use_generic_payload=True,
+ server_core_limit=1,
+ async_server_threads=1,
secure=secure)
+ yield _ping_pong_scenario(
+ 'cpp_protobuf_async_client_sync_server_unary_qps_unconstrained_%s' %
+ (secstr),
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ unconstrained_client='async',
+ secure=secure,
+ categories=smoketest_categories + [SCALABLE])
+
for rpc_type in ['unary', 'streaming']:
for synchronicity in ['sync', 'async']:
yield _ping_pong_scenario(
- 'cpp_protobuf_%s_%s_ping_pong_%s' % (synchronicity, rpc_type, secstr),
+ 'cpp_protobuf_%s_%s_ping_pong_%s' %
+ (synchronicity, rpc_type, secstr),
rpc_type=rpc_type.upper(),
client_type='%s_CLIENT' % synchronicity.upper(),
server_type='%s_SERVER' % synchronicity.upper(),
- server_core_limit=1, async_server_threads=1,
+ server_core_limit=1,
+ async_server_threads=1,
secure=secure)
yield _ping_pong_scenario(
- 'cpp_protobuf_%s_%s_qps_unconstrained_%s' % (synchronicity, rpc_type, secstr),
+ 'cpp_protobuf_%s_%s_qps_unconstrained_%s' %
+ (synchronicity, rpc_type, secstr),
rpc_type=rpc_type.upper(),
client_type='%s_CLIENT' % synchronicity.upper(),
server_type='%s_SERVER' % synchronicity.upper(),
unconstrained_client=synchronicity,
secure=secure,
- categories=smoketest_categories+[SCALABLE])
+ categories=smoketest_categories + [SCALABLE])
for channels in geometric_progression(1, 20000, math.sqrt(10)):
for outstanding in geometric_progression(1, 200000, math.sqrt(10)):
- if synchronicity == 'sync' and outstanding > 1200: continue
- if outstanding < channels: continue
- yield _ping_pong_scenario(
- 'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding' % (synchronicity, rpc_type, secstr, channels, outstanding),
- rpc_type=rpc_type.upper(),
- client_type='%s_CLIENT' % synchronicity.upper(),
- server_type='%s_SERVER' % synchronicity.upper(),
- unconstrained_client=synchronicity, secure=secure,
- categories=[SWEEP], channels=channels, outstanding=outstanding)
+ if synchronicity == 'sync' and outstanding > 1200:
+ continue
+ if outstanding < channels:
+ continue
+ yield _ping_pong_scenario(
+ 'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding'
+ % (synchronicity, rpc_type, secstr, channels, outstanding),
+ rpc_type=rpc_type.upper(),
+ client_type='%s_CLIENT' % synchronicity.upper(),
+ server_type='%s_SERVER' % synchronicity.upper(),
+ unconstrained_client=synchronicity,
+ secure=secure,
+ categories=[SWEEP],
+ channels=channels,
+ outstanding=outstanding)
def __str__(self):
return 'c++'
@@ -267,66 +291,94 @@
def scenarios(self):
yield _ping_pong_scenario(
- 'csharp_generic_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+ 'csharp_generic_async_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
use_generic_payload=True,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'csharp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'csharp_protobuf_async_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'csharp_protobuf_async_unary_ping_pong', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'csharp_protobuf_async_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'csharp_protobuf_sync_to_async_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'csharp_protobuf_sync_to_async_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'csharp_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
- categories=[SMOKETEST,SCALABLE])
+ categories=[SMOKETEST, SCALABLE])
yield _ping_pong_scenario(
- 'csharp_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'csharp_protobuf_async_streaming_qps_unconstrained',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1,
+ 'csharp_to_cpp_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'csharp_to_cpp_protobuf_async_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- unconstrained_client='async', server_language='c++',
+ 'csharp_to_cpp_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ unconstrained_client='async',
+ server_language='c++',
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
- unconstrained_client='sync', server_language='c++',
+ 'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ unconstrained_client='sync',
+ server_language='c++',
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- unconstrained_client='async', client_language='c++',
+ 'cpp_to_csharp_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ unconstrained_client='async',
+ client_language='c++',
categories=[SCALABLE])
-
def __str__(self):
return 'csharp'
@@ -356,13 +408,17 @@
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'node_protobuf_unary_ping_pong', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'node_protobuf_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'node_protobuf_async_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
categories=[SMOKETEST])
@@ -387,6 +443,7 @@
def __str__(self):
return 'node'
+
class PythonLanguage:
def __init__(self):
@@ -400,48 +457,69 @@
def scenarios(self):
yield _ping_pong_scenario(
- 'python_generic_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+ 'python_generic_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
use_generic_payload=True,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'python_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'python_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+ 'python_protobuf_async_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER')
yield _ping_pong_scenario(
- 'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'python_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'python_protobuf_sync_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'python_protobuf_sync_streaming_qps_unconstrained',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1,
+ 'python_to_cpp_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1,
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'python_to_cpp_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
def __str__(self):
return 'python'
+
class RubyLanguage:
def __init__(self):
@@ -456,34 +534,50 @@
def scenarios(self):
yield _ping_pong_scenario(
- 'ruby_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'ruby_protobuf_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
categories=[SMOKETEST])
yield _ping_pong_scenario(
- 'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_sync_unary_qps_unconstrained',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'ruby_protobuf_sync_streaming_qps_unconstrained',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
- 'ruby_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'ruby_to_cpp_protobuf_sync_unary_ping_pong',
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
yield _ping_pong_scenario(
- 'ruby_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- server_language='c++', server_core_limit=1, async_server_threads=1)
+ 'ruby_to_cpp_protobuf_sync_streaming_ping_pong',
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
+ server_language='c++',
+ server_core_limit=1,
+ async_server_threads=1)
def __str__(self):
return 'ruby'
@@ -507,58 +601,85 @@
smoketest_categories = [SMOKETEST] if secure else []
yield _ping_pong_scenario(
- 'java_generic_async_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True, async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ 'java_generic_async_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ use_generic_payload=True,
+ async_server_threads=1,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=smoketest_categories)
yield _ping_pong_scenario(
- 'java_protobuf_async_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS)
yield _ping_pong_scenario(
- 'java_protobuf_async_unary_ping_pong_%s' % secstr, rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_unary_ping_pong_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=smoketest_categories)
yield _ping_pong_scenario(
- 'java_protobuf_unary_ping_pong_%s' % secstr, rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'java_protobuf_unary_ping_pong_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS)
yield _ping_pong_scenario(
- 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_unary_qps_unconstrained_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
- categories=smoketest_categories+[SCALABLE])
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
+ categories=smoketest_categories + [SCALABLE])
yield _ping_pong_scenario(
- 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
+ 'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_SERVER',
unconstrained_client='async',
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'java_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
+ 'java_generic_async_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ unconstrained_client='async',
+ use_generic_payload=True,
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS,
categories=[SCALABLE])
yield _ping_pong_scenario(
- 'java_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
+ 'java_generic_async_streaming_qps_one_server_core_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ unconstrained_client='async',
+ use_generic_payload=True,
async_server_threads=1,
- secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
+ secure=secure,
+ warmup_seconds=JAVA_WARMUP_SECONDS)
# TODO(jtattermusch): add scenarios java vs C++
@@ -586,37 +707,48 @@
# ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server,
# but that's mostly because of lack of better name of the enum value.
yield _ping_pong_scenario(
- 'go_generic_sync_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True, async_server_threads=1,
+ 'go_generic_sync_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ use_generic_payload=True,
+ async_server_threads=1,
secure=secure,
categories=smoketest_categories)
yield _ping_pong_scenario(
- 'go_protobuf_sync_streaming_ping_pong_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_streaming_ping_pong_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
async_server_threads=1,
secure=secure)
yield _ping_pong_scenario(
- 'go_protobuf_sync_unary_ping_pong_%s' % secstr, rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_unary_ping_pong_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
async_server_threads=1,
secure=secure,
categories=smoketest_categories)
# unconstrained_client='async' is intended (client uses goroutines)
yield _ping_pong_scenario(
- 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr,
+ rpc_type='UNARY',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='async',
secure=secure,
- categories=smoketest_categories+[SCALABLE])
+ categories=smoketest_categories + [SCALABLE])
# unconstrained_client='async' is intended (client uses goroutines)
yield _ping_pong_scenario(
- 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ 'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='SYNC_SERVER',
unconstrained_client='async',
secure=secure,
categories=[SCALABLE])
@@ -625,9 +757,12 @@
# ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server,
# but that's mostly because of lack of better name of the enum value.
yield _ping_pong_scenario(
- 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
- client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- unconstrained_client='async', use_generic_payload=True,
+ 'go_generic_sync_streaming_qps_unconstrained_%s' % secstr,
+ rpc_type='STREAMING',
+ client_type='SYNC_CLIENT',
+ server_type='ASYNC_GENERIC_SERVER',
+ unconstrained_client='async',
+ use_generic_payload=True,
secure=secure,
categories=[SCALABLE])
@@ -638,11 +773,11 @@
LANGUAGES = {
- 'c++' : CXXLanguage(),
- 'csharp' : CSharpLanguage(),
- 'node' : NodeLanguage(),
- 'ruby' : RubyLanguage(),
- 'java' : JavaLanguage(),
- 'python' : PythonLanguage(),
- 'go' : GoLanguage(),
+ 'c++': CXXLanguage(),
+ 'csharp': CSharpLanguage(),
+ 'node': NodeLanguage(),
+ 'ruby': RubyLanguage(),
+ 'java': JavaLanguage(),
+ 'python': PythonLanguage(),
+ 'go': GoLanguage(),
}
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 1b8f19a..dbd77d3 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -2349,6 +2349,25 @@
},
{
"deps": [
+ "gpr",
+ "grpc",
+ "grpc++",
+ "grpc++_test_config"
+ ],
+ "headers": [
+ "test/cpp/rpcmanager/grpc_rpc_manager_test.h"
+ ],
+ "language": "c++",
+ "name": "grpc_rpc_manager_test",
+ "src": [
+ "test/cpp/rpcmanager/grpc_rpc_manager_test.cc",
+ "test/cpp/rpcmanager/grpc_rpc_manager_test.h"
+ ],
+ "third_party": false,
+ "type": "target"
+ },
+ {
+ "deps": [
"grpc_plugin_support"
],
"headers": [],
@@ -6847,6 +6866,7 @@
"include/grpc++/support/time.h",
"src/cpp/client/create_channel_internal.h",
"src/cpp/common/channel_filter.h",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/dynamic_thread_pool.h",
"src/cpp/server/thread_pool_interface.h"
],
@@ -6914,6 +6934,8 @@
"src/cpp/common/completion_queue_cc.cc",
"src/cpp/common/core_codegen.cc",
"src/cpp/common/rpc_method.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.cc",
+ "src/cpp/rpcmanager/grpc_rpc_manager.h",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/create_default_thread_pool.cc",
"src/cpp/server/dynamic_thread_pool.cc",
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 70c806e..a31742a 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -2406,6 +2406,27 @@
"cpu_cost": 1.0,
"exclude_configs": [],
"flaky": false,
+ "gtest": false,
+ "language": "c++",
+ "name": "grpc_rpc_manager_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ]
+ },
+ {
+ "args": [],
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "flaky": false,
"gtest": true,
"language": "c++",
"name": "grpc_tool_test",
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
index 1e5fb9b..1f7b1e5 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
@@ -368,6 +368,7 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\secure_server_credentials.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\channel_args.h" />
@@ -494,6 +495,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\create_default_thread_pool.cc">
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
index 8da2b76..c5100d1 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
@@ -61,6 +61,9 @@
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
<Filter>src\cpp\server</Filter>
</ClCompile>
@@ -704,6 +707,9 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h">
<Filter>src\cpp\common</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h">
<Filter>src\cpp\server</Filter>
</ClInclude>
@@ -1037,6 +1043,9 @@
<Filter Include="src\cpp\common">
<UniqueIdentifier>{2336e396-7e0b-8bf9-3b09-adc6ad1f0e5b}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\cpp\rpcmanager">
+ <UniqueIdentifier>{f142b1a2-5198-040b-9da4-2afc09e9248a}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\cpp\server">
<UniqueIdentifier>{321b0980-74ad-e8ca-f23b-deffa5d6bb8f}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
index e712c48..32d7b68 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -364,6 +364,7 @@
<ItemGroup>
<ClInclude Include="$(SolutionDir)\..\src\cpp\client\create_channel_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h" />
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\thread_pool_interface.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\channel_args.h" />
@@ -480,6 +481,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\create_default_thread_pool.cc">
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index b2ccc64..9734270 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -46,6 +46,9 @@
<ClCompile Include="$(SolutionDir)\..\src\cpp\common\rpc_method.cc">
<Filter>src\cpp\common</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.cc">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\cpp\server\async_generic_service.cc">
<Filter>src\cpp\server</Filter>
</ClCompile>
@@ -677,6 +680,9 @@
<ClInclude Include="$(SolutionDir)\..\src\cpp\common\channel_filter.h">
<Filter>src\cpp\common</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\cpp\rpcmanager\grpc_rpc_manager.h">
+ <Filter>src\cpp\rpcmanager</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\cpp\server\dynamic_thread_pool.h">
<Filter>src\cpp\server</Filter>
</ClInclude>
@@ -1010,6 +1016,9 @@
<Filter Include="src\cpp\common">
<UniqueIdentifier>{ed8e4daa-825f-fbe5-2a45-846ad9165d3d}</UniqueIdentifier>
</Filter>
+ <Filter Include="src\cpp\rpcmanager">
+ <UniqueIdentifier>{cb26a5cb-4725-6fee-8abc-09d5fcd52f39}</UniqueIdentifier>
+ </Filter>
<Filter Include="src\cpp\server">
<UniqueIdentifier>{8a54a279-d14b-4237-0df3-1ffe1ef5a7af}</UniqueIdentifier>
</Filter>
diff --git a/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj
new file mode 100644
index 0000000..4502de8
--- /dev/null
+++ b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj
@@ -0,0 +1,204 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{A4F24E89-1766-2FAA-9058-1094EAA018A8}</ProjectGuid>
+ <IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
+ <IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
+ <PlatformToolset>v100</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
+ <PlatformToolset>v110</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
+ <PlatformToolset>v120</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
+ <PlatformToolset>v140</PlatformToolset>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ <Import Project="$(SolutionDir)\..\vsprojects\cpptest.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\global.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\protobuf.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
+ <Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)'=='Debug'">
+ <TargetName>grpc_rpc_manager_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)'=='Release'">
+ <TargetName>grpc_rpc_manager_test</TargetName>
+ <Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
+ <Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
+ <Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
+ <Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <PrecompiledHeader>NotUsing</PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>MaxSpeed</Optimization>
+ <PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <SDLCheck>true</SDLCheck>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <TreatWarningAsError>true</TreatWarningAsError>
+ <DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
+ <MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
+ <GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ </ItemDefinitionGroup>
+
+ <ItemGroup>
+ <ClInclude Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.h" />
+ </ItemGroup>
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.cc">
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++\grpc++.vcxproj">
+ <Project>{C187A093-A0FE-489D-A40A-6E33DE0F9FEB}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
+ <Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
+ <Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
+ </ProjectReference>
+ <ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc++_test_config\grpc++_test_config.vcxproj">
+ <Project>{3F7D093D-11F9-C4BC-BEB7-18EB28E3F290}</Project>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="packages.config" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ <Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
+ </ImportGroup>
+ <Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
+ <PropertyGroup>
+ <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
+ </PropertyGroup>
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
+ <Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
+ </Target>
+</Project>
+
diff --git a/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters
new file mode 100644
index 0000000..fedaea0
--- /dev/null
+++ b/vsprojects/vcxproj/test/grpc_rpc_manager_test/grpc_rpc_manager_test.vcxproj.filters
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <ClCompile Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.cc">
+ <Filter>test\cpp\rpcmanager</Filter>
+ </ClCompile>
+ </ItemGroup>
+ <ItemGroup>
+ <ClInclude Include="$(SolutionDir)\..\test\cpp\rpcmanager\grpc_rpc_manager_test.h">
+ <Filter>test\cpp\rpcmanager</Filter>
+ </ClInclude>
+ </ItemGroup>
+
+ <ItemGroup>
+ <Filter Include="test">
+ <UniqueIdentifier>{9da529f7-8064-34c0-54da-0fade27184ad}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\cpp">
+ <UniqueIdentifier>{b6e53cff-22ab-1194-866d-57caa3551fd2}</UniqueIdentifier>
+ </Filter>
+ <Filter Include="test\cpp\rpcmanager">
+ <UniqueIdentifier>{c63d7236-e7c6-d7b7-e3d8-f25853e358e6}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
+</Project>
+