blob: 024537c34aa7b09a1aa47f454912e39b2cd59ad9 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc++/server.h>
35#include <utility>
36
37#include <grpc/grpc.h>
yangg9e21f722014-12-08 15:49:52 -080038#include <grpc/grpc_security.h>
Yang Gao2a3c96a2015-03-11 23:32:40 -070039#include <grpc/support/alloc.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc++/completion_queue.h>
Yang Gao49996492015-03-12 16:40:19 -070042#include <grpc++/async_generic_service.h>
yangg1b151092015-01-09 15:31:05 -080043#include <grpc++/impl/rpc_service_method.h>
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080044#include <grpc++/impl/service_type.h>
Craig Tillerb4701692015-02-09 16:12:00 -080045#include <grpc++/server_context.h>
yangg9e21f722014-12-08 15:49:52 -080046#include <grpc++/server_credentials.h>
Craig Tiller0db1bef2015-02-09 13:47:39 -080047#include <grpc++/thread_pool_interface.h>
Nicolas Noble89219162015-04-07 18:01:18 -070048#include <grpc++/time.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049
Vijay Pai9ffbd0c2015-04-15 01:02:50 -070050#include "src/core/profiling/timers.h"
Craig Tillerc4165772015-02-11 10:51:04 -080051#include "src/cpp/proto/proto_utils.h"
52
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053namespace grpc {
54
Craig Tillerbce999f2015-05-27 09:55:51 -070055class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
56 public:
57 bool FinalizeResult(void** tag, bool* status) {
58 delete this;
59 return false;
60 }
61};
62
Craig Tillercf133f42015-02-26 14:05:56 -080063class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Craig Tillerc4165772015-02-11 10:51:04 -080064 public:
Craig Tiller1c9a2a92015-02-12 14:10:25 -080065 SyncRequest(RpcServiceMethod* method, void* tag)
Craig Tillerc4165772015-02-11 10:51:04 -080066 : method_(method),
67 tag_(tag),
Craig Tillercf133f42015-02-26 14:05:56 -080068 in_flight_(false),
Craig Tillerc4165772015-02-11 10:51:04 -080069 has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
70 method->method_type() ==
71 RpcMethod::SERVER_STREAMING),
72 has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
73 method->method_type() ==
Vijay Paiced73bd2015-06-17 10:23:28 -070074 RpcMethod::CLIENT_STREAMING),
75 cq_(nullptr) {
Craig Tillerc4165772015-02-11 10:51:04 -080076 grpc_metadata_array_init(&request_metadata_);
77 }
78
Yang Gao7b49a772015-05-28 14:07:54 -070079 ~SyncRequest() {
80 grpc_metadata_array_destroy(&request_metadata_);
81 }
82
Craig Tiller1c9a2a92015-02-12 14:10:25 -080083 static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080084 void* tag = nullptr;
Craig Tiller504bd332015-02-11 20:34:33 -080085 *ok = false;
Craig Tillerc4165772015-02-11 10:51:04 -080086 if (!cq->Next(&tag, ok)) {
87 return nullptr;
88 }
Craig Tiller1c9a2a92015-02-12 14:10:25 -080089 auto* mrd = static_cast<SyncRequest*>(tag);
Craig Tillerc4165772015-02-11 10:51:04 -080090 GPR_ASSERT(mrd->in_flight_);
91 return mrd;
92 }
93
Vijay Paiced73bd2015-06-17 10:23:28 -070094 void SetupRequest() {
Craig Tillerc4165772015-02-11 10:51:04 -080095 cq_ = grpc_completion_queue_create();
Vijay Paiced73bd2015-06-17 10:23:28 -070096 }
97
98 void TeardownRequest() {
99 grpc_completion_queue_destroy(cq_);
100 cq_ = nullptr;
101 }
102
103 void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
104 GPR_ASSERT(cq_ && !in_flight_);
105 in_flight_ = true;
Craig Tillerc4165772015-02-11 10:51:04 -0800106 GPR_ASSERT(GRPC_CALL_OK ==
107 grpc_server_request_registered_call(
108 server, tag_, &call_, &deadline_, &request_metadata_,
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800109 has_request_payload_ ? &request_payload_ : nullptr, cq_,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700110 notify_cq, this));
Craig Tillerc4165772015-02-11 10:51:04 -0800111 }
112
Craig Tillercf133f42015-02-26 14:05:56 -0800113 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
Craig Tillerec3257c2015-02-12 15:59:43 -0800114 if (!*status) {
115 grpc_completion_queue_destroy(cq_);
116 }
Craig Tiller645466e2015-02-18 09:18:33 -0800117 return true;
Craig Tillerec3257c2015-02-12 15:59:43 -0800118 }
Craig Tillerc4165772015-02-11 10:51:04 -0800119
Craig Tillercf133f42015-02-26 14:05:56 -0800120 class CallData GRPC_FINAL {
Craig Tillerc4165772015-02-11 10:51:04 -0800121 public:
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800122 explicit CallData(Server* server, SyncRequest* mrd)
Craig Tillerc4165772015-02-11 10:51:04 -0800123 : cq_(mrd->cq_),
Yang Gao3921c562015-04-30 16:07:06 -0700124 call_(mrd->call_, server, &cq_, server->max_message_size_),
Craig Tillerc4165772015-02-11 10:51:04 -0800125 ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
126 mrd->request_metadata_.count),
127 has_request_payload_(mrd->has_request_payload_),
128 has_response_payload_(mrd->has_response_payload_),
129 request_payload_(mrd->request_payload_),
130 method_(mrd->method_) {
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800131 ctx_.call_ = mrd->call_;
Yang Gao1205f6f2015-03-22 15:18:14 -0700132 ctx_.cq_ = &cq_;
Craig Tillerc4165772015-02-11 10:51:04 -0800133 GPR_ASSERT(mrd->in_flight_);
134 mrd->in_flight_ = false;
135 mrd->request_metadata_.count = 0;
136 }
137
Craig Tillerec3257c2015-02-12 15:59:43 -0800138 ~CallData() {
139 if (has_request_payload_ && request_payload_) {
140 grpc_byte_buffer_destroy(request_payload_);
141 }
142 }
143
Craig Tillerc4165772015-02-11 10:51:04 -0800144 void Run() {
Yang Gao7694c352015-03-03 09:48:06 -0800145 std::unique_ptr<grpc::protobuf::Message> req;
146 std::unique_ptr<grpc::protobuf::Message> res;
Craig Tillerc4165772015-02-11 10:51:04 -0800147 if (has_request_payload_) {
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700148 GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
Craig Tillerc4165772015-02-11 10:51:04 -0800149 req.reset(method_->AllocateRequestProto());
Yang Gaoc71a9d22015-05-04 00:22:12 -0700150 if (!DeserializeProto(request_payload_, req.get(),
151 call_.max_message_size())) {
Yang Gaof7d05b52015-05-04 00:14:02 -0700152 // FIXME(yangg) deal with deserialization failure
153 cq_.Shutdown();
154 return;
Craig Tillerc4165772015-02-11 10:51:04 -0800155 }
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700156 GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
Craig Tillerc4165772015-02-11 10:51:04 -0800157 }
158 if (has_response_payload_) {
Craig Tiller7c2f3f72015-02-11 13:21:54 -0800159 res.reset(method_->AllocateResponseProto());
Craig Tillerc4165772015-02-11 10:51:04 -0800160 }
Craig Tiller492968f2015-02-18 13:14:03 -0800161 ctx_.BeginCompletionOp(&call_);
Craig Tillerc4165772015-02-11 10:51:04 -0800162 auto status = method_->handler()->RunHandler(
163 MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
164 CallOpBuffer buf;
Craig Tillerbc8e3db2015-02-12 09:56:02 -0800165 if (!ctx_.sent_initial_metadata_) {
166 buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
167 }
Craig Tillerc4165772015-02-11 10:51:04 -0800168 if (has_response_payload_) {
169 buf.AddSendMessage(*res);
170 }
Craig Tiller9dcb0f82015-02-11 15:36:31 -0800171 buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
Craig Tillerc4165772015-02-11 10:51:04 -0800172 call_.PerformOps(&buf);
Craig Tillerdd093f32015-05-15 16:29:58 -0700173 cq_.Pluck(&buf); /* status ignored */
Craig Tiller492968f2015-02-18 13:14:03 -0800174 void* ignored_tag;
175 bool ignored_ok;
176 cq_.Shutdown();
177 GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
Craig Tillerc4165772015-02-11 10:51:04 -0800178 }
179
180 private:
181 CompletionQueue cq_;
182 Call call_;
183 ServerContext ctx_;
184 const bool has_request_payload_;
185 const bool has_response_payload_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800186 grpc_byte_buffer* request_payload_;
187 RpcServiceMethod* const method_;
Craig Tillerc4165772015-02-11 10:51:04 -0800188 };
189
190 private:
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800191 RpcServiceMethod* const method_;
192 void* const tag_;
Craig Tillercf133f42015-02-26 14:05:56 -0800193 bool in_flight_;
Craig Tillerc4165772015-02-11 10:51:04 -0800194 const bool has_request_payload_;
195 const bool has_response_payload_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800196 grpc_call* call_;
Craig Tillerc4165772015-02-11 10:51:04 -0800197 gpr_timespec deadline_;
198 grpc_metadata_array request_metadata_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800199 grpc_byte_buffer* request_payload_;
200 grpc_completion_queue* cq_;
Craig Tillerc4165772015-02-11 10:51:04 -0800201};
202
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700203static grpc_server* CreateServer(int max_message_size) {
Yang Gao3921c562015-04-30 16:07:06 -0700204 if (max_message_size > 0) {
205 grpc_arg arg;
206 arg.type = GRPC_ARG_INTEGER;
207 arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
208 arg.value.integer = max_message_size;
209 grpc_channel_args args = {1, &arg};
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700210 return grpc_server_create(&args);
Yang Gao3921c562015-04-30 16:07:06 -0700211 } else {
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700212 return grpc_server_create(nullptr);
Yang Gao3921c562015-04-30 16:07:06 -0700213 }
214}
215
216Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
217 int max_message_size)
218 : max_message_size_(max_message_size),
219 started_(false),
vjpaicad5f0a2015-02-18 22:02:52 -0800220 shutdown_(false),
221 num_running_cb_(0),
Nicolas Noble30862032015-04-24 18:17:45 -0700222 sync_methods_(new std::list<SyncRequest>),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700223 server_(CreateServer(max_message_size)),
vjpaicad5f0a2015-02-18 22:02:52 -0800224 thread_pool_(thread_pool),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700225 thread_pool_owned_(thread_pool_owned) {
226 grpc_server_register_completion_queue(server_, cq_.cq());
227}
vjpaicad5f0a2015-02-18 22:02:52 -0800228
229Server::~Server() {
Ruyi Wangb486ba62015-03-14 22:19:44 +0800230 {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200231 grpc::unique_lock<grpc::mutex> lock(mu_);
Ruyi Wangb486ba62015-03-14 22:19:44 +0800232 if (started_ && !shutdown_) {
233 lock.unlock();
234 Shutdown();
235 }
vjpaicad5f0a2015-02-18 22:02:52 -0800236 }
Craig Tiller29f79dc2015-05-27 15:59:23 -0700237 void* got_tag;
238 bool ok;
239 GPR_ASSERT(!cq_.Next(&got_tag, &ok));
vjpaicad5f0a2015-02-18 22:02:52 -0800240 grpc_server_destroy(server_);
241 if (thread_pool_owned_) {
242 delete thread_pool_;
243 }
Nicolas Noble30862032015-04-24 18:17:45 -0700244 delete sync_methods_;
vjpaicad5f0a2015-02-18 22:02:52 -0800245}
246
247bool Server::RegisterService(RpcService* service) {
248 for (int i = 0; i < service->GetMethodCount(); ++i) {
249 RpcServiceMethod* method = service->GetMethod(i);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700250 void* tag = grpc_server_register_method(server_, method->name(), nullptr);
vjpaicad5f0a2015-02-18 22:02:52 -0800251 if (!tag) {
252 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
253 method->name());
254 return false;
255 }
Nicolas Noble30862032015-04-24 18:17:45 -0700256 SyncRequest request(method, tag);
257 sync_methods_->emplace_back(request);
vjpaicad5f0a2015-02-18 22:02:52 -0800258 }
259 return true;
260}
261
262bool Server::RegisterAsyncService(AsynchronousService* service) {
263 GPR_ASSERT(service->dispatch_impl_ == nullptr &&
Yang Gao1ad253d2015-03-16 13:11:29 -0700264 "Can only register an asynchronous service against one server.");
vjpaicad5f0a2015-02-18 22:02:52 -0800265 service->dispatch_impl_ = this;
Yang Gaoc71a9d22015-05-04 00:22:12 -0700266 service->request_args_ = new void*[service->method_count_];
vjpaicad5f0a2015-02-18 22:02:52 -0800267 for (size_t i = 0; i < service->method_count_; ++i) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700268 void* tag = grpc_server_register_method(server_, service->method_names_[i],
269 nullptr);
vjpaicad5f0a2015-02-18 22:02:52 -0800270 if (!tag) {
271 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
272 service->method_names_[i]);
273 return false;
274 }
275 service->request_args_[i] = tag;
276 }
277 return true;
278}
279
Yang Gao49996492015-03-12 16:40:19 -0700280void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
Yang Gao1c402332015-03-05 16:39:25 -0800281 GPR_ASSERT(service->server_ == nullptr &&
Yang Gao1ad253d2015-03-16 13:11:29 -0700282 "Can only register an async generic service against one server.");
Yang Gao1c402332015-03-05 16:39:25 -0800283 service->server_ = this;
284}
285
Nicolas Noblecfd60732015-03-18 16:27:43 -0700286int Server::AddListeningPort(const grpc::string& addr,
287 ServerCredentials* creds) {
vjpaicad5f0a2015-02-18 22:02:52 -0800288 GPR_ASSERT(!started_);
Craig Tiller42bc87c2015-02-23 08:50:19 -0800289 return creds->AddPortToServer(addr, server_);
vjpaicad5f0a2015-02-18 22:02:52 -0800290}
291
Craig Tiller0db1bef2015-02-09 13:47:39 -0800292bool Server::Start() {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800293 GPR_ASSERT(!started_);
294 started_ = true;
295 grpc_server_start(server_);
296
297 // Start processing rpcs.
Nicolas Noble30862032015-04-24 18:17:45 -0700298 if (!sync_methods_->empty()) {
299 for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
Vijay Paiced73bd2015-06-17 10:23:28 -0700300 m->SetupRequest();
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700301 m->Request(server_, cq_.cq());
Craig Tillercbd04852015-02-10 17:39:54 -0800302 }
303
Craig Tiller7c72adc2015-02-09 14:07:26 -0800304 ScheduleCallback();
305 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800306
307 return true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800308}
309
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800310void Server::Shutdown() {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200311 grpc::unique_lock<grpc::mutex> lock(mu_);
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800312 if (started_ && !shutdown_) {
313 shutdown_ = true;
Craig Tillerbce999f2015-05-27 09:55:51 -0700314 grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800315 cq_.Shutdown();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800316
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800317 // Wait for running callbacks to finish.
318 while (num_running_cb_ != 0) {
319 callback_cv_.wait(lock);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800320 }
321 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800322}
323
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800324void Server::Wait() {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200325 grpc::unique_lock<grpc::mutex> lock(mu_);
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800326 while (num_running_cb_ != 0) {
327 callback_cv_.wait(lock);
328 }
329}
330
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800331void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
Craig Tillerbb5227f2015-02-11 13:34:48 -0800332 static const size_t MAX_OPS = 8;
333 size_t nops = MAX_OPS;
334 grpc_op ops[MAX_OPS];
335 buf->FillOps(ops, &nops);
336 GPR_ASSERT(GRPC_CALL_OK ==
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800337 grpc_call_start_batch(call->call(), ops, nops, buf));
Craig Tillerbb5227f2015-02-11 13:34:48 -0800338}
339
Craig Tillercf133f42015-02-26 14:05:56 -0800340class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800341 public:
342 AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
Yang Gao7694c352015-03-03 09:48:06 -0800343 grpc::protobuf::Message* request,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700344 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
345 ServerCompletionQueue* notification_cq, void* tag)
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800346 : tag_(tag),
347 request_(request),
348 stream_(stream),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700349 call_cq_(call_cq),
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800350 ctx_(ctx),
Yang Gao005eb882015-03-11 22:17:13 -0700351 generic_ctx_(nullptr),
Craig Tillercf133f42015-02-26 14:05:56 -0800352 server_(server),
353 call_(nullptr),
354 payload_(nullptr) {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800355 memset(&array_, 0, sizeof(array_));
Yang Gao1c402332015-03-05 16:39:25 -0800356 grpc_call_details_init(&call_details_);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700357 GPR_ASSERT(notification_cq);
358 GPR_ASSERT(call_cq);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800359 grpc_server_request_registered_call(
Yang Gao1c402332015-03-05 16:39:25 -0800360 server->server_, registered_method, &call_, &call_details_.deadline,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700361 &array_, request ? &payload_ : nullptr, call_cq->cq(),
362 notification_cq->cq(), this);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800363 }
364
Yang Gao005eb882015-03-11 22:17:13 -0700365 AsyncRequest(Server* server, GenericServerContext* ctx,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700366 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
367 ServerCompletionQueue* notification_cq, void* tag)
Yang Gao1c402332015-03-05 16:39:25 -0800368 : tag_(tag),
369 request_(nullptr),
370 stream_(stream),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700371 call_cq_(call_cq),
Yang Gao1c402332015-03-05 16:39:25 -0800372 ctx_(nullptr),
Yang Gao005eb882015-03-11 22:17:13 -0700373 generic_ctx_(ctx),
Yang Gao1c402332015-03-05 16:39:25 -0800374 server_(server),
375 call_(nullptr),
376 payload_(nullptr) {
377 memset(&array_, 0, sizeof(array_));
378 grpc_call_details_init(&call_details_);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700379 GPR_ASSERT(notification_cq);
380 GPR_ASSERT(call_cq);
Yang Gao6baa9b62015-03-17 10:49:39 -0700381 grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700382 call_cq->cq(), notification_cq->cq(), this);
Yang Gao1c402332015-03-05 16:39:25 -0800383 }
384
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800385 ~AsyncRequest() {
386 if (payload_) {
387 grpc_byte_buffer_destroy(payload_);
388 }
389 grpc_metadata_array_destroy(&array_);
390 }
391
Craig Tillercf133f42015-02-26 14:05:56 -0800392 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800393 *tag = tag_;
Vijay Paidbb79632015-03-03 11:54:27 -0800394 bool orig_status = *status;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800395 if (*status && request_) {
396 if (payload_) {
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700397 GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_);
Yang Gaoc71a9d22015-05-04 00:22:12 -0700398 *status =
399 DeserializeProto(payload_, request_, server_->max_message_size_);
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700400 GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800401 } else {
402 *status = false;
403 }
404 }
Yang Gao005eb882015-03-11 22:17:13 -0700405 ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
Yang Gao1c402332015-03-05 16:39:25 -0800406 GPR_ASSERT(ctx);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800407 if (*status) {
Nicolas Noble89219162015-04-07 18:01:18 -0700408 ctx->deadline_ = call_details_.deadline;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800409 for (size_t i = 0; i < array_.count; i++) {
Yang Gao1c402332015-03-05 16:39:25 -0800410 ctx->client_metadata_.insert(std::make_pair(
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800411 grpc::string(array_.metadata[i].key),
412 grpc::string(
413 array_.metadata[i].value,
414 array_.metadata[i].value + array_.metadata[i].value_length)));
415 }
Yang Gao005eb882015-03-11 22:17:13 -0700416 if (generic_ctx_) {
Yang Gao2a3c96a2015-03-11 23:32:40 -0700417 // TODO(yangg) remove the copy here.
Yang Gao005eb882015-03-11 22:17:13 -0700418 generic_ctx_->method_ = call_details_.method;
419 generic_ctx_->host_ = call_details_.host;
Yang Gao2a3c96a2015-03-11 23:32:40 -0700420 gpr_free(call_details_.method);
421 gpr_free(call_details_.host);
Yang Gao5f4539f2015-03-06 16:11:16 -0800422 }
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800423 }
Yang Gao1c402332015-03-05 16:39:25 -0800424 ctx->call_ = call_;
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700425 ctx->cq_ = call_cq_;
426 Call call(call_, server_, call_cq_, server_->max_message_size_);
Vijay Paidbb79632015-03-03 11:54:27 -0800427 if (orig_status && call_) {
Yang Gao1c402332015-03-05 16:39:25 -0800428 ctx->BeginCompletionOp(&call);
Vijay Pai0823cb72015-03-02 15:48:51 -0800429 }
Craig Tiller645466e2015-02-18 09:18:33 -0800430 // just the pointers inside call are copied here
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800431 stream_->BindCall(&call);
432 delete this;
Craig Tiller645466e2015-02-18 09:18:33 -0800433 return true;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800434 }
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800435
436 private:
437 void* const tag_;
Yang Gao7694c352015-03-03 09:48:06 -0800438 grpc::protobuf::Message* const request_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800439 ServerAsyncStreamingInterface* const stream_;
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700440 CompletionQueue* const call_cq_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800441 ServerContext* const ctx_;
Yang Gao005eb882015-03-11 22:17:13 -0700442 GenericServerContext* const generic_ctx_;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800443 Server* const server_;
Craig Tillercf133f42015-02-26 14:05:56 -0800444 grpc_call* call_;
Yang Gao1c402332015-03-05 16:39:25 -0800445 grpc_call_details call_details_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800446 grpc_metadata_array array_;
Craig Tillercf133f42015-02-26 14:05:56 -0800447 grpc_byte_buffer* payload_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800448};
449
450void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
Yang Gao7694c352015-03-03 09:48:06 -0800451 grpc::protobuf::Message* request,
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800452 ServerAsyncStreamingInterface* stream,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700453 CompletionQueue* call_cq,
454 ServerCompletionQueue* notification_cq,
455 void* tag) {
456 new AsyncRequest(this, registered_method, context, request, stream, call_cq,
457 notification_cq, tag);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800458}
Yang Gao5f4539f2015-03-06 16:11:16 -0800459
Yang Gao49996492015-03-12 16:40:19 -0700460void Server::RequestAsyncGenericCall(GenericServerContext* context,
461 ServerAsyncStreamingInterface* stream,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700462 CompletionQueue* call_cq,
463 ServerCompletionQueue* notification_cq,
464 void* tag) {
465 new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
Yang Gao1c402332015-03-05 16:39:25 -0800466}
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800467
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800468void Server::ScheduleCallback() {
469 {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200470 grpc::unique_lock<grpc::mutex> lock(mu_);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800471 num_running_cb_++;
472 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800473 thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800474}
475
476void Server::RunRpc() {
477 // Wait for one more incoming rpc.
Craig Tillerc4165772015-02-11 10:51:04 -0800478 bool ok;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800479 auto* mrd = SyncRequest::Wait(&cq_, &ok);
Craig Tillercbd04852015-02-10 17:39:54 -0800480 if (mrd) {
Craig Tillerbd217572015-02-11 18:10:56 -0800481 ScheduleCallback();
Craig Tillerc4165772015-02-11 10:51:04 -0800482 if (ok) {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800483 SyncRequest::CallData cd(this, mrd);
Yang Gaob8a5f862015-05-07 16:26:33 -0700484 {
Vijay Paiced73bd2015-06-17 10:23:28 -0700485 mrd->SetupRequest();
Yang Gaob8a5f862015-05-07 16:26:33 -0700486 grpc::unique_lock<grpc::mutex> lock(mu_);
487 if (!shutdown_) {
Craig Tillera33acb72015-05-08 08:02:55 -0700488 mrd->Request(server_, cq_.cq());
Vijay Paiced73bd2015-06-17 10:23:28 -0700489 } else {
490 // destroy the structure that was created
491 mrd->TeardownRequest();
Yang Gaob8a5f862015-05-07 16:26:33 -0700492 }
493 }
Craig Tillerc4165772015-02-11 10:51:04 -0800494 cd.Run();
495 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800497
498 {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200499 grpc::unique_lock<grpc::mutex> lock(mu_);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800500 num_running_cb_--;
501 if (shutdown_) {
502 callback_cv_.notify_all();
503 }
504 }
505}
506
507} // namespace grpc