blob: e9c4f4eaafe3410b603b5b4a731a47c85b555de9 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include <grpc++/server.h>
35#include <utility>
36
37#include <grpc/grpc.h>
yangg9e21f722014-12-08 15:49:52 -080038#include <grpc/grpc_security.h>
Yang Gao2a3c96a2015-03-11 23:32:40 -070039#include <grpc/support/alloc.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc++/completion_queue.h>
Yang Gao49996492015-03-12 16:40:19 -070042#include <grpc++/async_generic_service.h>
yangg1b151092015-01-09 15:31:05 -080043#include <grpc++/impl/rpc_service_method.h>
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080044#include <grpc++/impl/service_type.h>
Craig Tillerb4701692015-02-09 16:12:00 -080045#include <grpc++/server_context.h>
yangg9e21f722014-12-08 15:49:52 -080046#include <grpc++/server_credentials.h>
Craig Tiller0db1bef2015-02-09 13:47:39 -080047#include <grpc++/thread_pool_interface.h>
Nicolas Noble89219162015-04-07 18:01:18 -070048#include <grpc++/time.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049
Vijay Pai9ffbd0c2015-04-15 01:02:50 -070050#include "src/core/profiling/timers.h"
Craig Tillerc4165772015-02-11 10:51:04 -080051#include "src/cpp/proto/proto_utils.h"
52
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053namespace grpc {
54
Craig Tillercf133f42015-02-26 14:05:56 -080055class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Craig Tillerc4165772015-02-11 10:51:04 -080056 public:
Craig Tiller1c9a2a92015-02-12 14:10:25 -080057 SyncRequest(RpcServiceMethod* method, void* tag)
Craig Tillerc4165772015-02-11 10:51:04 -080058 : method_(method),
59 tag_(tag),
Craig Tillercf133f42015-02-26 14:05:56 -080060 in_flight_(false),
Craig Tillerc4165772015-02-11 10:51:04 -080061 has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
62 method->method_type() ==
63 RpcMethod::SERVER_STREAMING),
64 has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
65 method->method_type() ==
66 RpcMethod::CLIENT_STREAMING) {
67 grpc_metadata_array_init(&request_metadata_);
68 }
69
Craig Tiller1c9a2a92015-02-12 14:10:25 -080070 static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080071 void* tag = nullptr;
Craig Tiller504bd332015-02-11 20:34:33 -080072 *ok = false;
Craig Tillerc4165772015-02-11 10:51:04 -080073 if (!cq->Next(&tag, ok)) {
74 return nullptr;
75 }
Craig Tiller1c9a2a92015-02-12 14:10:25 -080076 auto* mrd = static_cast<SyncRequest*>(tag);
Craig Tillerc4165772015-02-11 10:51:04 -080077 GPR_ASSERT(mrd->in_flight_);
78 return mrd;
79 }
80
Craig Tillerf9e6adf2015-05-06 11:45:59 -070081 void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
Craig Tillerc4165772015-02-11 10:51:04 -080082 GPR_ASSERT(!in_flight_);
83 in_flight_ = true;
84 cq_ = grpc_completion_queue_create();
85 GPR_ASSERT(GRPC_CALL_OK ==
86 grpc_server_request_registered_call(
87 server, tag_, &call_, &deadline_, &request_metadata_,
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080088 has_request_payload_ ? &request_payload_ : nullptr, cq_,
Craig Tillerf9e6adf2015-05-06 11:45:59 -070089 notify_cq, this));
Craig Tillerc4165772015-02-11 10:51:04 -080090 }
91
Craig Tillercf133f42015-02-26 14:05:56 -080092 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
Craig Tillerec3257c2015-02-12 15:59:43 -080093 if (!*status) {
94 grpc_completion_queue_destroy(cq_);
95 }
Craig Tiller645466e2015-02-18 09:18:33 -080096 return true;
Craig Tillerec3257c2015-02-12 15:59:43 -080097 }
Craig Tillerc4165772015-02-11 10:51:04 -080098
Craig Tillercf133f42015-02-26 14:05:56 -080099 class CallData GRPC_FINAL {
Craig Tillerc4165772015-02-11 10:51:04 -0800100 public:
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800101 explicit CallData(Server* server, SyncRequest* mrd)
Craig Tillerc4165772015-02-11 10:51:04 -0800102 : cq_(mrd->cq_),
Yang Gao3921c562015-04-30 16:07:06 -0700103 call_(mrd->call_, server, &cq_, server->max_message_size_),
Craig Tillerc4165772015-02-11 10:51:04 -0800104 ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
105 mrd->request_metadata_.count),
106 has_request_payload_(mrd->has_request_payload_),
107 has_response_payload_(mrd->has_response_payload_),
108 request_payload_(mrd->request_payload_),
109 method_(mrd->method_) {
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800110 ctx_.call_ = mrd->call_;
Yang Gao1205f6f2015-03-22 15:18:14 -0700111 ctx_.cq_ = &cq_;
Craig Tillerc4165772015-02-11 10:51:04 -0800112 GPR_ASSERT(mrd->in_flight_);
113 mrd->in_flight_ = false;
114 mrd->request_metadata_.count = 0;
115 }
116
Craig Tillerec3257c2015-02-12 15:59:43 -0800117 ~CallData() {
118 if (has_request_payload_ && request_payload_) {
119 grpc_byte_buffer_destroy(request_payload_);
120 }
121 }
122
Craig Tillerc4165772015-02-11 10:51:04 -0800123 void Run() {
Yang Gao7694c352015-03-03 09:48:06 -0800124 std::unique_ptr<grpc::protobuf::Message> req;
125 std::unique_ptr<grpc::protobuf::Message> res;
Craig Tillerc4165772015-02-11 10:51:04 -0800126 if (has_request_payload_) {
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700127 GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
Craig Tillerc4165772015-02-11 10:51:04 -0800128 req.reset(method_->AllocateRequestProto());
Yang Gaoc71a9d22015-05-04 00:22:12 -0700129 if (!DeserializeProto(request_payload_, req.get(),
130 call_.max_message_size())) {
Yang Gaof7d05b52015-05-04 00:14:02 -0700131 // FIXME(yangg) deal with deserialization failure
132 cq_.Shutdown();
133 return;
Craig Tillerc4165772015-02-11 10:51:04 -0800134 }
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700135 GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
Craig Tillerc4165772015-02-11 10:51:04 -0800136 }
137 if (has_response_payload_) {
Craig Tiller7c2f3f72015-02-11 13:21:54 -0800138 res.reset(method_->AllocateResponseProto());
Craig Tillerc4165772015-02-11 10:51:04 -0800139 }
Craig Tiller492968f2015-02-18 13:14:03 -0800140 ctx_.BeginCompletionOp(&call_);
Craig Tillerc4165772015-02-11 10:51:04 -0800141 auto status = method_->handler()->RunHandler(
142 MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
143 CallOpBuffer buf;
Craig Tillerbc8e3db2015-02-12 09:56:02 -0800144 if (!ctx_.sent_initial_metadata_) {
145 buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
146 }
Craig Tillerc4165772015-02-11 10:51:04 -0800147 if (has_response_payload_) {
148 buf.AddSendMessage(*res);
149 }
Craig Tiller9dcb0f82015-02-11 15:36:31 -0800150 buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
Craig Tillerc4165772015-02-11 10:51:04 -0800151 call_.PerformOps(&buf);
152 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tiller492968f2015-02-18 13:14:03 -0800153 void* ignored_tag;
154 bool ignored_ok;
155 cq_.Shutdown();
156 GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
Craig Tillerc4165772015-02-11 10:51:04 -0800157 }
158
159 private:
160 CompletionQueue cq_;
161 Call call_;
162 ServerContext ctx_;
163 const bool has_request_payload_;
164 const bool has_response_payload_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800165 grpc_byte_buffer* request_payload_;
166 RpcServiceMethod* const method_;
Craig Tillerc4165772015-02-11 10:51:04 -0800167 };
168
169 private:
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800170 RpcServiceMethod* const method_;
171 void* const tag_;
Craig Tillercf133f42015-02-26 14:05:56 -0800172 bool in_flight_;
Craig Tillerc4165772015-02-11 10:51:04 -0800173 const bool has_request_payload_;
174 const bool has_response_payload_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800175 grpc_call* call_;
Craig Tillerc4165772015-02-11 10:51:04 -0800176 gpr_timespec deadline_;
177 grpc_metadata_array request_metadata_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800178 grpc_byte_buffer* request_payload_;
179 grpc_completion_queue* cq_;
Craig Tillerc4165772015-02-11 10:51:04 -0800180};
181
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700182static grpc_server* CreateServer(int max_message_size) {
Yang Gao3921c562015-04-30 16:07:06 -0700183 if (max_message_size > 0) {
184 grpc_arg arg;
185 arg.type = GRPC_ARG_INTEGER;
186 arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
187 arg.value.integer = max_message_size;
188 grpc_channel_args args = {1, &arg};
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700189 return grpc_server_create(&args);
Yang Gao3921c562015-04-30 16:07:06 -0700190 } else {
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700191 return grpc_server_create(nullptr);
Yang Gao3921c562015-04-30 16:07:06 -0700192 }
193}
194
195Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
196 int max_message_size)
197 : max_message_size_(max_message_size),
198 started_(false),
vjpaicad5f0a2015-02-18 22:02:52 -0800199 shutdown_(false),
200 num_running_cb_(0),
Nicolas Noble30862032015-04-24 18:17:45 -0700201 sync_methods_(new std::list<SyncRequest>),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700202 server_(CreateServer(max_message_size)),
vjpaicad5f0a2015-02-18 22:02:52 -0800203 thread_pool_(thread_pool),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700204 thread_pool_owned_(thread_pool_owned) {
205 grpc_server_register_completion_queue(server_, cq_.cq());
206}
vjpaicad5f0a2015-02-18 22:02:52 -0800207
208Server::~Server() {
Ruyi Wangb486ba62015-03-14 22:19:44 +0800209 {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200210 grpc::unique_lock<grpc::mutex> lock(mu_);
Ruyi Wangb486ba62015-03-14 22:19:44 +0800211 if (started_ && !shutdown_) {
212 lock.unlock();
213 Shutdown();
214 }
vjpaicad5f0a2015-02-18 22:02:52 -0800215 }
216 grpc_server_destroy(server_);
217 if (thread_pool_owned_) {
218 delete thread_pool_;
219 }
Nicolas Noble30862032015-04-24 18:17:45 -0700220 delete sync_methods_;
vjpaicad5f0a2015-02-18 22:02:52 -0800221}
222
223bool Server::RegisterService(RpcService* service) {
224 for (int i = 0; i < service->GetMethodCount(); ++i) {
225 RpcServiceMethod* method = service->GetMethod(i);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700226 void* tag = grpc_server_register_method(server_, method->name(), nullptr);
vjpaicad5f0a2015-02-18 22:02:52 -0800227 if (!tag) {
228 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
229 method->name());
230 return false;
231 }
Nicolas Noble30862032015-04-24 18:17:45 -0700232 SyncRequest request(method, tag);
233 sync_methods_->emplace_back(request);
vjpaicad5f0a2015-02-18 22:02:52 -0800234 }
235 return true;
236}
237
238bool Server::RegisterAsyncService(AsynchronousService* service) {
239 GPR_ASSERT(service->dispatch_impl_ == nullptr &&
Yang Gao1ad253d2015-03-16 13:11:29 -0700240 "Can only register an asynchronous service against one server.");
vjpaicad5f0a2015-02-18 22:02:52 -0800241 service->dispatch_impl_ = this;
Yang Gaoc71a9d22015-05-04 00:22:12 -0700242 service->request_args_ = new void*[service->method_count_];
vjpaicad5f0a2015-02-18 22:02:52 -0800243 for (size_t i = 0; i < service->method_count_; ++i) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700244 void* tag = grpc_server_register_method(server_, service->method_names_[i],
245 nullptr);
vjpaicad5f0a2015-02-18 22:02:52 -0800246 if (!tag) {
247 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
248 service->method_names_[i]);
249 return false;
250 }
251 service->request_args_[i] = tag;
252 }
253 return true;
254}
255
Yang Gao49996492015-03-12 16:40:19 -0700256void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
Yang Gao1c402332015-03-05 16:39:25 -0800257 GPR_ASSERT(service->server_ == nullptr &&
Yang Gao1ad253d2015-03-16 13:11:29 -0700258 "Can only register an async generic service against one server.");
Yang Gao1c402332015-03-05 16:39:25 -0800259 service->server_ = this;
260}
261
Nicolas Noblecfd60732015-03-18 16:27:43 -0700262int Server::AddListeningPort(const grpc::string& addr,
263 ServerCredentials* creds) {
vjpaicad5f0a2015-02-18 22:02:52 -0800264 GPR_ASSERT(!started_);
Craig Tiller42bc87c2015-02-23 08:50:19 -0800265 return creds->AddPortToServer(addr, server_);
vjpaicad5f0a2015-02-18 22:02:52 -0800266}
267
Craig Tiller0db1bef2015-02-09 13:47:39 -0800268bool Server::Start() {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269 GPR_ASSERT(!started_);
270 started_ = true;
271 grpc_server_start(server_);
272
273 // Start processing rpcs.
Nicolas Noble30862032015-04-24 18:17:45 -0700274 if (!sync_methods_->empty()) {
275 for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700276 m->Request(server_, cq_.cq());
Craig Tillercbd04852015-02-10 17:39:54 -0800277 }
278
Craig Tiller7c72adc2015-02-09 14:07:26 -0800279 ScheduleCallback();
280 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800281
282 return true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800283}
284
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800285void Server::Shutdown() {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200286 grpc::unique_lock<grpc::mutex> lock(mu_);
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800287 if (started_ && !shutdown_) {
288 shutdown_ = true;
289 grpc_server_shutdown(server_);
290 cq_.Shutdown();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800291
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800292 // Wait for running callbacks to finish.
293 while (num_running_cb_ != 0) {
294 callback_cv_.wait(lock);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800295 }
296 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800297}
298
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800299void Server::Wait() {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200300 grpc::unique_lock<grpc::mutex> lock(mu_);
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800301 while (num_running_cb_ != 0) {
302 callback_cv_.wait(lock);
303 }
304}
305
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800306void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
Craig Tillerbb5227f2015-02-11 13:34:48 -0800307 static const size_t MAX_OPS = 8;
308 size_t nops = MAX_OPS;
309 grpc_op ops[MAX_OPS];
310 buf->FillOps(ops, &nops);
311 GPR_ASSERT(GRPC_CALL_OK ==
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800312 grpc_call_start_batch(call->call(), ops, nops, buf));
Craig Tillerbb5227f2015-02-11 13:34:48 -0800313}
314
Craig Tillercf133f42015-02-26 14:05:56 -0800315class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800316 public:
317 AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
Yang Gao7694c352015-03-03 09:48:06 -0800318 grpc::protobuf::Message* request,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700319 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
320 ServerCompletionQueue* notification_cq, void* tag)
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800321 : tag_(tag),
322 request_(request),
323 stream_(stream),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700324 call_cq_(call_cq),
325 notification_cq_(notification_cq),
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800326 ctx_(ctx),
Yang Gao005eb882015-03-11 22:17:13 -0700327 generic_ctx_(nullptr),
Craig Tillercf133f42015-02-26 14:05:56 -0800328 server_(server),
329 call_(nullptr),
330 payload_(nullptr) {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800331 memset(&array_, 0, sizeof(array_));
Yang Gao1c402332015-03-05 16:39:25 -0800332 grpc_call_details_init(&call_details_);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700333 GPR_ASSERT(notification_cq);
334 GPR_ASSERT(call_cq);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800335 grpc_server_request_registered_call(
Yang Gao1c402332015-03-05 16:39:25 -0800336 server->server_, registered_method, &call_, &call_details_.deadline,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700337 &array_, request ? &payload_ : nullptr, call_cq->cq(),
338 notification_cq->cq(), this);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800339 }
340
Yang Gao005eb882015-03-11 22:17:13 -0700341 AsyncRequest(Server* server, GenericServerContext* ctx,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700342 ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
343 ServerCompletionQueue* notification_cq, void* tag)
Yang Gao1c402332015-03-05 16:39:25 -0800344 : tag_(tag),
345 request_(nullptr),
346 stream_(stream),
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700347 call_cq_(call_cq),
348 notification_cq_(notification_cq),
Yang Gao1c402332015-03-05 16:39:25 -0800349 ctx_(nullptr),
Yang Gao005eb882015-03-11 22:17:13 -0700350 generic_ctx_(ctx),
Yang Gao1c402332015-03-05 16:39:25 -0800351 server_(server),
352 call_(nullptr),
353 payload_(nullptr) {
354 memset(&array_, 0, sizeof(array_));
355 grpc_call_details_init(&call_details_);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700356 GPR_ASSERT(notification_cq);
357 GPR_ASSERT(call_cq);
Yang Gao6baa9b62015-03-17 10:49:39 -0700358 grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700359 call_cq->cq(), notification_cq->cq(), this);
Yang Gao1c402332015-03-05 16:39:25 -0800360 }
361
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800362 ~AsyncRequest() {
363 if (payload_) {
364 grpc_byte_buffer_destroy(payload_);
365 }
366 grpc_metadata_array_destroy(&array_);
367 }
368
Craig Tillercf133f42015-02-26 14:05:56 -0800369 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800370 *tag = tag_;
Vijay Paidbb79632015-03-03 11:54:27 -0800371 bool orig_status = *status;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800372 if (*status && request_) {
373 if (payload_) {
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700374 GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_);
Yang Gaoc71a9d22015-05-04 00:22:12 -0700375 *status =
376 DeserializeProto(payload_, request_, server_->max_message_size_);
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700377 GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800378 } else {
379 *status = false;
380 }
381 }
Yang Gao005eb882015-03-11 22:17:13 -0700382 ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
Yang Gao1c402332015-03-05 16:39:25 -0800383 GPR_ASSERT(ctx);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800384 if (*status) {
Nicolas Noble89219162015-04-07 18:01:18 -0700385 ctx->deadline_ = call_details_.deadline;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800386 for (size_t i = 0; i < array_.count; i++) {
Yang Gao1c402332015-03-05 16:39:25 -0800387 ctx->client_metadata_.insert(std::make_pair(
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800388 grpc::string(array_.metadata[i].key),
389 grpc::string(
390 array_.metadata[i].value,
391 array_.metadata[i].value + array_.metadata[i].value_length)));
392 }
Yang Gao005eb882015-03-11 22:17:13 -0700393 if (generic_ctx_) {
Yang Gao2a3c96a2015-03-11 23:32:40 -0700394 // TODO(yangg) remove the copy here.
Yang Gao005eb882015-03-11 22:17:13 -0700395 generic_ctx_->method_ = call_details_.method;
396 generic_ctx_->host_ = call_details_.host;
Yang Gao2a3c96a2015-03-11 23:32:40 -0700397 gpr_free(call_details_.method);
398 gpr_free(call_details_.host);
Yang Gao5f4539f2015-03-06 16:11:16 -0800399 }
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800400 }
Yang Gao1c402332015-03-05 16:39:25 -0800401 ctx->call_ = call_;
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700402 ctx->cq_ = call_cq_;
403 Call call(call_, server_, call_cq_, server_->max_message_size_);
Vijay Paidbb79632015-03-03 11:54:27 -0800404 if (orig_status && call_) {
Yang Gao1c402332015-03-05 16:39:25 -0800405 ctx->BeginCompletionOp(&call);
Vijay Pai0823cb72015-03-02 15:48:51 -0800406 }
Craig Tiller645466e2015-02-18 09:18:33 -0800407 // just the pointers inside call are copied here
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800408 stream_->BindCall(&call);
409 delete this;
Craig Tiller645466e2015-02-18 09:18:33 -0800410 return true;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800411 }
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800412
413 private:
414 void* const tag_;
Yang Gao7694c352015-03-03 09:48:06 -0800415 grpc::protobuf::Message* const request_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800416 ServerAsyncStreamingInterface* const stream_;
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700417 CompletionQueue* const call_cq_;
418 ServerCompletionQueue* const notification_cq_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800419 ServerContext* const ctx_;
Yang Gao005eb882015-03-11 22:17:13 -0700420 GenericServerContext* const generic_ctx_;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800421 Server* const server_;
Craig Tillercf133f42015-02-26 14:05:56 -0800422 grpc_call* call_;
Yang Gao1c402332015-03-05 16:39:25 -0800423 grpc_call_details call_details_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800424 grpc_metadata_array array_;
Craig Tillercf133f42015-02-26 14:05:56 -0800425 grpc_byte_buffer* payload_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800426};
427
428void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
Yang Gao7694c352015-03-03 09:48:06 -0800429 grpc::protobuf::Message* request,
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800430 ServerAsyncStreamingInterface* stream,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700431 CompletionQueue* call_cq,
432 ServerCompletionQueue* notification_cq,
433 void* tag) {
434 new AsyncRequest(this, registered_method, context, request, stream, call_cq,
435 notification_cq, tag);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800436}
Yang Gao5f4539f2015-03-06 16:11:16 -0800437
Yang Gao49996492015-03-12 16:40:19 -0700438void Server::RequestAsyncGenericCall(GenericServerContext* context,
439 ServerAsyncStreamingInterface* stream,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700440 CompletionQueue* call_cq,
441 ServerCompletionQueue* notification_cq,
442 void* tag) {
443 new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
Yang Gao1c402332015-03-05 16:39:25 -0800444}
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800445
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800446void Server::ScheduleCallback() {
447 {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200448 grpc::unique_lock<grpc::mutex> lock(mu_);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800449 num_running_cb_++;
450 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800451 thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800452}
453
454void Server::RunRpc() {
455 // Wait for one more incoming rpc.
Craig Tillerc4165772015-02-11 10:51:04 -0800456 bool ok;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800457 auto* mrd = SyncRequest::Wait(&cq_, &ok);
Craig Tillercbd04852015-02-10 17:39:54 -0800458 if (mrd) {
Craig Tillerbd217572015-02-11 18:10:56 -0800459 ScheduleCallback();
Craig Tillerc4165772015-02-11 10:51:04 -0800460 if (ok) {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800461 SyncRequest::CallData cd(this, mrd);
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700462 mrd->Request(server_, cq_.cq());
Craig Tillercbd04852015-02-10 17:39:54 -0800463
Craig Tillerc4165772015-02-11 10:51:04 -0800464 cd.Run();
465 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800466 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800467
468 {
Nicolas "Pixel" Nobleff2828b2015-04-03 03:16:46 +0200469 grpc::unique_lock<grpc::mutex> lock(mu_);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800470 num_running_cb_--;
471 if (shutdown_) {
472 callback_cv_.notify_all();
473 }
474 }
475}
476
477} // namespace grpc