blob: 439452aea270e23ff32e608f8e590d8407d8c53c [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
Craig Tiller6a006ce2015-07-13 16:25:40 -070039#include <grpc/support/alloc.h>
40#include <grpc/support/log.h>
41#include <grpc/support/string_util.h>
42#include <grpc/support/useful.h>
43
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/channel/census_filter.h"
45#include "src/core/channel/channel_args.h"
46#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080047#include "src/core/iomgr/iomgr.h"
Craig Tiller6a006ce2015-07-13 16:25:40 -070048#include "src/core/support/stack_lockfree.h"
Craig Tiller485d7762015-01-23 12:54:05 -080049#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050#include "src/core/surface/call.h"
51#include "src/core/surface/channel.h"
52#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080053#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080054#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080055
56typedef struct listener {
57 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080058 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
59 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080060 void (*destroy)(grpc_server *server, void *arg);
61 struct listener *next;
62} listener;
63
/* Forward declarations for types that refer to each other below. */
typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;

/* A next/prev pointer pair for linking call_data objects together. */
typedef struct {
  call_data *next;
  call_data *prev;
} call_link;
72
/* Kind of call an application asked for; presumably selects which member of
   requested_call.data is meaningful. */
typedef enum {
  BATCH_CALL,
  REGISTERED_CALL
} requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080074
Craig Tiller97fc6a32015-07-08 15:31:35 -070075typedef struct requested_call {
Craig Tiller24be0f72015-02-10 14:04:22 -080076 requested_call_type type;
77 void *tag;
Craig Tiller6a006ce2015-07-13 16:25:40 -070078 grpc_server *server;
Craig Tillerf9e6adf2015-05-06 11:45:59 -070079 grpc_completion_queue *cq_bound_to_call;
80 grpc_completion_queue *cq_for_notification;
81 grpc_call **call;
Craig Tiller97fc6a32015-07-08 15:31:35 -070082 grpc_cq_completion completion;
Craig Tiller24be0f72015-02-10 14:04:22 -080083 union {
84 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080085 grpc_call_details *details;
86 grpc_metadata_array *initial_metadata;
87 } batch;
88 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080089 registered_method *registered_method;
90 gpr_timespec *deadline;
91 grpc_metadata_array *initial_metadata;
92 grpc_byte_buffer **optional_payload;
93 } registered;
94 } data;
95} requested_call;
96
Craig Tiller24be0f72015-02-10 14:04:22 -080097typedef struct channel_registered_method {
98 registered_method *server_registered_method;
99 grpc_mdstr *method;
100 grpc_mdstr *host;
101} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800102
103struct channel_data {
104 grpc_server *server;
Craig Tillere039f032015-06-25 12:54:23 -0700105 grpc_connectivity_state connectivity_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800106 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800107 grpc_mdstr *path_key;
108 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800109 /* linked list of all channels on a server */
110 channel_data *next;
111 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800112 channel_registered_method *registered_methods;
113 gpr_uint32 registered_method_slots;
114 gpr_uint32 registered_method_max_probes;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700115 grpc_iomgr_closure finish_destroy_channel_closure;
Craig Tillere039f032015-06-25 12:54:23 -0700116 grpc_iomgr_closure channel_connectivity_changed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800117};
118
Craig Tillerbce999f2015-05-27 09:55:51 -0700119typedef struct shutdown_tag {
120 void *tag;
121 grpc_completion_queue *cq;
Craig Tiller97fc6a32015-07-08 15:31:35 -0700122 grpc_cq_completion completion;
Craig Tillerbce999f2015-05-27 09:55:51 -0700123} shutdown_tag;
124
/* Lifecycle of an incoming call as tracked by the server filter. */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;
135
Craig Tiller729b35a2015-07-13 12:36:47 -0700136typedef struct request_matcher request_matcher;
137
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800138struct call_data {
139 grpc_call *call;
140
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700141 /** protects state */
142 gpr_mu mu_state;
143 /** the current state of a call - see call_state */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800144 call_state state;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700145
Craig Tillercce17ac2015-01-20 09:29:28 -0800146 grpc_mdstr *path;
147 grpc_mdstr *host;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700148 gpr_timespec deadline;
149 int got_initial_metadata;
Craig Tillercce17ac2015-01-20 09:29:28 -0800150
Craig Tiller20bc56d2015-02-12 09:02:56 -0800151 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800152
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700153 grpc_stream_op_buffer *recv_ops;
154 grpc_stream_state *recv_state;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700155 grpc_iomgr_closure *on_done_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700156
Craig Tiller1e6facb2015-06-11 22:47:11 -0700157 grpc_iomgr_closure server_on_recv;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700158 grpc_iomgr_closure kill_zombie_closure;
159
Craig Tiller729b35a2015-07-13 12:36:47 -0700160 call_data *pending_next;
161};
162
163struct request_matcher {
164 call_data *pending_head;
165 call_data *pending_tail;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700166 gpr_stack_lockfree *requests;
Craig Tiller729b35a2015-07-13 12:36:47 -0700167};
168
169struct registered_method {
170 char *method;
171 char *host;
172 request_matcher request_matcher;
173 registered_method *next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800174};
175
Craig Tillerff3ae682015-06-29 17:44:04 -0700176typedef struct {
177 grpc_channel **channels;
178 size_t num_channels;
179} channel_broadcaster;
180
Craig Tiller729b35a2015-07-13 12:36:47 -0700181struct grpc_server {
182 size_t channel_filter_count;
183 const grpc_channel_filter **channel_filters;
184 grpc_channel_args *channel_args;
185
186 grpc_completion_queue **cqs;
187 grpc_pollset **pollsets;
188 size_t cq_count;
189
190 /* The two following mutexes control access to server-state
191 mu_global controls access to non-call-related state (e.g., channel state)
192 mu_call controls access to call-related state (e.g., the call lists)
193
194 If they are ever required to be nested, you must lock mu_global
195 before mu_call. This is currently used in shutdown processing
196 (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
197 gpr_mu mu_global; /* mutex for server and channel state */
198 gpr_mu mu_call; /* mutex for call-specific state */
199
200 registered_method *registered_methods;
201 request_matcher unregistered_request_matcher;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700202 /** free list of available requested_calls indices */
203 gpr_stack_lockfree *request_freelist;
204 /** requested call backing data */
205 requested_call *requested_calls;
206 int max_requested_calls;
Craig Tiller729b35a2015-07-13 12:36:47 -0700207
Craig Tiller6a006ce2015-07-13 16:25:40 -0700208 gpr_atm shutdown_flag;
Craig Tiller729b35a2015-07-13 12:36:47 -0700209 gpr_uint8 shutdown_published;
210 size_t num_shutdown_tags;
211 shutdown_tag *shutdown_tags;
212
213 channel_data root_channel_data;
214
215 listener *listeners;
216 int listeners_destroyed;
217 gpr_refcount internal_refcount;
218
219 /** when did we print the last shutdown progress message */
220 gpr_timespec last_shutdown_message_time;
221};
222
/* Yields the grpc_server recorded in the channel_data of a call element. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)
225
Craig Tiller24be0f72015-02-10 14:04:22 -0800226static void begin_call(grpc_server *server, call_data *calld,
227 requested_call *rc);
228static void fail_call(grpc_server *server, requested_call *rc);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700229/* Before calling maybe_finish_shutdown, we must hold mu_global and not
230 hold mu_call */
Craig Tiller52760dd2015-05-29 23:09:26 -0700231static void maybe_finish_shutdown(grpc_server *server);
Craig Tiller24be0f72015-02-10 14:04:22 -0800232
Craig Tiller729b35a2015-07-13 12:36:47 -0700233/*
234 * channel broadcaster
235 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700236
237/* assumes server locked */
238static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
239 channel_data *c;
240 size_t count = 0;
Craig Tiller079a11b2015-06-30 10:07:15 -0700241 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
242 count++;
Craig Tillerff3ae682015-06-29 17:44:04 -0700243 }
244 cb->num_channels = count;
245 cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
246 count = 0;
Craig Tiller079a11b2015-06-30 10:07:15 -0700247 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
Craig Tiller49924e02015-06-29 22:42:33 -0700248 cb->channels[count++] = c->channel;
Craig Tillerff3ae682015-06-29 17:44:04 -0700249 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
Craig Tillerff3ae682015-06-29 17:44:04 -0700250 }
251}
252
253struct shutdown_cleanup_args {
254 grpc_iomgr_closure closure;
255 gpr_slice slice;
256};
257
258static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
259 struct shutdown_cleanup_args *a = arg;
260 gpr_slice_unref(a->slice);
261 gpr_free(a);
262}
263
Craig Tiller079a11b2015-06-30 10:07:15 -0700264static void send_shutdown(grpc_channel *channel, int send_goaway,
265 int send_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700266 grpc_transport_op op;
267 struct shutdown_cleanup_args *sc;
268 grpc_channel_element *elem;
269
270 memset(&op, 0, sizeof(op));
Craig Tillerff3ae682015-06-29 17:44:04 -0700271 op.send_goaway = send_goaway;
272 sc = gpr_malloc(sizeof(*sc));
273 sc->slice = gpr_slice_from_copied_string("Server shutdown");
274 op.goaway_message = &sc->slice;
275 op.goaway_status = GRPC_STATUS_OK;
276 op.disconnect = send_disconnect;
277 grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
278 op.on_consumed = &sc->closure;
279
Craig Tiller079a11b2015-06-30 10:07:15 -0700280 elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
Craig Tillerff3ae682015-06-29 17:44:04 -0700281 elem->filter->start_transport_op(elem, &op);
282}
283
Craig Tiller079a11b2015-06-30 10:07:15 -0700284static void channel_broadcaster_shutdown(channel_broadcaster *cb,
Craig Tiller12cf5372015-07-09 13:48:11 -0700285 int send_goaway,
286 int force_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700287 size_t i;
288
289 for (i = 0; i < cb->num_channels; i++) {
Craig Tiller9188d7a2015-07-05 12:44:37 -0700290 send_shutdown(cb->channels[i], send_goaway, force_disconnect);
Craig Tillerff3ae682015-06-29 17:44:04 -0700291 GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
292 }
293 gpr_free(cb->channels);
294}
295
Craig Tiller729b35a2015-07-13 12:36:47 -0700296/*
297 * request_matcher
298 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700299
Craig Tiller6a006ce2015-07-13 16:25:40 -0700300static void request_matcher_init(request_matcher *request_matcher,
301 int entries) {
Craig Tiller729b35a2015-07-13 12:36:47 -0700302 memset(request_matcher, 0, sizeof(*request_matcher));
Craig Tiller6a006ce2015-07-13 16:25:40 -0700303 request_matcher->requests = gpr_stack_lockfree_create(entries);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800304}
305
Craig Tiller6a006ce2015-07-13 16:25:40 -0700306static void request_matcher_destroy(request_matcher *request_matcher) {
307 GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
308 gpr_stack_lockfree_destroy(request_matcher->requests);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800309}
310
Craig Tiller729b35a2015-07-13 12:36:47 -0700311static void kill_zombie(void *elem, int success) {
312 grpc_call_destroy(grpc_call_from_top_element(elem));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313}
314
Craig Tiller729b35a2015-07-13 12:36:47 -0700315static void request_matcher_zombify_all_pending_calls(
316 request_matcher *request_matcher) {
317 while (request_matcher->pending_head) {
318 call_data *calld = request_matcher->pending_head;
319 request_matcher->pending_head = calld->pending_next;
Craig Tillerb6450262015-07-14 07:12:19 -0700320 gpr_mu_lock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700321 calld->state = ZOMBIED;
Craig Tillerb6450262015-07-14 07:12:19 -0700322 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700323 grpc_iomgr_closure_init(
324 &calld->kill_zombie_closure, kill_zombie,
325 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
326 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800327 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328}
329
Craig Tiller729b35a2015-07-13 12:36:47 -0700330/*
331 * server proper
332 */
333
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800334static void server_ref(grpc_server *server) {
335 gpr_ref(&server->internal_refcount);
336}
337
Craig Tilleree945e82015-05-26 16:15:34 -0700338static void server_delete(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800339 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700340 size_t i;
Craig Tilleree945e82015-05-26 16:15:34 -0700341 grpc_channel_args_destroy(server->channel_args);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700342 gpr_mu_destroy(&server->mu_global);
343 gpr_mu_destroy(&server->mu_call);
Craig Tilleree945e82015-05-26 16:15:34 -0700344 gpr_free(server->channel_filters);
Craig Tilleree945e82015-05-26 16:15:34 -0700345 while ((rm = server->registered_methods) != NULL) {
346 server->registered_methods = rm->next;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700347 request_matcher_destroy(&rm->request_matcher);
Craig Tilleree945e82015-05-26 16:15:34 -0700348 gpr_free(rm->method);
349 gpr_free(rm->host);
Craig Tilleree945e82015-05-26 16:15:34 -0700350 gpr_free(rm);
351 }
352 for (i = 0; i < server->cq_count; i++) {
Craig Tiller463f2372015-05-28 16:16:15 -0700353 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
Craig Tilleree945e82015-05-26 16:15:34 -0700354 }
Craig Tiller6a006ce2015-07-13 16:25:40 -0700355 request_matcher_destroy(&server->unregistered_request_matcher);
356 gpr_stack_lockfree_destroy(server->request_freelist);
Craig Tilleree945e82015-05-26 16:15:34 -0700357 gpr_free(server->cqs);
358 gpr_free(server->pollsets);
359 gpr_free(server->shutdown_tags);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700360 gpr_free(server->requested_calls);
Craig Tilleree945e82015-05-26 16:15:34 -0700361 gpr_free(server);
362}
363
364static void server_unref(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800365 if (gpr_unref(&server->internal_refcount)) {
Craig Tilleree945e82015-05-26 16:15:34 -0700366 server_delete(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367 }
368}
369
370static int is_channel_orphaned(channel_data *chand) {
371 return chand->next == chand;
372}
373
374static void orphan_channel(channel_data *chand) {
375 chand->next->prev = chand->prev;
376 chand->prev->next = chand->next;
377 chand->next = chand->prev = chand;
378}
379
ctiller58393c22015-01-07 14:03:30 -0800380static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800381 channel_data *chand = cd;
382 grpc_server *server = chand->server;
Craig Tiller9ec2a522015-05-29 22:46:54 -0700383 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800384 server_unref(server);
385}
386
387static void destroy_channel(channel_data *chand) {
388 if (is_channel_orphaned(chand)) return;
389 GPR_ASSERT(chand->server != NULL);
390 orphan_channel(chand);
391 server_ref(chand->server);
Craig Tiller52760dd2015-05-29 23:09:26 -0700392 maybe_finish_shutdown(chand->server);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700393 chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
394 chand->finish_destroy_channel_closure.cb_arg = chand;
395 grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800396}
397
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700398static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
Craig Tiller729b35a2015-07-13 12:36:47 -0700399 request_matcher *request_matcher) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800400 call_data *calld = elem->call_data;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700401 int request_id;
402
403 request_id = gpr_stack_lockfree_pop(request_matcher->requests);
404 if (request_id == -1) {
405 gpr_mu_lock(&server->mu_call);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700406 gpr_mu_lock(&calld->mu_state);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800407 calld->state = PENDING;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700408 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700409 if (request_matcher->pending_head == NULL) {
410 request_matcher->pending_tail = request_matcher->pending_head = calld;
411 } else {
412 request_matcher->pending_tail->pending_next = calld;
413 request_matcher->pending_tail = calld;
414 }
415 calld->pending_next = NULL;
Vijay Pai8931cdd2015-06-17 12:42:17 -0700416 gpr_mu_unlock(&server->mu_call);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800417 } else {
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700418 gpr_mu_lock(&calld->mu_state);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800419 calld->state = ACTIVATED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700420 gpr_mu_unlock(&calld->mu_state);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700421 begin_call(server, calld, &server->requested_calls[request_id]);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800422 }
423}
424
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800425static void start_new_rpc(grpc_call_element *elem) {
426 channel_data *chand = elem->channel_data;
427 call_data *calld = elem->call_data;
428 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800429 gpr_uint32 i;
430 gpr_uint32 hash;
431 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800432
Craig Tiller04cc8be2015-02-10 16:11:22 -0800433 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800434 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800435 /* check for an exact match with host */
436 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
Craig Tillerc6d6d902015-07-09 15:53:50 -0700437 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800438 rm = &chand->registered_methods[(hash + i) %
439 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800440 if (!rm) break;
441 if (rm->host != calld->host) continue;
442 if (rm->method != calld->path) continue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700443 finish_start_new_rpc(server, elem,
444 &rm->server_registered_method->request_matcher);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800445 return;
446 }
447 /* check for a wildcard method definition (no host set) */
448 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800449 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800450 rm = &chand->registered_methods[(hash + i) %
451 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800452 if (!rm) break;
453 if (rm->host != NULL) continue;
454 if (rm->method != calld->path) continue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700455 finish_start_new_rpc(server, elem,
456 &rm->server_registered_method->request_matcher);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800457 return;
458 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800459 }
Craig Tiller729b35a2015-07-13 12:36:47 -0700460 finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800461}
462
Craig Tilleree945e82015-05-26 16:15:34 -0700463static int num_listeners(grpc_server *server) {
464 listener *l;
465 int n = 0;
466 for (l = server->listeners; l; l = l->next) {
467 n++;
468 }
469 return n;
470}
471
Craig Tiller97fc6a32015-07-08 15:31:35 -0700472static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
473 server_unref(server);
474}
475
Craig Tillerab54f792015-07-08 08:34:20 -0700476static int num_channels(grpc_server *server) {
477 channel_data *chand;
478 int n = 0;
479 for (chand = server->root_channel_data.next;
480 chand != &server->root_channel_data; chand = chand->next) {
481 n++;
482 }
483 return n;
484}
485
Craig Tillerdc627722015-05-26 15:27:02 -0700486static void maybe_finish_shutdown(grpc_server *server) {
Craig Tillerbce999f2015-05-27 09:55:51 -0700487 size_t i;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700488 if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
Craig Tillerc4a1f522015-05-29 22:32:04 -0700489 return;
490 }
Vijay Pai8931cdd2015-06-17 12:42:17 -0700491
Craig Tillerab54f792015-07-08 08:34:20 -0700492 if (server->root_channel_data.next != &server->root_channel_data ||
493 server->listeners_destroyed < num_listeners(server)) {
Craig Tiller58bbc862015-07-13 09:51:17 -0700494 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
495 server->last_shutdown_message_time),
496 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
Craig Tiller080d6c52015-07-10 10:23:10 -0700497 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -0700498 gpr_log(GPR_DEBUG,
499 "Waiting for %d channels and %d/%d listeners to be destroyed"
500 " before shutting down server",
501 num_channels(server),
502 num_listeners(server) - server->listeners_destroyed,
503 num_listeners(server));
504 }
Craig Tillerc4a1f522015-05-29 22:32:04 -0700505 return;
506 }
507 server->shutdown_published = 1;
508 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller97fc6a32015-07-08 15:31:35 -0700509 server_ref(server);
Craig Tiller12cf5372015-07-09 13:48:11 -0700510 grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
511 done_shutdown_event, server,
512 &server->shutdown_tags[i].completion);
Craig Tillerdc627722015-05-26 15:27:02 -0700513 }
514}
515
Craig Tiller6902ad22015-04-16 08:01:49 -0700516static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
517 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800518 channel_data *chand = elem->channel_data;
519 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700520 if (md->key == chand->path_key) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700521 calld->path = GRPC_MDSTR_REF(md->value);
Craig Tiller6902ad22015-04-16 08:01:49 -0700522 return NULL;
523 } else if (md->key == chand->authority_key) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700524 calld->host = GRPC_MDSTR_REF(md->value);
Craig Tiller6902ad22015-04-16 08:01:49 -0700525 return NULL;
526 }
527 return md;
528}
529
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700530static void server_on_recv(void *ptr, int success) {
531 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700532 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700533
534 if (success && !calld->got_initial_metadata) {
535 size_t i;
536 size_t nops = calld->recv_ops->nops;
537 grpc_stream_op *ops = calld->recv_ops->ops;
538 for (i = 0; i < nops; i++) {
539 grpc_stream_op *op = &ops[i];
540 if (op->type != GRPC_OP_METADATA) continue;
Craig Tiller205aee12015-04-16 14:46:41 -0700541 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tiller143e7bf2015-07-13 08:41:49 -0700542 if (0 != gpr_time_cmp(op->data.metadata.deadline,
543 gpr_inf_future(GPR_CLOCK_REALTIME))) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700544 calld->deadline = op->data.metadata.deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800545 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700546 calld->got_initial_metadata = 1;
547 start_new_rpc(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800548 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700549 }
550 }
551
552 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700553 case GRPC_STREAM_OPEN:
554 break;
555 case GRPC_STREAM_SEND_CLOSED:
556 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700557 case GRPC_STREAM_RECV_CLOSED:
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700558 gpr_mu_lock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700559 if (calld->state == NOT_STARTED) {
560 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700561 gpr_mu_unlock(&calld->mu_state);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700562 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
563 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700564 } else {
565 gpr_mu_unlock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700566 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800567 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700568 case GRPC_STREAM_CLOSED:
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700569 gpr_mu_lock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700570 if (calld->state == NOT_STARTED) {
571 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700572 gpr_mu_unlock(&calld->mu_state);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700573 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
574 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700575 } else if (calld->state == PENDING) {
Craig Tillerc9d03822015-05-20 16:08:45 -0700576 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700577 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700578 /* zombied call will be destroyed when it's removed from the pending
579 queue... later */
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700580 } else {
581 gpr_mu_unlock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700582 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800583 break;
584 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700585
Craig Tiller1e6facb2015-06-11 22:47:11 -0700586 calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700587}
588
Craig Tillerb7959a02015-06-25 08:50:54 -0700589static void server_mutate_op(grpc_call_element *elem,
590 grpc_transport_stream_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700591 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700592
593 if (op->recv_ops) {
594 /* substitute our callback for the higher callback */
595 calld->recv_ops = op->recv_ops;
596 calld->recv_state = op->recv_state;
597 calld->on_done_recv = op->on_done_recv;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700598 op->on_done_recv = &calld->server_on_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700599 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700600}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700601
Craig Tillere039f032015-06-25 12:54:23 -0700602static void server_start_transport_stream_op(grpc_call_element *elem,
603 grpc_transport_stream_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -0700604 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
605 server_mutate_op(elem, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700606 grpc_call_next_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800607}
608
Craig Tillere039f032015-06-25 12:54:23 -0700609static void accept_stream(void *cd, grpc_transport *transport,
610 const void *transport_server_data) {
611 channel_data *chand = cd;
612 /* create a call */
613 grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
Craig Tiller143e7bf2015-07-13 08:41:49 -0700614 gpr_inf_future(GPR_CLOCK_REALTIME));
Craig Tillere039f032015-06-25 12:54:23 -0700615}
616
617static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
618 channel_data *chand = cd;
619 grpc_server *server = chand->server;
620 if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
621 grpc_transport_op op;
622 memset(&op, 0, sizeof(op));
623 op.on_connectivity_state_change = &chand->channel_connectivity_changed,
624 op.connectivity_state = &chand->connectivity_state;
625 grpc_channel_next_op(grpc_channel_stack_element(
626 grpc_channel_get_channel_stack(chand->channel), 0),
627 &op);
628 } else {
629 gpr_mu_lock(&server->mu_global);
630 destroy_channel(chand);
631 gpr_mu_unlock(&server->mu_global);
632 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
633 }
634}
635
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800636static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700637 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700638 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800639 call_data *calld = elem->call_data;
640 channel_data *chand = elem->channel_data;
641 memset(calld, 0, sizeof(call_data));
Craig Tiller143e7bf2015-07-13 08:41:49 -0700642 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800643 calld->call = grpc_call_from_top_element(elem);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700644 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800645
Craig Tiller1e6facb2015-06-11 22:47:11 -0700646 grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
647
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800648 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700649
Craig Tiller482ef8b2015-04-23 11:38:20 -0700650 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800651}
652
653static void destroy_call_elem(grpc_call_element *elem) {
654 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800655 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800656
Craig Tiller729b35a2015-07-13 12:36:47 -0700657 GPR_ASSERT(calld->state != PENDING);
Craig Tiller092d8d12015-07-04 22:35:00 -0700658
Craig Tiller4df31a62015-01-30 09:44:31 -0800659 if (calld->host) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700660 GRPC_MDSTR_UNREF(calld->host);
Craig Tiller4df31a62015-01-30 09:44:31 -0800661 }
662 if (calld->path) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700663 GRPC_MDSTR_UNREF(calld->path);
Craig Tiller4df31a62015-01-30 09:44:31 -0800664 }
665
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700666 gpr_mu_destroy(&calld->mu_state);
667
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800668 server_unref(chand->server);
669}
670
Craig Tiller079a11b2015-06-30 10:07:15 -0700671static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800672 const grpc_channel_args *args,
673 grpc_mdctx *metadata_context, int is_first,
674 int is_last) {
675 channel_data *chand = elem->channel_data;
676 GPR_ASSERT(is_first);
677 GPR_ASSERT(!is_last);
678 chand->server = NULL;
679 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800680 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
681 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800682 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800683 chand->registered_methods = NULL;
Craig Tillere039f032015-06-25 12:54:23 -0700684 chand->connectivity_state = GRPC_CHANNEL_IDLE;
685 grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
686 channel_connectivity_changed, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800687}
688
689static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800690 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800691 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800692 if (chand->registered_methods) {
693 for (i = 0; i < chand->registered_method_slots; i++) {
694 if (chand->registered_methods[i].method) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700695 GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
Craig Tillerec3257c2015-02-12 15:59:43 -0800696 }
697 if (chand->registered_methods[i].host) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700698 GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
Craig Tillerec3257c2015-02-12 15:59:43 -0800699 }
700 }
701 gpr_free(chand->registered_methods);
702 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800703 if (chand->server) {
Vijay Pai8931cdd2015-06-17 12:42:17 -0700704 gpr_mu_lock(&chand->server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800705 chand->next->prev = chand->prev;
706 chand->prev->next = chand->next;
707 chand->next = chand->prev = chand;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700708 maybe_finish_shutdown(chand->server);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700709 gpr_mu_unlock(&chand->server->mu_global);
Craig Tiller1a65a232015-07-06 10:22:32 -0700710 GRPC_MDSTR_UNREF(chand->path_key);
711 GRPC_MDSTR_UNREF(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800712 server_unref(chand->server);
713 }
714}
715
716static const grpc_channel_filter server_surface_filter = {
Craig Tillere039f032015-06-25 12:54:23 -0700717 server_start_transport_stream_op,
718 grpc_channel_next_op,
Craig Tillerb76d05b2015-05-29 17:21:56 -0700719 sizeof(call_data),
720 init_call_elem,
721 destroy_call_elem,
722 sizeof(channel_data),
723 init_channel_elem,
724 destroy_channel_elem,
725 "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800726};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800727
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700728void grpc_server_register_completion_queue(grpc_server *server,
729 grpc_completion_queue *cq) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800730 size_t i, n;
731 for (i = 0; i < server->cq_count; i++) {
732 if (server->cqs[i] == cq) return;
733 }
Craig Tiller463f2372015-05-28 16:16:15 -0700734 GRPC_CQ_INTERNAL_REF(cq, "server");
Craig Tillerb56975c2015-06-15 10:11:16 -0700735 grpc_cq_mark_server_cq(cq);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800736 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800737 server->cqs = gpr_realloc(server->cqs,
738 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800739 server->cqs[n] = cq;
740}
741
David Garcia Quintase25e9282015-06-23 10:18:52 -0700742grpc_server *grpc_server_create_from_filters(
743 const grpc_channel_filter **filters, size_t filter_count,
744 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800745 size_t i;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700746 /* TODO(census): restore this once we finalize census filter etc.
747 int census_enabled = grpc_channel_args_is_census_enabled(args); */
748 int census_enabled = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800749
750 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800751
752 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
753
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800754 memset(server, 0, sizeof(grpc_server));
755
Vijay Pai8931cdd2015-06-17 12:42:17 -0700756 gpr_mu_init(&server->mu_global);
757 gpr_mu_init(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800758
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800759 /* decremented by grpc_server_destroy */
760 gpr_ref_init(&server->internal_refcount, 1);
761 server->root_channel_data.next = server->root_channel_data.prev =
762 &server->root_channel_data;
763
Craig Tiller6a006ce2015-07-13 16:25:40 -0700764 /* TODO(ctiller): expose a channel_arg for this */
765 server->max_requested_calls = 32768;
766 server->request_freelist =
767 gpr_stack_lockfree_create(server->max_requested_calls);
768 for (i = 0; i < (size_t)server->max_requested_calls; i++) {
769 gpr_stack_lockfree_push(server->request_freelist, i);
770 }
771 request_matcher_init(&server->unregistered_request_matcher,
772 server->max_requested_calls);
773 server->requested_calls = gpr_malloc(server->max_requested_calls *
774 sizeof(*server->requested_calls));
Craig Tiller729b35a2015-07-13 12:36:47 -0700775
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800776 /* Server filter stack is:
777
778 server_surface_filter - for making surface API calls
779 grpc_server_census_filter (optional) - for stats collection and tracing
780 {passed in filter stack}
781 grpc_connected_channel_filter - for interfacing with transports */
782 server->channel_filter_count = filter_count + 1 + census_enabled;
783 server->channel_filters =
784 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
785 server->channel_filters[0] = &server_surface_filter;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700786 /* TODO(census): restore this once we rework census filter
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800787 if (census_enabled) {
788 server->channel_filters[1] = &grpc_server_census_filter;
Alistair Veitch9686dab2015-05-26 14:26:47 -0700789 } */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800790 for (i = 0; i < filter_count; i++) {
791 server->channel_filters[i + 1 + census_enabled] = filters[i];
792 }
793
794 server->channel_args = grpc_channel_args_copy(args);
795
796 return server;
797}
798
/* NULL-tolerant string equality.
   Returns 1 when both arguments are NULL or when both are non-NULL and
   compare equal with strcmp; returns 0 otherwise. */
static int streq(const char *a, const char *b) {
  if (a == NULL || b == NULL) {
    /* equal only if both are missing */
    return a == b;
  }
  return strcmp(a, b) == 0;
}
805
806void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700807 const char *host) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800808 registered_method *m;
809 if (!method) {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700810 gpr_log(GPR_ERROR,
811 "grpc_server_register_method method string cannot be NULL");
Craig Tiller24be0f72015-02-10 14:04:22 -0800812 return NULL;
813 }
814 for (m = server->registered_methods; m; m = m->next) {
815 if (streq(m->method, method) && streq(m->host, host)) {
816 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
817 host ? host : "*");
818 return NULL;
819 }
820 }
821 m = gpr_malloc(sizeof(registered_method));
822 memset(m, 0, sizeof(*m));
Craig Tiller6a006ce2015-07-13 16:25:40 -0700823 request_matcher_init(&m->request_matcher, server->max_requested_calls);
Craig Tiller24be0f72015-02-10 14:04:22 -0800824 m->method = gpr_strdup(method);
825 m->host = gpr_strdup(host);
826 m->next = server->registered_methods;
827 server->registered_methods = m;
828 return m;
829}
830
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800831void grpc_server_start(grpc_server *server) {
832 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800833 size_t i;
834
Craig Tillerec3257c2015-02-12 15:59:43 -0800835 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800836 for (i = 0; i < server->cq_count; i++) {
837 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
838 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800839
840 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800841 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800842 }
843}
844
Craig Tiller1064f8b2015-06-25 13:52:57 -0700845void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
846 grpc_channel_filter const **extra_filters,
847 size_t num_extra_filters, grpc_mdctx *mdctx,
848 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800849 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
850 grpc_channel_filter const **filters =
851 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
852 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800853 size_t num_registered_methods;
854 size_t alloc;
855 registered_method *rm;
856 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800857 grpc_channel *channel;
858 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800859 grpc_mdstr *host;
860 grpc_mdstr *method;
861 gpr_uint32 hash;
862 gpr_uint32 slots;
863 gpr_uint32 probes;
864 gpr_uint32 max_probes = 0;
Craig Tillere039f032015-06-25 12:54:23 -0700865 grpc_transport_op op;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800866
867 for (i = 0; i < s->channel_filter_count; i++) {
868 filters[i] = s->channel_filters[i];
869 }
870 for (; i < s->channel_filter_count + num_extra_filters; i++) {
871 filters[i] = extra_filters[i - s->channel_filter_count];
872 }
873 filters[i] = &grpc_connected_channel_filter;
874
Craig Tiller20bc56d2015-02-12 09:02:56 -0800875 for (i = 0; i < s->cq_count; i++) {
Craig Tillere039f032015-06-25 12:54:23 -0700876 memset(&op, 0, sizeof(op));
877 op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
878 grpc_transport_perform_op(transport, &op);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800879 }
ctillerd79b4862014-12-17 16:36:59 -0800880
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700881 channel =
882 grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800883 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700884 grpc_channel_get_channel_stack(channel), 0)
885 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800886 chand->server = s;
887 server_ref(s);
888 chand->channel = channel;
889
Craig Tiller04cc8be2015-02-10 16:11:22 -0800890 num_registered_methods = 0;
891 for (rm = s->registered_methods; rm; rm = rm->next) {
892 num_registered_methods++;
893 }
894 /* build a lookup table phrased in terms of mdstr's in this channels context
895 to quickly find registered methods */
896 if (num_registered_methods > 0) {
897 slots = 2 * num_registered_methods;
898 alloc = sizeof(channel_registered_method) * slots;
899 chand->registered_methods = gpr_malloc(alloc);
900 memset(chand->registered_methods, 0, alloc);
901 for (rm = s->registered_methods; rm; rm = rm->next) {
902 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800903 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800904 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800905 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700906 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800907 probes++)
908 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800909 if (probes > max_probes) max_probes = probes;
910 crm = &chand->registered_methods[(hash + probes) % slots];
911 crm->server_registered_method = rm;
912 crm->host = host;
913 crm->method = method;
914 }
915 chand->registered_method_slots = slots;
916 chand->registered_method_max_probes = max_probes;
917 }
918
Craig Tiller1064f8b2015-06-25 13:52:57 -0700919 grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
920 transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800921
Vijay Pai8931cdd2015-06-17 12:42:17 -0700922 gpr_mu_lock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800923 chand->next = &s->root_channel_data;
924 chand->prev = chand->next->prev;
925 chand->next->prev = chand->prev->next = chand;
Vijay Pai8931cdd2015-06-17 12:42:17 -0700926 gpr_mu_unlock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800927
928 gpr_free(filters);
Craig Tiller4b804102015-06-26 16:16:12 -0700929
930 GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
931 memset(&op, 0, sizeof(op));
932 op.set_accept_stream = accept_stream;
933 op.set_accept_stream_user_data = chand;
934 op.on_connectivity_state_change = &chand->channel_connectivity_changed;
935 op.connectivity_state = &chand->connectivity_state;
936 grpc_transport_perform_op(transport, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800937}
938
Craig Tiller6a006ce2015-07-13 16:25:40 -0700939typedef struct {
940 requested_call **requests;
941 size_t count;
942 size_t capacity;
943} request_killer;
944
945static void request_killer_init(request_killer *rk) {
946 memset(rk, 0, sizeof(*rk));
947}
948
949static void request_killer_add(request_killer *rk, requested_call *rc) {
950 if (rk->capacity == rk->count) {
951 rk->capacity = GPR_MAX(8, rk->capacity * 2);
952 rk->requests =
953 gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
954 }
955 rk->requests[rk->count++] = rc;
956}
957
958static void request_killer_add_request_matcher(request_killer *rk,
959 grpc_server *server,
960 request_matcher *rm) {
961 int request_id;
962 while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
963 request_killer_add(rk, &server->requested_calls[request_id]);
964 }
965}
966
967static void request_killer_run(request_killer *rk, grpc_server *server) {
968 size_t i;
969 for (i = 0; i < rk->count; i++) {
970 fail_call(server, rk->requests[i]);
971 }
972 gpr_free(rk->requests);
973}
974
Craig Tillerbce999f2015-05-27 09:55:51 -0700975void grpc_server_shutdown_and_notify(grpc_server *server,
976 grpc_completion_queue *cq, void *tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800977 listener *l;
Craig Tillerbd217572015-02-11 18:10:56 -0800978 registered_method *rm;
Craig Tillerbce999f2015-05-27 09:55:51 -0700979 shutdown_tag *sdt;
Craig Tillerff3ae682015-06-29 17:44:04 -0700980 channel_broadcaster broadcaster;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700981 request_killer reqkill;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800982
Craig Tiller7156fca2015-07-15 13:54:20 -0700983 GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
984
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800985 /* lock, and gather up some stuff to do */
Vijay Pai8931cdd2015-06-17 12:42:17 -0700986 gpr_mu_lock(&server->mu_global);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700987 grpc_cq_begin_op(cq);
Craig Tilleree945e82015-05-26 16:15:34 -0700988 server->shutdown_tags =
989 gpr_realloc(server->shutdown_tags,
Craig Tiller208d2122015-05-29 08:50:08 -0700990 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
Craig Tillerbce999f2015-05-27 09:55:51 -0700991 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
992 sdt->tag = tag;
993 sdt->cq = cq;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700994 if (gpr_atm_acq_load(&server->shutdown_flag)) {
Vijay Pai8931cdd2015-06-17 12:42:17 -0700995 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800996 return;
997 }
998
Craig Tiller080d6c52015-07-10 10:23:10 -0700999 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -07001000
Craig Tillerff3ae682015-06-29 17:44:04 -07001001 channel_broadcaster_init(server, &broadcaster);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001002 request_killer_init(&reqkill);
nnoble0c475f02014-12-05 15:37:39 -08001003
Craig Tillerbd217572015-02-11 18:10:56 -08001004 /* collect all unregistered then registered calls */
Vijay Pai8931cdd2015-06-17 12:42:17 -07001005 gpr_mu_lock(&server->mu_call);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001006 request_killer_add_request_matcher(&reqkill, server,
1007 &server->unregistered_request_matcher);
Craig Tiller729b35a2015-07-13 12:36:47 -07001008 request_matcher_zombify_all_pending_calls(
1009 &server->unregistered_request_matcher);
Craig Tillerbd217572015-02-11 18:10:56 -08001010 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tiller6a006ce2015-07-13 16:25:40 -07001011 request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
Craig Tiller729b35a2015-07-13 12:36:47 -07001012 request_matcher_zombify_all_pending_calls(&rm->request_matcher);
Craig Tillerbd217572015-02-11 18:10:56 -08001013 }
Vijay Pai8931cdd2015-06-17 12:42:17 -07001014 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001015
Craig Tiller6a006ce2015-07-13 16:25:40 -07001016 gpr_atm_rel_store(&server->shutdown_flag, 1);
Craig Tillerdc627722015-05-26 15:27:02 -07001017 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001018 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001019
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001020 /* terminate all the requested calls */
Craig Tiller6a006ce2015-07-13 16:25:40 -07001021 request_killer_run(&reqkill, server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001022
1023 /* Shutdown listeners */
1024 for (l = server->listeners; l; l = l->next) {
1025 l->destroy(server, l->arg);
1026 }
Craig Tillerff3ae682015-06-29 17:44:04 -07001027
1028 channel_broadcaster_shutdown(&broadcaster, 1, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001029}
1030
Craig Tilleraec96aa2015-04-07 14:32:15 -07001031void grpc_server_listener_destroy_done(void *s) {
1032 grpc_server *server = s;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001033 gpr_mu_lock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001034 server->listeners_destroyed++;
Craig Tilleree945e82015-05-26 16:15:34 -07001035 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001036 gpr_mu_unlock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001037}
1038
Craig Tillerafa2d632015-05-26 16:39:13 -07001039void grpc_server_cancel_all_calls(grpc_server *server) {
Craig Tiller092d8d12015-07-04 22:35:00 -07001040 channel_broadcaster broadcaster;
Craig Tillerafa2d632015-05-26 16:39:13 -07001041
Craig Tiller092d8d12015-07-04 22:35:00 -07001042 gpr_mu_lock(&server->mu_global);
1043 channel_broadcaster_init(server, &broadcaster);
1044 gpr_mu_unlock(&server->mu_global);
Craig Tillerafa2d632015-05-26 16:39:13 -07001045
Craig Tiller092d8d12015-07-04 22:35:00 -07001046 channel_broadcaster_shutdown(&broadcaster, 0, 1);
Craig Tillerafa2d632015-05-26 16:39:13 -07001047}
1048
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001049void grpc_server_destroy(grpc_server *server) {
Craig Tilleraec96aa2015-04-07 14:32:15 -07001050 listener *l;
Craig Tiller872af022015-04-24 15:57:52 -07001051
Vijay Pai8931cdd2015-06-17 12:42:17 -07001052 gpr_mu_lock(&server->mu_global);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001053 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
Craig Tilleree945e82015-05-26 16:15:34 -07001054 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -07001055
1056 while (server->listeners) {
1057 l = server->listeners;
1058 server->listeners = l->next;
1059 gpr_free(l);
1060 }
1061
Vijay Pai8931cdd2015-06-17 12:42:17 -07001062 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001063
1064 server_unref(server);
1065}
1066
1067void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -08001068 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -08001069 grpc_pollset **pollsets,
1070 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001071 void (*destroy)(grpc_server *server, void *arg)) {
1072 listener *l = gpr_malloc(sizeof(listener));
1073 l->arg = arg;
1074 l->start = start;
1075 l->destroy = destroy;
1076 l->next = server->listeners;
1077 server->listeners = l;
1078}
1079
Craig Tiller9f28ac22015-01-27 17:01:29 -08001080static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -08001081 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001082 call_data *calld = NULL;
Craig Tiller729b35a2015-07-13 12:36:47 -07001083 request_matcher *request_matcher = NULL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001084 int request_id;
1085 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1086 fail_call(server, rc);
1087 return GRPC_CALL_OK;
1088 }
1089 request_id = gpr_stack_lockfree_pop(server->request_freelist);
1090 if (request_id == -1) {
1091 /* out of request ids: just fail this one */
Craig Tiller24be0f72015-02-10 14:04:22 -08001092 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001093 return GRPC_CALL_OK;
1094 }
Craig Tiller04cc8be2015-02-10 16:11:22 -08001095 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -08001096 case BATCH_CALL:
Craig Tiller729b35a2015-07-13 12:36:47 -07001097 request_matcher = &server->unregistered_request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001098 break;
1099 case REGISTERED_CALL:
Craig Tiller729b35a2015-07-13 12:36:47 -07001100 request_matcher = &rc->data.registered.registered_method->request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001101 break;
1102 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001103 server->requested_calls[request_id] = *rc;
1104 gpr_free(rc);
1105 if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
1106 /* this was the first queued request: we need to lock and start
1107 matching calls */
1108 gpr_mu_lock(&server->mu_call);
1109 while ((calld = request_matcher->pending_head) != NULL) {
1110 request_id = gpr_stack_lockfree_pop(request_matcher->requests);
1111 if (request_id == -1) break;
1112 request_matcher->pending_head = calld->pending_next;
1113 gpr_mu_unlock(&server->mu_call);
1114 gpr_mu_lock(&calld->mu_state);
1115 if (calld->state == ZOMBIED) {
1116 gpr_mu_unlock(&calld->mu_state);
1117 grpc_iomgr_closure_init(
1118 &calld->kill_zombie_closure, kill_zombie,
1119 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
1120 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
1121 } else {
1122 GPR_ASSERT(calld->state == PENDING);
1123 calld->state = ACTIVATED;
1124 gpr_mu_unlock(&calld->mu_state);
Craig Tillerb6450262015-07-14 07:12:19 -07001125 begin_call(server, calld, &server->requested_calls[request_id]);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001126 }
1127 gpr_mu_lock(&server->mu_call);
Craig Tiller729b35a2015-07-13 12:36:47 -07001128 }
Craig Tiller76d2c3b2015-07-07 11:46:01 -07001129 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001130 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001131 return GRPC_CALL_OK;
Craig Tillercce17ac2015-01-20 09:29:28 -08001132}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001133
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001134grpc_call_error grpc_server_request_call(
1135 grpc_server *server, grpc_call **call, grpc_call_details *details,
1136 grpc_metadata_array *initial_metadata,
1137 grpc_completion_queue *cq_bound_to_call,
1138 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001139 requested_call *rc = gpr_malloc(sizeof(*rc));
murgatroid99ad7c20c2015-05-22 14:42:29 -07001140 GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
1141 initial_metadata, cq_bound_to_call,
1142 cq_for_notification, tag);
Craig Tillerb56975c2015-06-15 10:11:16 -07001143 if (!grpc_cq_is_server_cq(cq_for_notification)) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001144 gpr_free(rc);
Craig Tillerb56975c2015-06-15 10:11:16 -07001145 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1146 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001147 grpc_cq_begin_op(cq_for_notification);
1148 rc->type = BATCH_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001149 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001150 rc->tag = tag;
1151 rc->cq_bound_to_call = cq_bound_to_call;
1152 rc->cq_for_notification = cq_for_notification;
1153 rc->call = call;
1154 rc->data.batch.details = details;
1155 rc->data.batch.initial_metadata = initial_metadata;
1156 return queue_call_request(server, rc);
Craig Tiller24be0f72015-02-10 14:04:22 -08001157}
1158
1159grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001160 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1161 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001162 grpc_completion_queue *cq_bound_to_call,
1163 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001164 requested_call *rc = gpr_malloc(sizeof(*rc));
Craig Tiller20bc56d2015-02-12 09:02:56 -08001165 registered_method *registered_method = rm;
Craig Tillerb56975c2015-06-15 10:11:16 -07001166 if (!grpc_cq_is_server_cq(cq_for_notification)) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001167 gpr_free(rc);
Craig Tillerb56975c2015-06-15 10:11:16 -07001168 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1169 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001170 grpc_cq_begin_op(cq_for_notification);
1171 rc->type = REGISTERED_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001172 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001173 rc->tag = tag;
1174 rc->cq_bound_to_call = cq_bound_to_call;
1175 rc->cq_for_notification = cq_for_notification;
1176 rc->call = call;
1177 rc->data.registered.registered_method = registered_method;
1178 rc->data.registered.deadline = deadline;
1179 rc->data.registered.initial_metadata = initial_metadata;
1180 rc->data.registered.optional_payload = optional_payload;
1181 return queue_call_request(server, rc);
Craig Tiller24be0f72015-02-10 14:04:22 -08001182}
1183
Craig Tiller64be9f72015-05-04 14:53:51 -07001184static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001185 void *tag);
Craig Tiller64be9f72015-05-04 14:53:51 -07001186static void publish_was_not_set(grpc_call *call, int success, void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001187 abort();
1188}
Craig Tiller24be0f72015-02-10 14:04:22 -08001189
Craig Tiller166e2502015-02-03 20:14:41 -08001190static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1191 gpr_slice slice = value->slice;
1192 size_t len = GPR_SLICE_LENGTH(slice);
1193
1194 if (len + 1 > *capacity) {
1195 *capacity = GPR_MAX(len + 1, *capacity * 2);
1196 *dest = gpr_realloc(*dest, *capacity);
1197 }
1198 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1199}
1200
Craig Tiller24be0f72015-02-10 14:04:22 -08001201static void begin_call(grpc_server *server, call_data *calld,
1202 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001203 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001204 grpc_ioreq req[2];
1205 grpc_ioreq *r = req;
1206
1207 /* called once initial metadata has been read by the call, but BEFORE
1208 the ioreq to fetch it out of the call has been executed.
1209 This means metadata related fields can be relied on in calld, but to
1210 fill in the metadata array passed by the client, we need to perform
1211 an ioreq op, that should complete immediately. */
1212
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001213 grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
1214 *rc->call = calld->call;
1215 calld->cq_new = rc->cq_for_notification;
Craig Tiller24be0f72015-02-10 14:04:22 -08001216 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001217 case BATCH_CALL:
Craig Tiller04c5d4b2015-06-26 17:21:41 -07001218 GPR_ASSERT(calld->host != NULL);
1219 GPR_ASSERT(calld->path != NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001220 cpstr(&rc->data.batch.details->host,
1221 &rc->data.batch.details->host_capacity, calld->host);
1222 cpstr(&rc->data.batch.details->method,
1223 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001224 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001225 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1226 r->data.recv_metadata = rc->data.batch.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001227 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001228 r++;
1229 publish = publish_registered_or_batch;
1230 break;
1231 case REGISTERED_CALL:
1232 *rc->data.registered.deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001233 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1234 r->data.recv_metadata = rc->data.registered.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001235 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001236 r++;
1237 if (rc->data.registered.optional_payload) {
1238 r->op = GRPC_IOREQ_RECV_MESSAGE;
1239 r->data.recv_message = rc->data.registered.optional_payload;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001240 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001241 r++;
1242 }
1243 publish = publish_registered_or_batch;
1244 break;
1245 }
1246
Craig Tiller4df412b2015-04-28 07:57:54 -07001247 GRPC_CALL_INTERNAL_REF(calld->call, "server");
Craig Tiller12cf5372015-07-09 13:48:11 -07001248 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001249}
1250
1251static void done_request_event(void *req, grpc_cq_completion *c) {
Craig Tiller6a006ce2015-07-13 16:25:40 -07001252 requested_call *rc = req;
1253 grpc_server *server = rc->server;
1254
1255 if (rc >= server->requested_calls &&
1256 rc < server->requested_calls + server->max_requested_calls) {
1257 gpr_stack_lockfree_push(server->request_freelist,
1258 rc - server->requested_calls);
1259 } else {
1260 gpr_free(req);
1261 }
Craig Tiller24be0f72015-02-10 14:04:22 -08001262}
1263
1264static void fail_call(grpc_server *server, requested_call *rc) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001265 *rc->call = NULL;
Craig Tiller24be0f72015-02-10 14:04:22 -08001266 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001267 case BATCH_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001268 rc->data.batch.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001269 break;
1270 case REGISTERED_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001271 rc->data.registered.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001272 break;
1273 }
Craig Tiller12cf5372015-07-09 13:48:11 -07001274 grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
1275 &rc->completion);
Craig Tiller24be0f72015-02-10 14:04:22 -08001276}
1277
Craig Tiller64be9f72015-05-04 14:53:51 -07001278static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller97fc6a32015-07-08 15:31:35 -07001279 void *prc) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001280 grpc_call_element *elem =
1281 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001282 requested_call *rc = prc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001283 call_data *calld = elem->call_data;
Craig Tiller12cf5372015-07-09 13:48:11 -07001284 grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
1285 &rc->completion);
Craig Tillerc0122582015-07-09 13:21:36 -07001286 GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
Craig Tiller24be0f72015-02-10 14:04:22 -08001287}
1288
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001289const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1290 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001291}
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001292
1293int grpc_server_has_open_connections(grpc_server *server) {
1294 int r;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001295 gpr_mu_lock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001296 r = server->root_channel_data.next != &server->root_channel_data;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001297 gpr_mu_unlock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001298 return r;
1299}