blob: 64c260b834993a029ba07743a7eb1606e36b6560 [file] [log] [blame]
vjpaidea740f2015-02-26 16:35:35 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <forward_list>
35#include <functional>
David Garcia Quintasc9516d42015-06-02 13:28:46 -070036#include <memory>
Vijay Pai8ad32092015-03-23 12:44:28 -070037#include <mutex>
vjpaidea740f2015-02-26 16:35:35 -080038#include <thread>
39
40#include <gflags/gflags.h>
yang-g9e2f90c2015-08-21 15:35:03 -070041#include <grpc/grpc.h>
vjpaidea740f2015-02-26 16:35:35 -080042#include <grpc/support/alloc.h>
43#include <grpc/support/host_port.h>
yang-g9e2f90c2015-08-21 15:35:03 -070044#include <grpc/support/log.h>
45#include <grpc++/support/config.h>
vjpaidea740f2015-02-26 16:35:35 -080046#include <grpc++/server.h>
47#include <grpc++/server_builder.h>
48#include <grpc++/server_context.h>
Julien Boeuf5be92a32015-08-28 16:28:18 -070049#include <grpc++/security/server_credentials.h>
vjpaidea740f2015-02-26 16:35:35 -080050#include <gtest/gtest.h>
yang-g9e2f90c2015-08-21 15:35:03 -070051
vjpai52bfb252015-10-21 07:50:49 -070052#include "test/proto/perf_control.grpc.pb.h"
Craig Tillerd6479d62015-03-04 12:50:11 -080053#include "test/cpp/qps/server.h"
vjpaidea740f2015-02-26 16:35:35 -080054
Craig Tillerd6479d62015-03-04 12:50:11 -080055namespace grpc {
Craig Tillera182bf12015-03-04 13:54:39 -080056namespace testing {
vjpaidea740f2015-02-26 16:35:35 -080057
Craig Tillerd6479d62015-03-04 12:50:11 -080058class AsyncQpsServerTest : public Server {
vjpaidea740f2015-02-26 16:35:35 -080059 public:
Craig Tiller27df2cf2015-07-01 15:11:29 -070060 AsyncQpsServerTest(const ServerConfig &config, int port) {
Craig Tiller5c8737d2015-05-21 11:42:17 -070061 char *server_address = NULL;
Craig Tillerd6479d62015-03-04 12:50:11 -080062 gpr_join_host_port(&server_address, "::", port);
vjpaidea740f2015-02-26 16:35:35 -080063
64 ServerBuilder builder;
Nicolas Noblecfd60732015-03-18 16:27:43 -070065 builder.AddListeningPort(server_address, InsecureServerCredentials());
Craig Tillerd6479d62015-03-04 12:50:11 -080066 gpr_free(server_address);
vjpaidea740f2015-02-26 16:35:35 -080067
68 builder.RegisterAsyncService(&async_service_);
Craig Tiller51f938f2015-06-09 23:32:57 -070069 for (int i = 0; i < config.threads(); i++) {
Nicolas "Pixel" Noble59588c62015-09-03 02:29:30 +020070 srv_cqs_.emplace_back(builder.AddCompletionQueue());
Craig Tiller51f938f2015-06-09 23:32:57 -070071 }
vjpaidea740f2015-02-26 16:35:35 -080072
73 server_ = builder.BuildAndStart();
vjpaidea740f2015-02-26 16:35:35 -080074
75 using namespace std::placeholders;
Craig Tillerca83dc82015-07-15 11:43:26 -070076 for (int i = 0; i < 10000 / config.threads(); i++) {
Craig Tiller51f938f2015-06-09 23:32:57 -070077 for (int j = 0; j < config.threads(); j++) {
78 auto request_unary = std::bind(
79 &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
80 _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
81 auto request_streaming = std::bind(
82 &TestService::AsyncService::RequestStreamingCall, &async_service_,
83 _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
84 contexts_.push_front(
85 new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
86 request_unary, ProcessRPC));
87 contexts_.push_front(
88 new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
89 request_streaming, ProcessRPC));
90 }
vjpaidea740f2015-02-26 16:35:35 -080091 }
Craig Tillerd6479d62015-03-04 12:50:11 -080092 for (int i = 0; i < config.threads(); i++) {
Craig Tiller27df2cf2015-07-01 15:11:29 -070093 shutdown_state_.emplace_back(new PerThreadShutdownState());
94 }
95 for (int i = 0; i < config.threads(); i++) {
Vijay Pai458faa92015-07-31 10:30:13 -070096 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
vjpaidea740f2015-02-26 16:35:35 -080097 }
Craig Tillerd6479d62015-03-04 12:50:11 -080098 }
99 ~AsyncQpsServerTest() {
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700100 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10);
vjpaid514b212015-08-31 16:27:35 -0700101 server_->Shutdown(deadline);
Craig Tiller27df2cf2015-07-01 15:11:29 -0700102 for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
103 (*ss)->set_shutdown();
Vijay Pai8ad32092015-03-23 12:44:28 -0700104 }
Vijay Pai82dd80a2015-03-24 10:36:08 -0700105 for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
106 thr->join();
Craig Tillerd6479d62015-03-04 12:50:11 -0800107 }
Craig Tiller51f938f2015-06-09 23:32:57 -0700108 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
109 (*cq)->Shutdown();
110 bool ok;
111 void *got_tag;
112 while ((*cq)->Next(&got_tag, &ok))
113 ;
114 }
Craig Tillerd6479d62015-03-04 12:50:11 -0800115 while (!contexts_.empty()) {
116 delete contexts_.front();
117 contexts_.pop_front();
vjpaidea740f2015-02-26 16:35:35 -0800118 }
119 }
Vijay Pai64ac47f2015-02-26 17:59:51 -0800120
vjpaidea740f2015-02-26 16:35:35 -0800121 private:
Vijay Pai458faa92015-07-31 10:30:13 -0700122 void ThreadFunc(int rank) {
123 // Wait until work is available or we are shutting down
124 bool ok;
125 void *got_tag;
126 while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
127 ServerRpcContext *ctx = detag(got_tag);
128 // The tag is a pointer to an RPC context to invoke
vjpaib1db8692015-08-11 22:41:02 -0700129 const bool still_going = ctx->RunNextState(ok);
Vijay Pai458faa92015-07-31 10:30:13 -0700130 if (!shutdown_state_[rank]->shutdown()) {
131 // this RPC context is done, so refresh it
132 if (!still_going) {
133 ctx->Reset();
134 }
135 } else {
136 return;
137 }
138 }
139 return;
140 }
141
vjpaidea740f2015-02-26 16:35:35 -0800142 class ServerRpcContext {
Vijay Pai64ac47f2015-02-26 17:59:51 -0800143 public:
vjpaidea740f2015-02-26 16:35:35 -0800144 ServerRpcContext() {}
Vijay Pai64ac47f2015-02-26 17:59:51 -0800145 virtual ~ServerRpcContext(){};
vjpai46f65232015-03-23 10:10:27 -0700146 virtual bool RunNextState(bool) = 0; // next state, return false if done
Craig Tiller5c8737d2015-05-21 11:42:17 -0700147 virtual void Reset() = 0; // start this back at a clean state
vjpaidea740f2015-02-26 16:35:35 -0800148 };
Craig Tiller5c8737d2015-05-21 11:42:17 -0700149 static void *tag(ServerRpcContext *func) {
150 return reinterpret_cast<void *>(func);
vjpaidea740f2015-02-26 16:35:35 -0800151 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700152 static ServerRpcContext *detag(void *tag) {
153 return reinterpret_cast<ServerRpcContext *>(tag);
vjpaidea740f2015-02-26 16:35:35 -0800154 }
155
156 template <class RequestType, class ResponseType>
vjpai46f65232015-03-23 10:10:27 -0700157 class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext {
vjpaidea740f2015-02-26 16:35:35 -0800158 public:
159 ServerRpcContextUnaryImpl(
Craig Tiller5c8737d2015-05-21 11:42:17 -0700160 std::function<void(ServerContext *, RequestType *,
161 grpc::ServerAsyncResponseWriter<ResponseType> *,
162 void *)> request_method,
163 std::function<grpc::Status(const RequestType *, ResponseType *)>
vjpai4e1e1bc2015-02-27 23:47:12 -0800164 invoke_method)
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700165 : srv_ctx_(new ServerContext),
166 next_state_(&ServerRpcContextUnaryImpl::invoker),
vjpaidea740f2015-02-26 16:35:35 -0800167 request_method_(request_method),
168 invoke_method_(invoke_method),
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700169 response_writer_(srv_ctx_.get()) {
170 request_method_(srv_ctx_.get(), &req_, &response_writer_,
Vijay Pai64ac47f2015-02-26 17:59:51 -0800171 AsyncQpsServerTest::tag(this));
vjpaidea740f2015-02-26 16:35:35 -0800172 }
vjpai3c110662015-02-27 07:17:16 -0800173 ~ServerRpcContextUnaryImpl() GRPC_OVERRIDE {}
Craig Tiller5c8737d2015-05-21 11:42:17 -0700174 bool RunNextState(bool ok) GRPC_OVERRIDE {
175 return (this->*next_state_)(ok);
176 }
vjpai5b39f9a2015-02-27 09:33:00 -0800177 void Reset() GRPC_OVERRIDE {
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700178 srv_ctx_.reset(new ServerContext);
vjpaidea740f2015-02-26 16:35:35 -0800179 req_ = RequestType();
180 response_writer_ =
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700181 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
vjpaidea740f2015-02-26 16:35:35 -0800182
183 // Then request the method
184 next_state_ = &ServerRpcContextUnaryImpl::invoker;
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700185 request_method_(srv_ctx_.get(), &req_, &response_writer_,
Vijay Pai64ac47f2015-02-26 17:59:51 -0800186 AsyncQpsServerTest::tag(this));
vjpaidea740f2015-02-26 16:35:35 -0800187 }
Vijay Pai64ac47f2015-02-26 17:59:51 -0800188
vjpaidea740f2015-02-26 16:35:35 -0800189 private:
vjpai46f65232015-03-23 10:10:27 -0700190 bool finisher(bool) { return false; }
191 bool invoker(bool ok) {
Craig Tiller8221e402015-04-09 12:23:21 -0700192 if (!ok) {
193 return false;
194 }
vjpai46f65232015-03-23 10:10:27 -0700195
vjpaidea740f2015-02-26 16:35:35 -0800196 ResponseType response;
197
198 // Call the RPC processing function
199 grpc::Status status = invoke_method_(&req_, &response);
200
201 // Have the response writer work and invoke on_finish when done
202 next_state_ = &ServerRpcContextUnaryImpl::finisher;
Vijay Pai64ac47f2015-02-26 17:59:51 -0800203 response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this));
vjpaidea740f2015-02-26 16:35:35 -0800204 return true;
205 }
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700206 std::unique_ptr<ServerContext> srv_ctx_;
vjpaidea740f2015-02-26 16:35:35 -0800207 RequestType req_;
vjpai46f65232015-03-23 10:10:27 -0700208 bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
Craig Tiller5c8737d2015-05-21 11:42:17 -0700209 std::function<void(ServerContext *, RequestType *,
210 grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
vjpai4e1e1bc2015-02-27 23:47:12 -0800211 request_method_;
Craig Tiller5c8737d2015-05-21 11:42:17 -0700212 std::function<grpc::Status(const RequestType *, ResponseType *)>
vjpai4e1e1bc2015-02-27 23:47:12 -0800213 invoke_method_;
vjpaidea740f2015-02-26 16:35:35 -0800214 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
215 };
216
vjpai46f65232015-03-23 10:10:27 -0700217 template <class RequestType, class ResponseType>
218 class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
219 public:
220 ServerRpcContextStreamingImpl(
Craig Tiller5c8737d2015-05-21 11:42:17 -0700221 std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter<
222 ResponseType, RequestType> *,
223 void *)> request_method,
vjpai46f65232015-03-23 10:10:27 -0700224 std::function<grpc::Status(const RequestType *, ResponseType *)>
225 invoke_method)
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700226 : srv_ctx_(new ServerContext),
227 next_state_(&ServerRpcContextStreamingImpl::request_done),
vjpai46f65232015-03-23 10:10:27 -0700228 request_method_(request_method),
229 invoke_method_(invoke_method),
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700230 stream_(srv_ctx_.get()) {
231 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
vjpai46f65232015-03-23 10:10:27 -0700232 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700233 ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {}
234 bool RunNextState(bool ok) GRPC_OVERRIDE {
235 return (this->*next_state_)(ok);
vjpai46f65232015-03-23 10:10:27 -0700236 }
vjpai46f65232015-03-23 10:10:27 -0700237 void Reset() GRPC_OVERRIDE {
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700238 srv_ctx_.reset(new ServerContext);
vjpai46f65232015-03-23 10:10:27 -0700239 req_ = RequestType();
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700240 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
241 srv_ctx_.get());
vjpai46f65232015-03-23 10:10:27 -0700242
243 // Then request the method
244 next_state_ = &ServerRpcContextStreamingImpl::request_done;
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700245 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
vjpai46f65232015-03-23 10:10:27 -0700246 }
247
248 private:
249 bool request_done(bool ok) {
Craig Tiller8221e402015-04-09 12:23:21 -0700250 if (!ok) {
251 return false;
252 }
vjpai46f65232015-03-23 10:10:27 -0700253 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
254 next_state_ = &ServerRpcContextStreamingImpl::read_done;
255 return true;
256 }
Vijay Pai8ad32092015-03-23 12:44:28 -0700257
vjpai46f65232015-03-23 10:10:27 -0700258 bool read_done(bool ok) {
259 if (ok) {
Craig Tiller5c8737d2015-05-21 11:42:17 -0700260 // invoke the method
261 ResponseType response;
262 // Call the RPC processing function
263 grpc::Status status = invoke_method_(&req_, &response);
264 // initiate the write
265 stream_.Write(response, AsyncQpsServerTest::tag(this));
266 next_state_ = &ServerRpcContextStreamingImpl::write_done;
267 } else { // client has sent writes done
268 // finish the stream
269 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
270 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
vjpai46f65232015-03-23 10:10:27 -0700271 }
272 return true;
273 }
274 bool write_done(bool ok) {
275 // now go back and get another streaming read!
276 if (ok) {
Craig Tiller5c8737d2015-05-21 11:42:17 -0700277 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
278 next_state_ = &ServerRpcContextStreamingImpl::read_done;
279 } else {
280 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
281 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
vjpai46f65232015-03-23 10:10:27 -0700282 }
283 return true;
284 }
Craig Tiller5c8737d2015-05-21 11:42:17 -0700285 bool finish_done(bool ok) { return false; /* reset the context */ }
vjpai46f65232015-03-23 10:10:27 -0700286
David Garcia Quintasc9516d42015-06-02 13:28:46 -0700287 std::unique_ptr<ServerContext> srv_ctx_;
vjpai46f65232015-03-23 10:10:27 -0700288 RequestType req_;
289 bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
Craig Tiller5c8737d2015-05-21 11:42:17 -0700290 std::function<void(
291 ServerContext *,
292 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
293 request_method_;
vjpai46f65232015-03-23 10:10:27 -0700294 std::function<grpc::Status(const RequestType *, ResponseType *)>
295 invoke_method_;
Craig Tiller5c8737d2015-05-21 11:42:17 -0700296 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
vjpai46f65232015-03-23 10:10:27 -0700297 };
298
Craig Tiller5c8737d2015-05-21 11:42:17 -0700299 static Status ProcessRPC(const SimpleRequest *request,
300 SimpleResponse *response) {
vjpai46f65232015-03-23 10:10:27 -0700301 if (request->response_size() > 0) {
vjpaidea740f2015-02-26 16:35:35 -0800302 if (!SetPayload(request->response_type(), request->response_size(),
303 response->mutable_payload())) {
304 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
305 }
306 }
307 return Status::OK;
308 }
Vijay Paiacf6f312015-03-02 15:13:39 -0800309 std::vector<std::thread> threads_;
Craig Tillerd6479d62015-03-04 12:50:11 -0800310 std::unique_ptr<grpc::Server> server_;
Craig Tiller51f938f2015-06-09 23:32:57 -0700311 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700312 TestService::AsyncService async_service_;
Craig Tiller5c8737d2015-05-21 11:42:17 -0700313 std::forward_list<ServerRpcContext *> contexts_;
Vijay Pai8ad32092015-03-23 12:44:28 -0700314
Craig Tiller27df2cf2015-07-01 15:11:29 -0700315 class PerThreadShutdownState {
316 public:
317 PerThreadShutdownState() : shutdown_(false) {}
318
319 bool shutdown() const {
320 std::lock_guard<std::mutex> lock(mutex_);
321 return shutdown_;
322 }
323
324 void set_shutdown() {
325 std::lock_guard<std::mutex> lock(mutex_);
326 shutdown_ = true;
327 }
328
329 private:
330 mutable std::mutex mutex_;
331 bool shutdown_;
332 };
333 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
vjpaidea740f2015-02-26 16:35:35 -0800334};
335
Craig Tiller5c8737d2015-05-21 11:42:17 -0700336std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
Craig Tillera182bf12015-03-04 13:54:39 -0800337 int port) {
338 return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
vjpaidea740f2015-02-26 16:35:35 -0800339}
340
Craig Tillera182bf12015-03-04 13:54:39 -0800341} // namespace testing
342} // namespace grpc