Make settings configurable
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 5b4cb6f..bae83ee 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -135,10 +135,13 @@
/// \param max_pollers The maximum number of polling threads per server
/// completion queue (in param sync_server_cqs) to use for listening to
/// incoming requests (used only in case of sync server)
+ ///
+ /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
+ /// server completion queues passed via sync_server_cqs param.
Server(std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int max_message_size, ChannelArguments* args, int min_pollers,
- int max_pollers);
+ int max_pollers, int sync_cq_timeout_msec);
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index d9a6878..8fac168 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -62,6 +62,22 @@
public:
ServerBuilder();
+ struct SyncServerSettings {
+ // Number of server completion queues to create to listen to incoming RPCs.
+ int num_cqs;
+
+ // Minimum number of threads per completion queue that should be listening
+ // to incoming RPCs.
+ int min_pollers;
+
+ // Maximum number of threads per completion queue that can be listening to
+ // incoming RPCs.
+ int max_pollers;
+
+ // The timeout for server completion queue's AsyncNext call.
+ int cq_timeout_msec;
+ };
+
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the \a Server instance returned
/// by \a BuildAndStart().
@@ -115,6 +131,9 @@
ServerBuilder& SetOption(std::unique_ptr<ServerBuilderOption> option);
+ /// Note: Only useful if this is a Synchronous server.
+ void SetSyncServerSettings(SyncServerSettings settings);
+
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.
@@ -164,18 +183,20 @@
private:
friend class ::grpc::testing::ServerBuilderPluginTest;
- // TODO (sreek) Make these configurable
- // The default number of minimum and maximum number of polling threads needed
- // per completion queue. These are only used in case of Sync server
- const int kDefaultMinPollers = 1;
- const int kDefaultMaxPollers = -1; // Unlimited
-
struct Port {
grpc::string addr;
std::shared_ptr<ServerCredentials> creds;
int* selected_port;
};
+ // Sync server settings. If this is not set via SetSyncServerSettings(), the
+ // following default values are used:
+ // sync_server_settings_.num_cqs = Number of CPUs
+ // sync_server_settings_.min_pollers = 1
+ // sync_server_settings_.max_pollers = INT_MAX
+ // sync_server_settings_.cq_timeout_msec = 1000
+ struct SyncServerSettings sync_server_settings_;
+
typedef std::unique_ptr<grpc::string> HostString;
struct NamedService {
explicit NamedService(Service* s) : service(s) {}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index eab57b4..1a27100 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -62,6 +62,7 @@
auto& factory = *it;
plugins_.emplace_back(factory());
}
+
// all compression algorithms enabled by default.
enabled_compression_algorithms_bitset_ =
(1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
@@ -69,6 +70,17 @@
sizeof(maybe_default_compression_level_));
memset(&maybe_default_compression_algorithm_, 0,
sizeof(maybe_default_compression_algorithm_));
+
+
+ // Sync server setting defaults
+ sync_server_settings_.min_pollers = 1;
+ sync_server_settings_.max_pollers = INT_MAX;
+
+ int num_cpus = gpr_cpu_num_cores();
+ num_cpus = GPR_MAX(num_cpus, 4);
+ sync_server_settings_.num_cqs = num_cpus;
+
+ sync_server_settings_.cq_timeout_msec = 1000;
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
@@ -131,6 +143,10 @@
return *this;
}
+void ServerBuilder:: SetSyncServerSettings(SyncServerSettings settings) {
+ sync_server_settings_ = settings; // copy the settings
+}
+
ServerBuilder& ServerBuilder::AddListeningPort(
const grpc::string& addr, std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
@@ -200,23 +216,17 @@
if (has_sync_methods) {
// If the server has synchronous methods, it will need completion queues to
- // handle those methods. Create one cq per core (or create 4 if number of
- // cores is less than 4 or unavailable)
- //
- // TODO (sreek) - The default number 4 is just a guess. Check if a lower or
- // higher number makes sense
- int num_cqs = gpr_cpu_num_cores();
- num_cqs = GPR_MAX(num_cqs, 4);
-
- for (int i = 0; i < num_cqs; i++) {
+ // handle those methods.
+ for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
sync_server_cqs->emplace_back(new ServerCompletionQueue());
}
}
// TODO (sreek) Make the number of pollers configurable
- std::unique_ptr<Server> server(
- new Server(sync_server_cqs, max_receive_message_size_, &args,
- kDefaultMinPollers, kDefaultMaxPollers));
+ std::unique_ptr<Server> server(new Server(
+ sync_server_cqs, max_receive_message_size_, &args,
+ sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec));
ServerInitializer* initializer = server->initializer();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index e9f3c99..36bc61f 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -291,18 +291,17 @@
public:
SyncRequestManager(Server* server, CompletionQueue* server_cq,
std::shared_ptr<GlobalCallbacks> global_callbacks,
- int min_pollers, int max_pollers)
+ int min_pollers, int max_pollers, int cq_timeout_msec)
: GrpcRpcManager(min_pollers, max_pollers),
server_(server),
server_cq_(server_cq),
+ cq_timeout_msec_(cq_timeout_msec),
global_callbacks_(global_callbacks) {}
- static const int kRpcPollingTimeoutMsec = 3000;
-
WorkStatus PollForWork(void** tag, bool* ok) GRPC_OVERRIDE {
*tag = nullptr;
gpr_timespec deadline =
- gpr_time_from_millis(kRpcPollingTimeoutMsec, GPR_TIMESPAN);
+ gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN);
switch (server_cq_->AsyncNext(tag, ok, deadline)) {
case CompletionQueue::TIMEOUT:
@@ -389,6 +388,7 @@
private:
Server* server_;
CompletionQueue* server_cq_;
+ int cq_timeout_msec_;
std::vector<SyncRequest> sync_methods_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
@@ -399,7 +399,7 @@
std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
sync_server_cqs,
int max_receive_message_size, ChannelArguments* args, int min_pollers,
- int max_pollers)
+ int max_pollers, int sync_cq_timeout_msec)
: max_receive_message_size_(max_receive_message_size),
sync_server_cqs_(sync_server_cqs),
started_(false),
@@ -415,8 +415,9 @@
for (auto it = sync_server_cqs_->begin(); it != sync_server_cqs_->end();
it++) {
- sync_req_mgrs_.emplace_back(new SyncRequestManager(
- this, (*it).get(), global_callbacks_, min_pollers, max_pollers));
+ sync_req_mgrs_.emplace_back(
+ new SyncRequestManager(this, (*it).get(), global_callbacks_,
+ min_pollers, max_pollers, sync_cq_timeout_msec));
}
grpc_channel_args channel_args;
@@ -606,7 +607,7 @@
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
- while(shutdown_cq.Next(&tag, &ok)) {
+ while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here
}
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index b1d3ce9..a46f9f2 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -226,6 +226,11 @@
kMaxMessageSize_(8192),
special_service_("special") {
GetParam().Log();
+
+ sync_server_settings_.max_pollers = INT_MAX;
+ sync_server_settings_.min_pollers = 1;
+ sync_server_settings_.cq_timeout_msec = 10;
+ sync_server_settings_.num_cqs = 4;
}
void TearDown() GRPC_OVERRIDE {
@@ -250,6 +255,9 @@
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
+
+ builder.SetSyncServerSettings(sync_server_settings_);
+
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
@@ -279,6 +287,8 @@
ServerBuilder builder;
builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
builder.RegisterService(proxy_service_.get());
+ builder.SetSyncServerSettings(sync_server_settings_);
+
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
@@ -299,6 +309,7 @@
TestServiceImpl special_service_;
TestServiceImplDupPkg dup_pkg_service_;
grpc::string user_agent_prefix_;
+ ServerBuilder::SyncServerSettings sync_server_settings_;
};
static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,