blob: 4965f1870873d5f0184932cc3cbd8f44ab7f2d7f [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc++/server.h>
35#include <utility>
36
37#include <grpc/grpc.h>
yangg9e21f722014-12-08 15:49:52 -080038#include <grpc/grpc_security.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040#include "src/cpp/server/server_rpc_handler.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc++/async_server_context.h>
42#include <grpc++/completion_queue.h>
yangg1b151092015-01-09 15:31:05 -080043#include <grpc++/impl/rpc_service_method.h>
yangg9e21f722014-12-08 15:49:52 -080044#include <grpc++/server_credentials.h>
Craig Tiller0db1bef2015-02-09 13:47:39 -080045#include <grpc++/thread_pool_interface.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080046
47namespace grpc {
48
Craig Tiller0db1bef2015-02-09 13:47:39 -080049Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerCredentials *creds)
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050 : started_(false),
51 shutdown_(false),
52 num_running_cb_(0),
Craig Tiller0db1bef2015-02-09 13:47:39 -080053 thread_pool_(thread_pool),
54 thread_pool_owned_(thread_pool_owned),
yangg9e21f722014-12-08 15:49:52 -080055 secure_(creds != nullptr) {
56 if (creds) {
57 server_ =
58 grpc_secure_server_create(creds->GetRawCreds(), cq_.cq(), nullptr);
59 } else {
60 server_ = grpc_server_create(cq_.cq(), nullptr);
61 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080062}
63
64Server::Server() {
65 // Should not be called.
66 GPR_ASSERT(false);
67}
68
69Server::~Server() {
70 std::unique_lock<std::mutex> lock(mu_);
71 if (started_ && !shutdown_) {
72 lock.unlock();
73 Shutdown();
74 }
75 grpc_server_destroy(server_);
76 if (thread_pool_owned_) {
77 delete thread_pool_;
78 }
79}
80
Craig Tiller0db1bef2015-02-09 13:47:39 -080081bool Server::RegisterService(RpcService *service) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080082 for (int i = 0; i < service->GetMethodCount(); ++i) {
Craig Tillerecd49342015-01-18 14:36:47 -080083 RpcServiceMethod *method = service->GetMethod(i);
Craig Tiller0db1bef2015-02-09 13:47:39 -080084 if (method_map_.find(method->name()) != method_map_.end()) {
85 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name());
86 return false;
87 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080088 method_map_.insert(std::make_pair(method->name(), method));
89 }
Craig Tiller7c72adc2015-02-09 14:07:26 -080090 return true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080091}
92
Craig Tiller0db1bef2015-02-09 13:47:39 -080093int Server::AddPort(const grpc::string &addr) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080094 GPR_ASSERT(!started_);
yangg9e21f722014-12-08 15:49:52 -080095 if (secure_) {
Craig Tiller0db1bef2015-02-09 13:47:39 -080096 return grpc_server_add_secure_http2_port(server_, addr.c_str());
yangg9e21f722014-12-08 15:49:52 -080097 } else {
Craig Tiller0db1bef2015-02-09 13:47:39 -080098 return grpc_server_add_http2_port(server_, addr.c_str());
yangg9e21f722014-12-08 15:49:52 -080099 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800100}
101
Craig Tiller0db1bef2015-02-09 13:47:39 -0800102bool Server::Start() {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800103 GPR_ASSERT(!started_);
104 started_ = true;
105 grpc_server_start(server_);
106
107 // Start processing rpcs.
Craig Tiller7c72adc2015-02-09 14:07:26 -0800108 if (thread_pool_) {
109 ScheduleCallback();
110 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800111
112 return true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800113}
114
115void Server::AllowOneRpc() {
116 GPR_ASSERT(started_);
Craig Tillerc4f0ebe2015-02-02 10:16:30 -0800117 grpc_call_error err = grpc_server_request_call_old(server_, nullptr);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800118 GPR_ASSERT(err == GRPC_CALL_OK);
119}
120
121void Server::Shutdown() {
122 {
123 std::unique_lock<std::mutex> lock(mu_);
124 if (started_ && !shutdown_) {
125 shutdown_ = true;
126 grpc_server_shutdown(server_);
127
128 // Wait for running callbacks to finish.
129 while (num_running_cb_ != 0) {
130 callback_cv_.wait(lock);
131 }
132 }
133 }
134
135 // Shutdown the completion queue.
136 cq_.Shutdown();
Craig Tillerecd49342015-01-18 14:36:47 -0800137 void *tag = nullptr;
Craig Tiller7c72adc2015-02-09 14:07:26 -0800138 bool ok = false;
139 GPR_ASSERT(false == cq_.Next(&tag, &ok));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800140}
141
142void Server::ScheduleCallback() {
143 {
144 std::unique_lock<std::mutex> lock(mu_);
145 num_running_cb_++;
146 }
Craig Tiller0db1bef2015-02-09 13:47:39 -0800147 thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800148}
149
150void Server::RunRpc() {
151 // Wait for one more incoming rpc.
Craig Tillerecd49342015-01-18 14:36:47 -0800152 void *tag = nullptr;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800153 AllowOneRpc();
Craig Tiller7c72adc2015-02-09 14:07:26 -0800154 bool ok = false;
155 GPR_ASSERT(cq_.Next(&tag, &ok));
156 if (ok) {
157 AsyncServerContext *server_context = static_cast<AsyncServerContext *>(tag);
158 // server_context could be nullptr during server shutdown.
159 if (server_context != nullptr) {
160 // Schedule a new callback to handle more rpcs.
161 ScheduleCallback();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800162
Craig Tiller7c72adc2015-02-09 14:07:26 -0800163 RpcServiceMethod *method = nullptr;
164 auto iter = method_map_.find(server_context->method());
165 if (iter != method_map_.end()) {
166 method = iter->second;
167 }
168 ServerRpcHandler rpc_handler(server_context, method);
169 rpc_handler.StartRpc();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800170 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800171 }
172
173 {
174 std::unique_lock<std::mutex> lock(mu_);
175 num_running_cb_--;
176 if (shutdown_) {
177 callback_cv_.notify_all();
178 }
179 }
180}
181
182} // namespace grpc