/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include <grpc++/server.h>
35#include <utility>
36
37#include <grpc/grpc.h>
yangg9e21f722014-12-08 15:49:52 -080038#include <grpc/grpc_security.h>
Yang Gao2a3c96a2015-03-11 23:32:40 -070039#include <grpc/support/alloc.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc++/completion_queue.h>
Yang Gao49996492015-03-12 16:40:19 -070042#include <grpc++/async_generic_service.h>
yangg1b151092015-01-09 15:31:05 -080043#include <grpc++/impl/rpc_service_method.h>
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080044#include <grpc++/impl/service_type.h>
Craig Tillerb4701692015-02-09 16:12:00 -080045#include <grpc++/server_context.h>
yangg9e21f722014-12-08 15:49:52 -080046#include <grpc++/server_credentials.h>
Craig Tiller0db1bef2015-02-09 13:47:39 -080047#include <grpc++/thread_pool_interface.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080048
Craig Tillerc4165772015-02-11 10:51:04 -080049#include "src/cpp/proto/proto_utils.h"
Craig Tiller3d6ceb62015-02-12 14:33:54 -080050#include "src/cpp/util/time.h"
Craig Tillerc4165772015-02-11 10:51:04 -080051
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052namespace grpc {
53
Craig Tillercf133f42015-02-26 14:05:56 -080054class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Craig Tillerc4165772015-02-11 10:51:04 -080055 public:
Craig Tiller1c9a2a92015-02-12 14:10:25 -080056 SyncRequest(RpcServiceMethod* method, void* tag)
Craig Tillerc4165772015-02-11 10:51:04 -080057 : method_(method),
58 tag_(tag),
Craig Tillercf133f42015-02-26 14:05:56 -080059 in_flight_(false),
Craig Tillerc4165772015-02-11 10:51:04 -080060 has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
61 method->method_type() ==
62 RpcMethod::SERVER_STREAMING),
63 has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
64 method->method_type() ==
65 RpcMethod::CLIENT_STREAMING) {
66 grpc_metadata_array_init(&request_metadata_);
67 }
68
Craig Tiller1c9a2a92015-02-12 14:10:25 -080069 static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080070 void* tag = nullptr;
Craig Tiller504bd332015-02-11 20:34:33 -080071 *ok = false;
Craig Tillerc4165772015-02-11 10:51:04 -080072 if (!cq->Next(&tag, ok)) {
73 return nullptr;
74 }
Craig Tiller1c9a2a92015-02-12 14:10:25 -080075 auto* mrd = static_cast<SyncRequest*>(tag);
Craig Tillerc4165772015-02-11 10:51:04 -080076 GPR_ASSERT(mrd->in_flight_);
77 return mrd;
78 }
79
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080080 void Request(grpc_server* server) {
Craig Tillerc4165772015-02-11 10:51:04 -080081 GPR_ASSERT(!in_flight_);
82 in_flight_ = true;
83 cq_ = grpc_completion_queue_create();
84 GPR_ASSERT(GRPC_CALL_OK ==
85 grpc_server_request_registered_call(
86 server, tag_, &call_, &deadline_, &request_metadata_,
Craig Tiller8c8d0aa2015-02-12 11:38:36 -080087 has_request_payload_ ? &request_payload_ : nullptr, cq_,
88 this));
Craig Tillerc4165772015-02-11 10:51:04 -080089 }
90
Craig Tillercf133f42015-02-26 14:05:56 -080091 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
Craig Tillerec3257c2015-02-12 15:59:43 -080092 if (!*status) {
93 grpc_completion_queue_destroy(cq_);
94 }
Craig Tiller645466e2015-02-18 09:18:33 -080095 return true;
Craig Tillerec3257c2015-02-12 15:59:43 -080096 }
Craig Tillerc4165772015-02-11 10:51:04 -080097
Craig Tillercf133f42015-02-26 14:05:56 -080098 class CallData GRPC_FINAL {
Craig Tillerc4165772015-02-11 10:51:04 -080099 public:
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800100 explicit CallData(Server* server, SyncRequest* mrd)
Craig Tillerc4165772015-02-11 10:51:04 -0800101 : cq_(mrd->cq_),
Craig Tillerbb5227f2015-02-11 13:34:48 -0800102 call_(mrd->call_, server, &cq_),
Craig Tillerc4165772015-02-11 10:51:04 -0800103 ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
104 mrd->request_metadata_.count),
105 has_request_payload_(mrd->has_request_payload_),
106 has_response_payload_(mrd->has_response_payload_),
107 request_payload_(mrd->request_payload_),
108 method_(mrd->method_) {
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800109 ctx_.call_ = mrd->call_;
Craig Tillerc4165772015-02-11 10:51:04 -0800110 GPR_ASSERT(mrd->in_flight_);
111 mrd->in_flight_ = false;
112 mrd->request_metadata_.count = 0;
113 }
114
Craig Tillerec3257c2015-02-12 15:59:43 -0800115 ~CallData() {
116 if (has_request_payload_ && request_payload_) {
117 grpc_byte_buffer_destroy(request_payload_);
118 }
119 }
120
Craig Tillerc4165772015-02-11 10:51:04 -0800121 void Run() {
Yang Gao7694c352015-03-03 09:48:06 -0800122 std::unique_ptr<grpc::protobuf::Message> req;
123 std::unique_ptr<grpc::protobuf::Message> res;
Craig Tillerc4165772015-02-11 10:51:04 -0800124 if (has_request_payload_) {
125 req.reset(method_->AllocateRequestProto());
126 if (!DeserializeProto(request_payload_, req.get())) {
127 abort(); // for now
128 }
129 }
130 if (has_response_payload_) {
Craig Tiller7c2f3f72015-02-11 13:21:54 -0800131 res.reset(method_->AllocateResponseProto());
Craig Tillerc4165772015-02-11 10:51:04 -0800132 }
Craig Tiller492968f2015-02-18 13:14:03 -0800133 ctx_.BeginCompletionOp(&call_);
Craig Tillerc4165772015-02-11 10:51:04 -0800134 auto status = method_->handler()->RunHandler(
135 MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
136 CallOpBuffer buf;
Craig Tillerbc8e3db2015-02-12 09:56:02 -0800137 if (!ctx_.sent_initial_metadata_) {
138 buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
139 }
Craig Tillerc4165772015-02-11 10:51:04 -0800140 if (has_response_payload_) {
141 buf.AddSendMessage(*res);
142 }
Craig Tiller9dcb0f82015-02-11 15:36:31 -0800143 buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
Craig Tillerc4165772015-02-11 10:51:04 -0800144 call_.PerformOps(&buf);
145 GPR_ASSERT(cq_.Pluck(&buf));
Craig Tiller492968f2015-02-18 13:14:03 -0800146 void* ignored_tag;
147 bool ignored_ok;
148 cq_.Shutdown();
149 GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
Craig Tillerc4165772015-02-11 10:51:04 -0800150 }
151
152 private:
153 CompletionQueue cq_;
154 Call call_;
155 ServerContext ctx_;
156 const bool has_request_payload_;
157 const bool has_response_payload_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800158 grpc_byte_buffer* request_payload_;
159 RpcServiceMethod* const method_;
Craig Tillerc4165772015-02-11 10:51:04 -0800160 };
161
162 private:
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800163 RpcServiceMethod* const method_;
164 void* const tag_;
Craig Tillercf133f42015-02-26 14:05:56 -0800165 bool in_flight_;
Craig Tillerc4165772015-02-11 10:51:04 -0800166 const bool has_request_payload_;
167 const bool has_response_payload_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800168 grpc_call* call_;
Craig Tillerc4165772015-02-11 10:51:04 -0800169 gpr_timespec deadline_;
170 grpc_metadata_array request_metadata_;
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800171 grpc_byte_buffer* request_payload_;
172 grpc_completion_queue* cq_;
Craig Tillerc4165772015-02-11 10:51:04 -0800173};
174
Craig Tiller42bc87c2015-02-23 08:50:19 -0800175Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned)
vjpaicad5f0a2015-02-18 22:02:52 -0800176 : started_(false),
177 shutdown_(false),
178 num_running_cb_(0),
Craig Tiller42bc87c2015-02-23 08:50:19 -0800179 server_(grpc_server_create(cq_.cq(), nullptr)),
vjpaicad5f0a2015-02-18 22:02:52 -0800180 thread_pool_(thread_pool),
Craig Tiller42bc87c2015-02-23 08:50:19 -0800181 thread_pool_owned_(thread_pool_owned) {}
vjpaicad5f0a2015-02-18 22:02:52 -0800182
183Server::~Server() {
184 std::unique_lock<std::mutex> lock(mu_);
185 if (started_ && !shutdown_) {
186 lock.unlock();
187 Shutdown();
188 } else {
189 lock.unlock();
190 }
191 grpc_server_destroy(server_);
192 if (thread_pool_owned_) {
193 delete thread_pool_;
194 }
195}
196
197bool Server::RegisterService(RpcService* service) {
198 for (int i = 0; i < service->GetMethodCount(); ++i) {
199 RpcServiceMethod* method = service->GetMethod(i);
200 void* tag =
201 grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
202 if (!tag) {
203 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
204 method->name());
205 return false;
206 }
207 sync_methods_.emplace_back(method, tag);
208 }
209 return true;
210}
211
212bool Server::RegisterAsyncService(AsynchronousService* service) {
213 GPR_ASSERT(service->dispatch_impl_ == nullptr &&
214 "Can only register an asynchronous service against one server.");
215 service->dispatch_impl_ = this;
216 service->request_args_ = new void* [service->method_count_];
217 for (size_t i = 0; i < service->method_count_; ++i) {
218 void* tag =
219 grpc_server_register_method(server_, service->method_names_[i], nullptr,
220 service->completion_queue()->cq());
221 if (!tag) {
222 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
223 service->method_names_[i]);
224 return false;
225 }
226 service->request_args_[i] = tag;
227 }
228 return true;
229}
230
Yang Gao49996492015-03-12 16:40:19 -0700231void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
Yang Gao1c402332015-03-05 16:39:25 -0800232 GPR_ASSERT(service->server_ == nullptr &&
Yang Gao005eb882015-03-11 22:17:13 -0700233 "Can only register an generic service against one server.");
Yang Gao1c402332015-03-05 16:39:25 -0800234 service->server_ = this;
235}
236
Craig Tiller42bc87c2015-02-23 08:50:19 -0800237int Server::AddPort(const grpc::string& addr, ServerCredentials* creds) {
vjpaicad5f0a2015-02-18 22:02:52 -0800238 GPR_ASSERT(!started_);
Craig Tiller42bc87c2015-02-23 08:50:19 -0800239 return creds->AddPortToServer(addr, server_);
vjpaicad5f0a2015-02-18 22:02:52 -0800240}
241
Craig Tiller0db1bef2015-02-09 13:47:39 -0800242bool Server::Start() {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800243 GPR_ASSERT(!started_);
244 started_ = true;
245 grpc_server_start(server_);
246
247 // Start processing rpcs.
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800248 if (!sync_methods_.empty()) {
249 for (auto& m : sync_methods_) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800250 m.Request(server_);
Craig Tillercbd04852015-02-10 17:39:54 -0800251 }
252
Craig Tiller7c72adc2015-02-09 14:07:26 -0800253 ScheduleCallback();
254 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800255
256 return true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800257}
258
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800259void Server::Shutdown() {
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800260 std::unique_lock<std::mutex> lock(mu_);
261 if (started_ && !shutdown_) {
262 shutdown_ = true;
263 grpc_server_shutdown(server_);
264 cq_.Shutdown();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800265
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800266 // Wait for running callbacks to finish.
267 while (num_running_cb_ != 0) {
268 callback_cv_.wait(lock);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269 }
270 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800271}
272
Craig Tiller6e57b9e2015-02-24 15:46:22 -0800273void Server::Wait() {
274 std::unique_lock<std::mutex> lock(mu_);
275 while (num_running_cb_ != 0) {
276 callback_cv_.wait(lock);
277 }
278}
279
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800280void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
Craig Tillerbb5227f2015-02-11 13:34:48 -0800281 static const size_t MAX_OPS = 8;
282 size_t nops = MAX_OPS;
283 grpc_op ops[MAX_OPS];
284 buf->FillOps(ops, &nops);
285 GPR_ASSERT(GRPC_CALL_OK ==
Craig Tiller8c8d0aa2015-02-12 11:38:36 -0800286 grpc_call_start_batch(call->call(), ops, nops, buf));
Craig Tillerbb5227f2015-02-11 13:34:48 -0800287}
288
Craig Tillercf133f42015-02-26 14:05:56 -0800289class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800290 public:
291 AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
Yang Gao7694c352015-03-03 09:48:06 -0800292 grpc::protobuf::Message* request,
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800293 ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
294 void* tag)
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800295 : tag_(tag),
296 request_(request),
297 stream_(stream),
298 cq_(cq),
299 ctx_(ctx),
Yang Gao005eb882015-03-11 22:17:13 -0700300 generic_ctx_(nullptr),
Craig Tillercf133f42015-02-26 14:05:56 -0800301 server_(server),
302 call_(nullptr),
303 payload_(nullptr) {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800304 memset(&array_, 0, sizeof(array_));
Yang Gao1c402332015-03-05 16:39:25 -0800305 grpc_call_details_init(&call_details_);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800306 grpc_server_request_registered_call(
Yang Gao1c402332015-03-05 16:39:25 -0800307 server->server_, registered_method, &call_, &call_details_.deadline,
308 &array_, request ? &payload_ : nullptr, cq->cq(), this);
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800309 }
310
Yang Gao005eb882015-03-11 22:17:13 -0700311 AsyncRequest(Server* server, GenericServerContext* ctx,
Yang Gao1c402332015-03-05 16:39:25 -0800312 ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
313 void* tag)
314 : tag_(tag),
315 request_(nullptr),
316 stream_(stream),
317 cq_(cq),
318 ctx_(nullptr),
Yang Gao005eb882015-03-11 22:17:13 -0700319 generic_ctx_(ctx),
Yang Gao1c402332015-03-05 16:39:25 -0800320 server_(server),
321 call_(nullptr),
322 payload_(nullptr) {
323 memset(&array_, 0, sizeof(array_));
324 grpc_call_details_init(&call_details_);
325 grpc_server_request_call(
326 server->server_, &call_, &call_details_, &array_, cq->cq(), this);
327 }
328
329
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800330 ~AsyncRequest() {
331 if (payload_) {
332 grpc_byte_buffer_destroy(payload_);
333 }
334 grpc_metadata_array_destroy(&array_);
335 }
336
Craig Tillercf133f42015-02-26 14:05:56 -0800337 bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800338 *tag = tag_;
Vijay Paidbb79632015-03-03 11:54:27 -0800339 bool orig_status = *status;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800340 if (*status && request_) {
341 if (payload_) {
Craig Tiller645466e2015-02-18 09:18:33 -0800342 *status = DeserializeProto(payload_, request_);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800343 } else {
344 *status = false;
345 }
346 }
Yang Gao005eb882015-03-11 22:17:13 -0700347 ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
Yang Gao1c402332015-03-05 16:39:25 -0800348 GPR_ASSERT(ctx);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800349 if (*status) {
Yang Gao1c402332015-03-05 16:39:25 -0800350 ctx->deadline_ = Timespec2Timepoint(call_details_.deadline);
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800351 for (size_t i = 0; i < array_.count; i++) {
Yang Gao1c402332015-03-05 16:39:25 -0800352 ctx->client_metadata_.insert(std::make_pair(
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800353 grpc::string(array_.metadata[i].key),
354 grpc::string(
355 array_.metadata[i].value,
356 array_.metadata[i].value + array_.metadata[i].value_length)));
357 }
Yang Gao005eb882015-03-11 22:17:13 -0700358 if (generic_ctx_) {
Yang Gao2a3c96a2015-03-11 23:32:40 -0700359 // TODO(yangg) remove the copy here.
Yang Gao005eb882015-03-11 22:17:13 -0700360 generic_ctx_->method_ = call_details_.method;
361 generic_ctx_->host_ = call_details_.host;
Yang Gao2a3c96a2015-03-11 23:32:40 -0700362 gpr_free(call_details_.method);
363 gpr_free(call_details_.host);
Yang Gao5f4539f2015-03-06 16:11:16 -0800364 }
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800365 }
Yang Gao1c402332015-03-05 16:39:25 -0800366 ctx->call_ = call_;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800367 Call call(call_, server_, cq_);
Vijay Paidbb79632015-03-03 11:54:27 -0800368 if (orig_status && call_) {
Yang Gao1c402332015-03-05 16:39:25 -0800369 ctx->BeginCompletionOp(&call);
Vijay Pai0823cb72015-03-02 15:48:51 -0800370 }
Craig Tiller645466e2015-02-18 09:18:33 -0800371 // just the pointers inside call are copied here
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800372 stream_->BindCall(&call);
373 delete this;
Craig Tiller645466e2015-02-18 09:18:33 -0800374 return true;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800375 }
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800376
377 private:
378 void* const tag_;
Yang Gao7694c352015-03-03 09:48:06 -0800379 grpc::protobuf::Message* const request_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800380 ServerAsyncStreamingInterface* const stream_;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800381 CompletionQueue* const cq_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800382 ServerContext* const ctx_;
Yang Gao005eb882015-03-11 22:17:13 -0700383 GenericServerContext* const generic_ctx_;
Craig Tiller3d6ceb62015-02-12 14:33:54 -0800384 Server* const server_;
Craig Tillercf133f42015-02-26 14:05:56 -0800385 grpc_call* call_;
Yang Gao1c402332015-03-05 16:39:25 -0800386 grpc_call_details call_details_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800387 grpc_metadata_array array_;
Craig Tillercf133f42015-02-26 14:05:56 -0800388 grpc_byte_buffer* payload_;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800389};
390
391void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
Yang Gao7694c352015-03-03 09:48:06 -0800392 grpc::protobuf::Message* request,
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800393 ServerAsyncStreamingInterface* stream,
394 CompletionQueue* cq, void* tag) {
395 new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
396}
Yang Gao5f4539f2015-03-06 16:11:16 -0800397
Yang Gao49996492015-03-12 16:40:19 -0700398void Server::RequestAsyncGenericCall(GenericServerContext* context,
399 ServerAsyncStreamingInterface* stream,
400 CompletionQueue* cq, void* tag) {
Yang Gao1c402332015-03-05 16:39:25 -0800401 new AsyncRequest(this, context, stream, cq, tag);
402}
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800403
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800404void Server::ScheduleCallback() {
405 {
406 std::unique_lock<std::mutex> lock(mu_);
407 num_running_cb_++;
408 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800409 thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800410}
411
412void Server::RunRpc() {
413 // Wait for one more incoming rpc.
Craig Tillerc4165772015-02-11 10:51:04 -0800414 bool ok;
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800415 auto* mrd = SyncRequest::Wait(&cq_, &ok);
Craig Tillercbd04852015-02-10 17:39:54 -0800416 if (mrd) {
Craig Tillerbd217572015-02-11 18:10:56 -0800417 ScheduleCallback();
Craig Tillerc4165772015-02-11 10:51:04 -0800418 if (ok) {
Craig Tiller1c9a2a92015-02-12 14:10:25 -0800419 SyncRequest::CallData cd(this, mrd);
Craig Tiller3b29b562015-02-11 12:58:46 -0800420 mrd->Request(server_);
Craig Tillercbd04852015-02-10 17:39:54 -0800421
Craig Tillerc4165772015-02-11 10:51:04 -0800422 cd.Run();
423 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800424 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800425
426 {
427 std::unique_lock<std::mutex> lock(mu_);
428 num_running_cb_--;
429 if (shutdown_) {
430 callback_cv_.notify_all();
431 }
432 }
433}
434
435} // namespace grpc