Async server works: port AsyncQpsServerTest to the shared Server interface and enable it in the worker
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
index 8424dba..e598fb5 100644
--- a/test/cpp/qps/server.cc
+++ b/test/cpp/qps/server.cc
@@ -57,24 +57,12 @@
namespace grpc {
namespace testing {
-static bool SetPayload(PayloadType type, int size, Payload* payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
class TestServiceImpl GRPC_FINAL : public TestService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
if (request->has_response_size() && request->response_size() > 0) {
- if (!SetPayload(request->response_type(), request->response_size(),
+ if (!Server::SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
@@ -87,21 +75,7 @@
public:
SynchronousServer(const ServerConfig& config, int port)
: thread_pool_(config.threads()),
- impl_(MakeImpl(port)),
- timer_(new Timer) {}
-
- ServerStats Mark() GRPC_OVERRIDE {
- std::unique_ptr<Timer> timer(new Timer);
- timer.swap(timer_);
-
- auto timer_result = timer->Mark();
-
- ServerStats stats;
- stats.set_time_elapsed(timer_result.wall);
- stats.set_time_system(timer_result.system);
- stats.set_time_user(timer_result.user);
- return stats;
- }
+ impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
@@ -120,7 +94,6 @@
TestServiceImpl service_;
ThreadPool thread_pool_;
std::unique_ptr<grpc::Server> impl_;
- std::unique_ptr<Timer> timer_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 3542c17..ca22d7c 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -34,6 +34,7 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
+#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.pb.h"
namespace grpc {
@@ -41,9 +42,36 @@
class Server {
public:
+ Server():timer_(new Timer) {}
virtual ~Server() {}
- virtual ServerStats Mark() = 0;
+ ServerStats Mark() {
+ std::unique_ptr<Timer> timer(new Timer);
+ timer.swap(timer_);
+
+ auto timer_result = timer->Mark();
+
+ ServerStats stats;
+ stats.set_time_elapsed(timer_result.wall);
+ stats.set_time_system(timer_result.system);
+ stats.set_time_user(timer_result.user);
+ return stats;
+ }
+
+ static bool SetPayload(PayloadType type, int size, Payload* payload) {
+ PayloadType response_type = type;
+ // TODO(yangg): Support UNCOMPRESSABLE payload.
+ if (type != PayloadType::COMPRESSABLE) {
+ return false;
+ }
+ payload->set_type(response_type);
+ std::unique_ptr<char[]> body(new char[size]());
+ payload->set_body(body.get(), size);
+ return true;
+ }
+
+ private:
+ std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c006262..741a858 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -51,104 +51,37 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/server.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
+namespace grpc {
+ namespace testing {
-using grpc::CompletionQueue;
-using grpc::Server;
-using grpc::ServerBuilder;
-using grpc::ServerContext;
-using grpc::ThreadPool;
-using grpc::testing::Payload;
-using grpc::testing::PayloadType;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-using grpc::Status;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google {}
-namespace gflags {}
-using namespace google;
-using namespace gflags;
-
-static bool got_sigint = false;
-
-static void sigint_handler(int x) { got_sigint = 1; }
-
-static double time_double(struct timeval *tv) {
- return tv->tv_sec + 1e-6 * tv->tv_usec;
-}
-
-static bool SetPayload(PayloadType type, int size, Payload *payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
-namespace {
-
-class AsyncQpsServerTest {
+class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
+ AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
+ gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
builder.AddPort(server_address);
+ gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
server_ = builder.BuildAndStart();
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
- gpr_free(server_address);
using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
&async_service_, _1, _2, _3, &srv_cq_, _4);
- request_stats_ =
- std::bind(&TestService::AsyncService::RequestCollectServerStats,
- &async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
- contexts_.push_front(
- new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
- request_stats_, CollectServerStats));
}
- }
- ~AsyncQpsServerTest() {
- server_->Shutdown();
- void *ignored_tag;
- bool ignored_ok;
- srv_cq_.Shutdown();
- while (srv_cq_.Next(&ignored_tag, &ignored_ok)) {
- }
- while (!contexts_.empty()) {
- delete contexts_.front();
- contexts_.pop_front();
- }
- for (auto& thr: threads_) {
- thr.join();
- }
- }
- void ServeRpcs(int num_threads) {
- for (int i = 0; i < num_threads; i++) {
+ for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@@ -166,8 +99,16 @@
return;
}));
}
- while (!got_sigint) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+ ~AsyncQpsServerTest() {
+ server_->Shutdown();
+ srv_cq_.Shutdown();
+ for (auto& thr: threads_) {
+ thr.join();
+ }
+ while (!contexts_.empty()) {
+ delete contexts_.front();
+ contexts_.pop_front();
}
}
@@ -240,17 +181,6 @@
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
- static Status CollectServerStats(const StatsRequest *,
- ServerStats *response) {
- struct rusage usage;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- getrusage(RUSAGE_SELF, &usage);
- response->set_time_now(time_double(&tv));
- response->set_time_user(time_double(&usage.ru_utime));
- response->set_time_system(time_double(&usage.ru_stime));
- return Status::OK;
- }
static Status UnaryCall(const SimpleRequest *request,
SimpleResponse *response) {
if (request->has_response_size() && request->response_size() > 0) {
@@ -264,40 +194,16 @@
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
- std::unique_ptr<Server> server_;
+ std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext *, StatsRequest *,
- grpc::ServerAsyncResponseWriter<ServerStats> *, void *)>
- request_stats_;
std::forward_list<ServerRpcContext *> contexts_;
};
-} // namespace
-
-static void RunServer() {
- AsyncQpsServerTest server;
-
- grpc_profiler_start("qps_server_async.prof");
-
- server.ServeRpcs(FLAGS_server_threads);
-
- grpc_profiler_stop();
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port) {
+ return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
-int main(int argc, char **argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
- GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
-
- signal(SIGINT, sigint_handler);
-
- RunServer();
-
- grpc_shutdown();
- google::protobuf::ShutdownProtobufLibrary();
-
- return 0;
-}
+ }// namespace testing
+}// namespace grpc
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 4a2e798..a8d5752 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -90,7 +90,7 @@
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER:
- abort(); // return CreateAsyncServer(config, FLAGS_server_port);
+ return CreateAsyncServer(config, FLAGS_server_port);
}
abort();
}