Driver changes
WIP - things compile again after a broad set of changes preparing for
the driver code.
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
index 8e13634..a5edb05 100644
--- a/test/cpp/qps/server.cc
+++ b/test/cpp/qps/server.cc
@@ -44,6 +44,7 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/status.h>
+#include <grpc++/stream.h>
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
@@ -51,13 +52,13 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
+DEFINE_int32(driver_port, 0, "Server driver port.");
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
+using grpc::ServerReaderWriter;
using grpc::ThreadPool;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
@@ -66,6 +67,10 @@
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
+using grpc::testing::QpsServer;
+using grpc::testing::ServerArgs;
+using grpc::testing::ServerStats;
+using grpc::testing::ServerStatus;
using grpc::Status;
// In some distros, gflags is in the namespace google, and in some others,
@@ -124,34 +129,76 @@
} // namespace
+class ServerImpl : public QpsServer::Service {
+ public:
+ Status RunServer(ServerContext* ctx, ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
+ ServerArgs args;
+ std::unique_ptr<ServerStats> last_stats;
+ if (!stream->Read(&args)) return Status::OK;
+
+ bool done = false;
+ while (!done) {
+ std::lock_guard<std::mutex> lock(server_mu_);
+
+ char* server_address = NULL;
+ gpr_join_host_port(&server_address, "::", FLAGS_port);
+
+ TestServiceImpl service;
+
+ ServerBuilder builder;
+ builder.AddPort(server_address);
+ builder.RegisterService(&service);
+
+ gpr_free(server_address);
+
+ std::unique_ptr<ThreadPool> pool(new ThreadPool(args.threads()));
+ builder.SetThreadPool(pool.get());
+
+ auto server = builder.BuildAndStart();
+ gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
+
+ ServerStatus last_status;
+ if (last_stats.get()) {
+ *last_status.mutable_stats() = *last_stats;
+ }
+ if (!stream->Write(last_status)) return Status(grpc::UNKNOWN);
+
+ grpc_profiler_start("qps_server.prof");
+
+ done = stream->Read(&args);
+
+ grpc_profiler_stop();
+ }
+
+ ServerStatus last_status;
+ if (last_stats.get()) {
+ *last_status.mutable_stats() = *last_stats;
+ }
+ stream->Write(last_status);
+ return Status::OK;
+ }
+
+ private:
+ std::mutex server_mu_;
+};
+
static void RunServer() {
char* server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
+ gpr_join_host_port(&server_address, "::", FLAGS_driver_port);
- TestServiceImpl service;
-
- SimpleRequest request;
- SimpleResponse response;
+ ServerImpl service;
ServerBuilder builder;
builder.AddPort(server_address);
builder.RegisterService(&service);
- std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads));
- builder.SetThreadPool(pool.get());
+ gpr_free(server_address);
- std::unique_ptr<Server> server(builder.BuildAndStart());
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
-
- grpc_profiler_start("qps_server.prof");
+ auto server = builder.BuildAndStart();
while (!got_sigint) {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
-
- grpc_profiler_stop();
-
- gpr_free(server_address);
}
int main(int argc, char** argv) {
@@ -161,7 +208,6 @@
signal(SIGINT, sigint_handler);
GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
RunServer();
grpc_shutdown();