/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/surface/server.h"

#include <stdlib.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>

#include "src/core/census/grpc_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/stack_lockfree.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/init.h"
#include "src/core/transport/metadata.h"

typedef struct listener {
  void *arg;
  void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
                size_t pollset_count);
  void (*destroy)(grpc_server *server, void *arg);
  struct listener *next;
} listener;

typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;

typedef struct {
  call_data *next;
  call_data *prev;
} call_link;

typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;

typedef struct requested_call {
  requested_call_type type;
  void *tag;
  grpc_server *server;
  grpc_completion_queue *cq_bound_to_call;
  grpc_completion_queue *cq_for_notification;
  grpc_call **call;
  grpc_cq_completion completion;
  union {
    struct {
      grpc_call_details *details;
      grpc_metadata_array *initial_metadata;
    } batch;
    struct {
      registered_method *registered_method;
      gpr_timespec *deadline;
      grpc_metadata_array *initial_metadata;
      grpc_byte_buffer **optional_payload;
    } registered;
  } data;
} requested_call;

typedef struct channel_registered_method {
  registered_method *server_registered_method;
  grpc_mdstr *method;
  grpc_mdstr *host;
} channel_registered_method;

struct channel_data {
  grpc_server *server;
  grpc_connectivity_state connectivity_state;
  grpc_channel *channel;
  grpc_mdstr *path_key;
  grpc_mdstr *authority_key;
  /* linked list of all channels on a server */
  channel_data *next;
  channel_data *prev;
  channel_registered_method *registered_methods;
  gpr_uint32 registered_method_slots;
  gpr_uint32 registered_method_max_probes;
  grpc_iomgr_closure finish_destroy_channel_closure;
  grpc_iomgr_closure channel_connectivity_changed;
};

typedef struct shutdown_tag {
  void *tag;
  grpc_completion_queue *cq;
  grpc_cq_completion completion;
} shutdown_tag;

typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

typedef struct request_matcher request_matcher;

struct call_data {
  grpc_call *call;

  /** protects state */
  gpr_mu mu_state;
  /** the current state of a call - see call_state */
  call_state state;

  grpc_mdstr *path;
  grpc_mdstr *host;
  gpr_timespec deadline;
  int got_initial_metadata;

  grpc_completion_queue *cq_new;

  grpc_stream_op_buffer *recv_ops;
  grpc_stream_state *recv_state;
  grpc_iomgr_closure *on_done_recv;

  grpc_iomgr_closure server_on_recv;
  grpc_iomgr_closure kill_zombie_closure;

  call_data *pending_next;
};

struct request_matcher {
  call_data *pending_head;
  call_data *pending_tail;
  gpr_stack_lockfree *requests;
};

struct registered_method {
  char *method;
  char *host;
  request_matcher request_matcher;
  registered_method *next;
};

typedef struct {
  grpc_channel **channels;
  size_t num_channels;
} channel_broadcaster;

struct grpc_server {
  size_t channel_filter_count;
  const grpc_channel_filter **channel_filters;
  grpc_channel_args *channel_args;

  grpc_completion_queue **cqs;
  grpc_pollset **pollsets;
  size_t cq_count;

  /* The two following mutexes control access to server-state
     mu_global controls access to non-call-related state (e.g., channel state)
     mu_call controls access to call-related state (e.g., the call lists)

     If they are ever required to be nested, you must lock mu_global
     before mu_call. This is currently used in shutdown processing
     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
  gpr_mu mu_global; /* mutex for server and channel state */
  gpr_mu mu_call;   /* mutex for call-specific state */

  registered_method *registered_methods;
  request_matcher unregistered_request_matcher;
  /** free list of available requested_calls indices */
  gpr_stack_lockfree *request_freelist;
  /** requested call backing data */
  requested_call *requested_calls;
  size_t max_requested_calls;

  gpr_atm shutdown_flag;
  gpr_uint8 shutdown_published;
  size_t num_shutdown_tags;
  shutdown_tag *shutdown_tags;

  channel_data root_channel_data;

  listener *listeners;
  int listeners_destroyed;
  gpr_refcount internal_refcount;

  /** when did we print the last shutdown progress message */
  gpr_timespec last_shutdown_message_time;
};
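/* Editorial sketch, not part of the original source: when both server mutexes
   are needed, the documented order is mu_global first, then mu_call, released
   in reverse, e.g.

     gpr_mu_lock(&server->mu_global);
     gpr_mu_lock(&server->mu_call);
     ...touch channel state and the pending-call lists...
     gpr_mu_unlock(&server->mu_call);
     gpr_mu_unlock(&server->mu_global);

   grpc_server_shutdown_and_notify below nests the locks in exactly this order,
   which is what keeps the shutdown path free of lock-order inversions. */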

#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)

static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
   hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);

/*
 * channel broadcaster
 */

/* assumes server locked */
static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
  channel_data *c;
  size_t count = 0;
  for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
    count++;
  }
  cb->num_channels = count;
  cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
  count = 0;
  for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
    cb->channels[count++] = c->channel;
    GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
  }
}

struct shutdown_cleanup_args {
  grpc_iomgr_closure closure;
  gpr_slice slice;
};

static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
  struct shutdown_cleanup_args *a = arg;
  gpr_slice_unref(a->slice);
  gpr_free(a);
}

static void send_shutdown(grpc_channel *channel, int send_goaway,
                          int send_disconnect) {
  grpc_transport_op op;
  struct shutdown_cleanup_args *sc;
  grpc_channel_element *elem;

  memset(&op, 0, sizeof(op));
  op.send_goaway = send_goaway;
  sc = gpr_malloc(sizeof(*sc));
  sc->slice = gpr_slice_from_copied_string("Server shutdown");
  op.goaway_message = &sc->slice;
  op.goaway_status = GRPC_STATUS_OK;
  op.disconnect = send_disconnect;
  grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
  op.on_consumed = &sc->closure;

  elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
  elem->filter->start_transport_op(elem, &op);
}

static void channel_broadcaster_shutdown(channel_broadcaster *cb,
                                         int send_goaway,
                                         int force_disconnect) {
  size_t i;

  for (i = 0; i < cb->num_channels; i++) {
    send_shutdown(cb->channels[i], send_goaway, force_disconnect);
    GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
  }
  gpr_free(cb->channels);
}

/*
 * request_matcher
 */

static void request_matcher_init(request_matcher *request_matcher,
                                 size_t entries) {
  memset(request_matcher, 0, sizeof(*request_matcher));
  request_matcher->requests = gpr_stack_lockfree_create(entries);
}

static void request_matcher_destroy(request_matcher *request_matcher) {
  GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
  gpr_stack_lockfree_destroy(request_matcher->requests);
}

static void kill_zombie(void *elem, int success) {
  grpc_call_destroy(grpc_call_from_top_element(elem));
}

static void request_matcher_zombify_all_pending_calls(
    request_matcher *request_matcher) {
  while (request_matcher->pending_head) {
    call_data *calld = request_matcher->pending_head;
    request_matcher->pending_head = calld->pending_next;
    gpr_mu_lock(&calld->mu_state);
    calld->state = ZOMBIED;
    gpr_mu_unlock(&calld->mu_state);
    grpc_iomgr_closure_init(
        &calld->kill_zombie_closure, kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
    grpc_iomgr_add_callback(&calld->kill_zombie_closure);
  }
}

static void request_matcher_kill_requests(grpc_server *server,
                                          request_matcher *rm) {
  int request_id;
  while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
    fail_call(server, &server->requested_calls[request_id]);
  }
}

/*
 * server proper
 */

static void server_ref(grpc_server *server) {
  gpr_ref(&server->internal_refcount);
}

static void server_delete(grpc_server *server) {
  registered_method *rm;
  size_t i;
  grpc_channel_args_destroy(server->channel_args);
  gpr_mu_destroy(&server->mu_global);
  gpr_mu_destroy(&server->mu_call);
  gpr_free(server->channel_filters);
  while ((rm = server->registered_methods) != NULL) {
    server->registered_methods = rm->next;
    request_matcher_destroy(&rm->request_matcher);
    gpr_free(rm->method);
    gpr_free(rm->host);
    gpr_free(rm);
  }
  for (i = 0; i < server->cq_count; i++) {
    GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
  }
  request_matcher_destroy(&server->unregistered_request_matcher);
  gpr_stack_lockfree_destroy(server->request_freelist);
  gpr_free(server->cqs);
  gpr_free(server->pollsets);
  gpr_free(server->shutdown_tags);
  gpr_free(server->requested_calls);
  gpr_free(server);
}

static void server_unref(grpc_server *server) {
  if (gpr_unref(&server->internal_refcount)) {
    server_delete(server);
  }
}

static int is_channel_orphaned(channel_data *chand) {
  return chand->next == chand;
}

static void orphan_channel(channel_data *chand) {
  chand->next->prev = chand->prev;
  chand->prev->next = chand->next;
  chand->next = chand->prev = chand;
}

static void finish_destroy_channel(void *cd, int success) {
  channel_data *chand = cd;
  grpc_server *server = chand->server;
  GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
  server_unref(server);
}

static void destroy_channel(channel_data *chand) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != NULL);
  orphan_channel(chand);
  server_ref(chand->server);
  maybe_finish_shutdown(chand->server);
  chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
  chand->finish_destroy_channel_closure.cb_arg = chand;
  grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}

static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
                                 request_matcher *request_matcher) {
  call_data *calld = elem->call_data;
  int request_id;

  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_mu_lock(&calld->mu_state);
    calld->state = ZOMBIED;
    gpr_mu_unlock(&calld->mu_state);
    grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
    grpc_iomgr_add_callback(&calld->kill_zombie_closure);
    return;
  }

  request_id = gpr_stack_lockfree_pop(request_matcher->requests);
  if (request_id == -1) {
    gpr_mu_lock(&server->mu_call);
    gpr_mu_lock(&calld->mu_state);
    calld->state = PENDING;
    gpr_mu_unlock(&calld->mu_state);
    if (request_matcher->pending_head == NULL) {
      request_matcher->pending_tail = request_matcher->pending_head = calld;
    } else {
      request_matcher->pending_tail->pending_next = calld;
      request_matcher->pending_tail = calld;
    }
    calld->pending_next = NULL;
    gpr_mu_unlock(&server->mu_call);
  } else {
    gpr_mu_lock(&calld->mu_state);
    calld->state = ACTIVATED;
    gpr_mu_unlock(&calld->mu_state);
    begin_call(server, calld, &server->requested_calls[request_id]);
  }
}

static void start_new_rpc(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_server *server = chand->server;
  gpr_uint32 i;
  gpr_uint32 hash;
  channel_registered_method *rm;

  if (chand->registered_methods && calld->path && calld->host) {
    /* TODO(ctiller): unify these two searches */
    /* check for an exact match with host */
    hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->host != calld->host) continue;
      if (rm->method != calld->path) continue;
      finish_start_new_rpc(server, elem,
                           &rm->server_registered_method->request_matcher);
      return;
    }
    /* check for a wildcard method definition (no host set) */
    hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->host != NULL) continue;
      if (rm->method != calld->path) continue;
      finish_start_new_rpc(server, elem,
                           &rm->server_registered_method->request_matcher);
      return;
    }
  }
  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
}

static int num_listeners(grpc_server *server) {
  listener *l;
  int n = 0;
  for (l = server->listeners; l; l = l->next) {
    n++;
  }
  return n;
}

static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
  server_unref(server);
}

static int num_channels(grpc_server *server) {
  channel_data *chand;
  int n = 0;
  for (chand = server->root_channel_data.next;
       chand != &server->root_channel_data; chand = chand->next) {
    n++;
  }
  return n;
}

static void kill_pending_work_locked(grpc_server *server) {
  registered_method *rm;
  request_matcher_kill_requests(server, &server->unregistered_request_matcher);
  request_matcher_zombify_all_pending_calls(
      &server->unregistered_request_matcher);
  for (rm = server->registered_methods; rm; rm = rm->next) {
    request_matcher_kill_requests(server, &rm->request_matcher);
    request_matcher_zombify_all_pending_calls(&rm->request_matcher);
  }
}

static void maybe_finish_shutdown(grpc_server *server) {
  size_t i;
  if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
    return;
  }

  kill_pending_work_locked(server);

  if (server->root_channel_data.next != &server->root_channel_data ||
      server->listeners_destroyed < num_listeners(server)) {
    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
                                  server->last_shutdown_message_time),
                     gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
      gpr_log(GPR_DEBUG,
              "Waiting for %d channels and %d/%d listeners to be destroyed"
              " before shutting down server",
              num_channels(server),
              num_listeners(server) - server->listeners_destroyed,
              num_listeners(server));
    }
    return;
  }
  server->shutdown_published = 1;
  for (i = 0; i < server->num_shutdown_tags; i++) {
    server_ref(server);
    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
                   done_shutdown_event, server,
                   &server->shutdown_tags[i].completion);
  }
}

static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
  grpc_call_element *elem = user_data;
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  if (md->key == chand->path_key) {
    calld->path = GRPC_MDSTR_REF(md->value);
    return NULL;
  } else if (md->key == chand->authority_key) {
    calld->host = GRPC_MDSTR_REF(md->value);
    return NULL;
  }
  return md;
}

static void server_on_recv(void *ptr, int success) {
  grpc_call_element *elem = ptr;
  call_data *calld = elem->call_data;
  gpr_timespec op_deadline;

  if (success && !calld->got_initial_metadata) {
    size_t i;
    size_t nops = calld->recv_ops->nops;
    grpc_stream_op *ops = calld->recv_ops->ops;
    for (i = 0; i < nops; i++) {
      grpc_stream_op *op = &ops[i];
      if (op->type != GRPC_OP_METADATA) continue;
      grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
      op_deadline = op->data.metadata.deadline;
      if (0 !=
          gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
        calld->deadline = op->data.metadata.deadline;
      }
      if (calld->host && calld->path) {
        calld->got_initial_metadata = 1;
        start_new_rpc(elem);
      }
      break;
    }
  }

  switch (*calld->recv_state) {
    case GRPC_STREAM_OPEN:
      break;
    case GRPC_STREAM_SEND_CLOSED:
      break;
    case GRPC_STREAM_RECV_CLOSED:
      gpr_mu_lock(&calld->mu_state);
      if (calld->state == NOT_STARTED) {
        calld->state = ZOMBIED;
        gpr_mu_unlock(&calld->mu_state);
        grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
      } else {
        gpr_mu_unlock(&calld->mu_state);
      }
      break;
    case GRPC_STREAM_CLOSED:
      gpr_mu_lock(&calld->mu_state);
      if (calld->state == NOT_STARTED) {
        calld->state = ZOMBIED;
        gpr_mu_unlock(&calld->mu_state);
        grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
      } else if (calld->state == PENDING) {
        calld->state = ZOMBIED;
        gpr_mu_unlock(&calld->mu_state);
        /* zombied call will be destroyed when it's removed from the pending
           queue... later */
      } else {
        gpr_mu_unlock(&calld->mu_state);
      }
      break;
  }

  calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}

static void server_mutate_op(grpc_call_element *elem,
                             grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;

  if (op->recv_ops) {
    /* substitute our callback for the higher callback */
    calld->recv_ops = op->recv_ops;
    calld->recv_state = op->recv_state;
    calld->on_done_recv = op->on_done_recv;
    op->on_done_recv = &calld->server_on_recv;
  }
}

static void server_start_transport_stream_op(grpc_call_element *elem,
                                             grpc_transport_stream_op *op) {
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
  server_mutate_op(elem, op);
  grpc_call_next_op(elem, op);
}

static void accept_stream(void *cd, grpc_transport *transport,
                          const void *transport_server_data) {
  channel_data *chand = cd;
  /* create a call */
  grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
                   0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}

static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
  channel_data *chand = cd;
  grpc_server *server = chand->server;
  if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
    grpc_transport_op op;
    memset(&op, 0, sizeof(op));
    op.on_connectivity_state_change = &chand->channel_connectivity_changed,
    op.connectivity_state = &chand->connectivity_state;
    grpc_channel_next_op(grpc_channel_stack_element(
                             grpc_channel_get_channel_stack(chand->channel), 0),
                         &op);
  } else {
    gpr_mu_lock(&server->mu_global);
    destroy_channel(chand);
    gpr_mu_unlock(&server->mu_global);
    GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
  }
}

static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data,
                           grpc_transport_stream_op *initial_op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  memset(calld, 0, sizeof(call_data));
  calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
  calld->call = grpc_call_from_top_element(elem);
  gpr_mu_init(&calld->mu_state);

  grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);

  server_ref(chand->server);

  if (initial_op) server_mutate_op(elem, initial_op);
}

static void destroy_call_elem(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;

  GPR_ASSERT(calld->state != PENDING);

  if (calld->host) {
    GRPC_MDSTR_UNREF(calld->host);
  }
  if (calld->path) {
    GRPC_MDSTR_UNREF(calld->path);
  }

  gpr_mu_destroy(&calld->mu_state);

  server_unref(chand->server);
}

static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
                              const grpc_channel_args *args,
                              grpc_mdctx *metadata_context, int is_first,
                              int is_last) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(is_first);
  GPR_ASSERT(!is_last);
  chand->server = NULL;
  chand->channel = NULL;
  chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0);
  chand->authority_key =
      grpc_mdstr_from_string(metadata_context, ":authority", 0);
  chand->next = chand->prev = chand;
  chand->registered_methods = NULL;
  chand->connectivity_state = GRPC_CHANNEL_IDLE;
  grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
                          channel_connectivity_changed, chand);
}

static void destroy_channel_elem(grpc_channel_element *elem) {
  size_t i;
  channel_data *chand = elem->channel_data;
  if (chand->registered_methods) {
    for (i = 0; i < chand->registered_method_slots; i++) {
      if (chand->registered_methods[i].method) {
        GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
      }
      if (chand->registered_methods[i].host) {
        GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
      }
    }
    gpr_free(chand->registered_methods);
  }
  if (chand->server) {
    gpr_mu_lock(&chand->server->mu_global);
    chand->next->prev = chand->prev;
    chand->prev->next = chand->next;
    chand->next = chand->prev = chand;
    maybe_finish_shutdown(chand->server);
    gpr_mu_unlock(&chand->server->mu_global);
    GRPC_MDSTR_UNREF(chand->path_key);
    GRPC_MDSTR_UNREF(chand->authority_key);
    server_unref(chand->server);
  }
}

static const grpc_channel_filter server_surface_filter = {
    server_start_transport_stream_op,
    grpc_channel_next_op,
    sizeof(call_data),
    init_call_elem,
    destroy_call_elem,
    sizeof(channel_data),
    init_channel_elem,
    destroy_channel_elem,
    grpc_call_next_get_peer,
    "server",
};

void grpc_server_register_completion_queue(grpc_server *server,
                                           grpc_completion_queue *cq,
                                           void *reserved) {
  size_t i, n;
  GPR_ASSERT(!reserved);
  for (i = 0; i < server->cq_count; i++) {
    if (server->cqs[i] == cq) return;
  }
  GRPC_CQ_INTERNAL_REF(cq, "server");
  grpc_cq_mark_server_cq(cq);
  n = server->cq_count++;
  server->cqs = gpr_realloc(server->cqs,
                            server->cq_count * sizeof(grpc_completion_queue *));
  server->cqs[n] = cq;
}

grpc_server *grpc_server_create_from_filters(
    const grpc_channel_filter **filters, size_t filter_count,
    const grpc_channel_args *args) {
  size_t i;
  /* TODO(census): restore this once we finalize census filter etc.
     int census_enabled = grpc_channel_args_is_census_enabled(args); */
  int census_enabled = 0;

  grpc_server *server = gpr_malloc(sizeof(grpc_server));

  GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");

  memset(server, 0, sizeof(grpc_server));

  gpr_mu_init(&server->mu_global);
  gpr_mu_init(&server->mu_call);

  /* decremented by grpc_server_destroy */
  gpr_ref_init(&server->internal_refcount, 1);
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  /* TODO(ctiller): expose a channel_arg for this */
  server->max_requested_calls = 32768;
  server->request_freelist =
      gpr_stack_lockfree_create(server->max_requested_calls);
  for (i = 0; i < (size_t)server->max_requested_calls; i++) {
    gpr_stack_lockfree_push(server->request_freelist, i);
  }
  request_matcher_init(&server->unregistered_request_matcher,
                       server->max_requested_calls);
  server->requested_calls = gpr_malloc(server->max_requested_calls *
                                       sizeof(*server->requested_calls));

  /* Server filter stack is:

     server_surface_filter - for making surface API calls
     grpc_server_census_filter (optional) - for stats collection and tracing
     {passed in filter stack}
     grpc_connected_channel_filter - for interfacing with transports */
  server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
  server->channel_filters =
      gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
  server->channel_filters[0] = &server_surface_filter;
  if (census_enabled) {
    server->channel_filters[1] = &grpc_server_census_filter;
  }
  for (i = 0; i < filter_count; i++) {
    server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
  }

  server->channel_args = grpc_channel_args_copy(args);

  return server;
}

static int streq(const char *a, const char *b) {
  if (a == NULL && b == NULL) return 1;
  if (a == NULL) return 0;
  if (b == NULL) return 0;
  return 0 == strcmp(a, b);
}

void *grpc_server_register_method(grpc_server *server, const char *method,
                                  const char *host) {
  registered_method *m;
  if (!method) {
    gpr_log(GPR_ERROR,
            "grpc_server_register_method method string cannot be NULL");
    return NULL;
  }
  for (m = server->registered_methods; m; m = m->next) {
    if (streq(m->method, method) && streq(m->host, host)) {
      gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
              host ? host : "*");
      return NULL;
    }
  }
  m = gpr_malloc(sizeof(registered_method));
  memset(m, 0, sizeof(*m));
  request_matcher_init(&m->request_matcher, server->max_requested_calls);
  m->method = gpr_strdup(method);
  m->host = gpr_strdup(host);
  m->next = server->registered_methods;
  server->registered_methods = m;
  return m;
}
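/* Editorial usage sketch, not part of the original source; the method path,
   completion queue, and tag are illustrative. Methods are registered before
   grpc_server_start, and the returned handle is later passed to
   grpc_server_request_registered_call:

     void *handle = grpc_server_register_method(server, "/pkg.Service/Method",
                                                NULL);
     grpc_server_register_completion_queue(server, cq, NULL);
     grpc_server_start(server);

     grpc_call *call = NULL;
     gpr_timespec deadline;
     grpc_metadata_array request_metadata;
     grpc_byte_buffer *payload = NULL;
     grpc_metadata_array_init(&request_metadata);
     GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
                                    server, handle, &call, &deadline,
                                    &request_metadata, &payload, cq, cq, tag));
*/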

void grpc_server_start(grpc_server *server) {
  listener *l;
  size_t i;

  server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
  for (i = 0; i < server->cq_count; i++) {
    server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
  }

  for (l = server->listeners; l; l = l->next) {
    l->start(server, l->arg, server->pollsets, server->cq_count);
  }
}

void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
                                 grpc_channel_filter const **extra_filters,
                                 size_t num_extra_filters, grpc_mdctx *mdctx,
                                 const grpc_channel_args *args) {
  size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
  grpc_channel_filter const **filters =
      gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
  size_t i;
  size_t num_registered_methods;
  size_t alloc;
  registered_method *rm;
  channel_registered_method *crm;
  grpc_channel *channel;
  channel_data *chand;
  grpc_mdstr *host;
  grpc_mdstr *method;
  gpr_uint32 hash;
  gpr_uint32 slots;
  gpr_uint32 probes;
  gpr_uint32 max_probes = 0;
  grpc_transport_op op;

  for (i = 0; i < s->channel_filter_count; i++) {
    filters[i] = s->channel_filters[i];
  }
  for (; i < s->channel_filter_count + num_extra_filters; i++) {
    filters[i] = extra_filters[i - s->channel_filter_count];
  }
  filters[i] = &grpc_connected_channel_filter;

  for (i = 0; i < s->cq_count; i++) {
    memset(&op, 0, sizeof(op));
    op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
    grpc_transport_perform_op(transport, &op);
  }

  channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
                                             mdctx, 0);
  chand = (channel_data *)grpc_channel_stack_element(
              grpc_channel_get_channel_stack(channel), 0)
              ->channel_data;
  chand->server = s;
  server_ref(s);
  chand->channel = channel;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channel's context
     to quickly find registered methods */
  if (num_registered_methods > 0) {
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods = gpr_malloc(alloc);
    memset(chand->registered_methods, 0, alloc);
    for (rm = s->registered_methods; rm; rm = rm->next) {
      host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host, 0) : NULL;
      method = grpc_mdstr_from_string(mdctx, rm->method, 0);
      hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != NULL;
           probes++)
        ;
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->host = host;
      crm->method = method;
    }
    chand->registered_method_slots = slots;
    chand->registered_method_max_probes = max_probes;
  }

  grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
                                        transport);

  gpr_mu_lock(&s->mu_global);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu_global);

  gpr_free(filters);

  GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
  memset(&op, 0, sizeof(op));
  op.set_accept_stream = accept_stream;
  op.set_accept_stream_user_data = chand;
  op.on_connectivity_state_change = &chand->channel_connectivity_changed;
  op.connectivity_state = &chand->connectivity_state;
  op.disconnect = gpr_atm_acq_load(&s->shutdown_flag);
  grpc_transport_perform_op(transport, &op);
}

void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
  (void)done_arg;
  gpr_free(storage);
}

void grpc_server_shutdown_and_notify(grpc_server *server,
                                     grpc_completion_queue *cq, void *tag) {
  listener *l;
  shutdown_tag *sdt;
  channel_broadcaster broadcaster;

  GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);

  /* lock, and gather up some stuff to do */
  gpr_mu_lock(&server->mu_global);
  grpc_cq_begin_op(cq);
  if (server->shutdown_published) {
    grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
                   gpr_malloc(sizeof(grpc_cq_completion)));
    gpr_mu_unlock(&server->mu_global);
    return;
  }
  server->shutdown_tags =
      gpr_realloc(server->shutdown_tags,
                  sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
  sdt = &server->shutdown_tags[server->num_shutdown_tags++];
  sdt->tag = tag;
  sdt->cq = cq;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_mu_unlock(&server->mu_global);
    return;
  }

  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);

  channel_broadcaster_init(server, &broadcaster);

  /* collect all unregistered then registered calls */
  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(server);
  gpr_mu_unlock(&server->mu_call);

  gpr_atm_rel_store(&server->shutdown_flag, 1);
  maybe_finish_shutdown(server);
  gpr_mu_unlock(&server->mu_global);

  /* Shutdown listeners */
  for (l = server->listeners; l; l = l->next) {
    l->destroy(server, l->arg);
  }

  channel_broadcaster_shutdown(&broadcaster, 1, 0);
}
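/* Editorial note, not part of the original source: the shutdown tag recorded
   above is only posted to `cq` once every channel and listener has been
   destroyed (see maybe_finish_shutdown), so the expected caller-side sequence
   is roughly

     grpc_server_shutdown_and_notify(server, cq, tag);
     grpc_server_cancel_all_calls(server);    <- optional, to force calls down
     ...wait for `tag` on `cq`...
     grpc_server_destroy(server);

   grpc_server_destroy asserts that shutdown has begun and that all listeners
   have been destroyed, so it must not run before the tag is consumed. */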
1030
Craig Tilleraec96aa2015-04-07 14:32:15 -07001031void grpc_server_listener_destroy_done(void *s) {
1032 grpc_server *server = s;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001033 gpr_mu_lock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001034 server->listeners_destroyed++;
Craig Tilleree945e82015-05-26 16:15:34 -07001035 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001036 gpr_mu_unlock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001037}
1038
Craig Tillerafa2d632015-05-26 16:39:13 -07001039void grpc_server_cancel_all_calls(grpc_server *server) {
Craig Tiller092d8d12015-07-04 22:35:00 -07001040 channel_broadcaster broadcaster;
Craig Tillerafa2d632015-05-26 16:39:13 -07001041
Craig Tiller092d8d12015-07-04 22:35:00 -07001042 gpr_mu_lock(&server->mu_global);
1043 channel_broadcaster_init(server, &broadcaster);
1044 gpr_mu_unlock(&server->mu_global);
Craig Tillerafa2d632015-05-26 16:39:13 -07001045
Craig Tiller092d8d12015-07-04 22:35:00 -07001046 channel_broadcaster_shutdown(&broadcaster, 0, 1);
Craig Tillerafa2d632015-05-26 16:39:13 -07001047}
1048
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001049void grpc_server_destroy(grpc_server *server) {
Craig Tilleraec96aa2015-04-07 14:32:15 -07001050 listener *l;
Craig Tiller872af022015-04-24 15:57:52 -07001051
Vijay Pai8931cdd2015-06-17 12:42:17 -07001052 gpr_mu_lock(&server->mu_global);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001053 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
Craig Tilleree945e82015-05-26 16:15:34 -07001054 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -07001055
1056 while (server->listeners) {
1057 l = server->listeners;
1058 server->listeners = l->next;
1059 gpr_free(l);
1060 }
1061
Vijay Pai8931cdd2015-06-17 12:42:17 -07001062 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001063
1064 server_unref(server);
1065}
1066
1067void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -08001068 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -08001069 grpc_pollset **pollsets,
1070 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001071 void (*destroy)(grpc_server *server, void *arg)) {
1072 listener *l = gpr_malloc(sizeof(listener));
1073 l->arg = arg;
1074 l->start = start;
1075 l->destroy = destroy;
1076 l->next = server->listeners;
1077 server->listeners = l;
1078}
1079
static grpc_call_error queue_call_request(grpc_server *server,
                                          requested_call *rc) {
  call_data *calld = NULL;
  request_matcher *request_matcher = NULL;
  int request_id;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    fail_call(server, rc);
    return GRPC_CALL_OK;
  }
  request_id = gpr_stack_lockfree_pop(server->request_freelist);
  if (request_id == -1) {
    /* out of request ids: just fail this one */
    fail_call(server, rc);
    return GRPC_CALL_OK;
  }
  switch (rc->type) {
    case BATCH_CALL:
      request_matcher = &server->unregistered_request_matcher;
      break;
    case REGISTERED_CALL:
      request_matcher = &rc->data.registered.registered_method->request_matcher;
      break;
  }
  server->requested_calls[request_id] = *rc;
  gpr_free(rc);
  if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
    /* this was the first queued request: we need to lock and start
       matching calls */
    gpr_mu_lock(&server->mu_call);
    while ((calld = request_matcher->pending_head) != NULL) {
      request_id = gpr_stack_lockfree_pop(request_matcher->requests);
      if (request_id == -1) break;
      request_matcher->pending_head = calld->pending_next;
      gpr_mu_unlock(&server->mu_call);
      gpr_mu_lock(&calld->mu_state);
      if (calld->state == ZOMBIED) {
        gpr_mu_unlock(&calld->mu_state);
        grpc_iomgr_closure_init(
            &calld->kill_zombie_closure, kill_zombie,
            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
      } else {
        GPR_ASSERT(calld->state == PENDING);
        calld->state = ACTIVATED;
        gpr_mu_unlock(&calld->mu_state);
        begin_call(server, calld, &server->requested_calls[request_id]);
      }
      gpr_mu_lock(&server->mu_call);
    }
    gpr_mu_unlock(&server->mu_call);
  }
  return GRPC_CALL_OK;
}

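/* Public API: request notification of a new (unregistered-method) call.
   Validates that cq_for_notification is a server completion queue, packages
   the out-parameters into a BATCH_CALL requested_call, and queues it for
   matching; 'tag' is signalled on cq_for_notification when a call arrives or
   the request fails. */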
grpc_call_error grpc_server_request_call(
    grpc_server *server, grpc_call **call, grpc_call_details *details,
    grpc_metadata_array *initial_metadata,
    grpc_completion_queue *cq_bound_to_call,
    grpc_completion_queue *cq_for_notification, void *tag) {
  requested_call *rc = gpr_malloc(sizeof(*rc));
  GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
                               initial_metadata, cq_bound_to_call,
                               cq_for_notification, tag);
  if (!grpc_cq_is_server_cq(cq_for_notification)) {
    gpr_free(rc);
    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
  }
  grpc_cq_begin_op(cq_for_notification);
  details->reserved = NULL;
  rc->type = BATCH_CALL;
  rc->server = server;
  rc->tag = tag;
  rc->cq_bound_to_call = cq_bound_to_call;
  rc->cq_for_notification = cq_for_notification;
  rc->call = call;
  rc->data.batch.details = details;
  rc->data.batch.initial_metadata = initial_metadata;
  return queue_call_request(server, rc);
}

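/* Public API: as grpc_server_request_call, but for a registered method handle
   ('rm'). The deadline, initial metadata and (optionally) the request payload
   are delivered directly into the supplied out-parameters. */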
grpc_call_error grpc_server_request_registered_call(
    grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
    grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
    grpc_completion_queue *cq_bound_to_call,
    grpc_completion_queue *cq_for_notification, void *tag) {
  requested_call *rc = gpr_malloc(sizeof(*rc));
  registered_method *registered_method = rm;
  if (!grpc_cq_is_server_cq(cq_for_notification)) {
    gpr_free(rc);
    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
  }
  grpc_cq_begin_op(cq_for_notification);
  rc->type = REGISTERED_CALL;
  rc->server = server;
  rc->tag = tag;
  rc->cq_bound_to_call = cq_bound_to_call;
  rc->cq_for_notification = cq_for_notification;
  rc->call = call;
  rc->data.registered.registered_method = registered_method;
  rc->data.registered.deadline = deadline;
  rc->data.registered.initial_metadata = initial_metadata;
  rc->data.registered.optional_payload = optional_payload;
  return queue_call_request(server, rc);
}

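/* Completion functions used by begin_call() below: publish_was_not_set is a
   sentinel that aborts if begin_call fails to install a real publisher;
   publish_registered_or_batch (defined after begin_call) reports the new
   call back to the application. */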
static void publish_registered_or_batch(grpc_call *call, int success,
                                        void *tag);
static void publish_was_not_set(grpc_call *call, int success, void *tag) {
  abort();
}

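/* Copy a metadata string (including its NUL terminator) into a caller-owned
   buffer, growing the buffer (at least doubling it) when it is too small.
   Used to fill the host/method fields of grpc_call_details. */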
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
  gpr_slice slice = value->slice;
  size_t len = GPR_SLICE_LENGTH(slice);

  if (len + 1 > *capacity) {
    *capacity = GPR_MAX(len + 1, *capacity * 2);
    *dest = gpr_realloc(*dest, *capacity);
  }
  memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}

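/* Dispatch a matched call to the application: publish the call handle and
   completion queue bindings from the requested_call, then start the ioreq
   that receives initial metadata (plus the optional message payload for
   registered calls). publish_registered_or_batch runs when that ioreq
   completes. */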
static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc) {
  grpc_ioreq_completion_func publish = publish_was_not_set;
  grpc_ioreq req[2];
  grpc_ioreq *r = req;

  /* Called once initial metadata has been read by the call, but BEFORE
     the ioreq to fetch it out of the call has been executed.
     This means metadata related fields can be relied on in calld, but to
     fill in the metadata array passed by the client, we need to perform
     an ioreq op that should complete immediately. */

  grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
  *rc->call = calld->call;
  calld->cq_new = rc->cq_for_notification;
  switch (rc->type) {
    case BATCH_CALL:
      GPR_ASSERT(calld->host != NULL);
      GPR_ASSERT(calld->path != NULL);
      cpstr(&rc->data.batch.details->host,
            &rc->data.batch.details->host_capacity, calld->host);
      cpstr(&rc->data.batch.details->method,
            &rc->data.batch.details->method_capacity, calld->path);
      rc->data.batch.details->deadline = calld->deadline;
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = rc->data.batch.initial_metadata;
      r->flags = 0;
      r++;
      publish = publish_registered_or_batch;
      break;
    case REGISTERED_CALL:
      *rc->data.registered.deadline = calld->deadline;
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = rc->data.registered.initial_metadata;
      r->flags = 0;
      r++;
      if (rc->data.registered.optional_payload) {
        r->op = GRPC_IOREQ_RECV_MESSAGE;
        r->data.recv_message = rc->data.registered.optional_payload;
        r->flags = 0;
        r++;
      }
      publish = publish_registered_or_batch;
      break;
  }

  GRPC_CALL_INTERNAL_REF(calld->call, "server");
  grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req),
                                      publish, rc);
}

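/* Completion queue done-callback for a requested_call: recycle the slot into
   the request freelist if it lives in server->requested_calls, otherwise free
   it, then drop the server reference taken when the completion was queued. */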
static void done_request_event(void *req, grpc_cq_completion *c) {
  requested_call *rc = req;
  grpc_server *server = rc->server;

  if (rc >= server->requested_calls &&
      rc < server->requested_calls + server->max_requested_calls) {
    gpr_stack_lockfree_push(server->request_freelist,
                            rc - server->requested_calls);
  } else {
    gpr_free(req);
  }

  server_unref(server);
}

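/* Fail an outstanding request for a call: clear the caller's out-parameters
   and post an unsuccessful (success == 0) completion for its tag. */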
static void fail_call(grpc_server *server, requested_call *rc) {
  *rc->call = NULL;
  switch (rc->type) {
    case BATCH_CALL:
      rc->data.batch.initial_metadata->count = 0;
      break;
    case REGISTERED_CALL:
      rc->data.registered.initial_metadata->count = 0;
      break;
  }
  server_ref(server);
  grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
                 &rc->completion);
}

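/* ioreq completion callback installed by begin_call: complete the original
   request's tag on the notification queue with the ioreq's success status,
   then release the internal "server" reference on the call. */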
static void publish_registered_or_batch(grpc_call *call, int success,
                                        void *prc) {
  grpc_call_element *elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  requested_call *rc = prc;
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  server_ref(chand->server);
  grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
                 &rc->completion);
  GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
}

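/* Accessor for the channel args the server was configured with. */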
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
  return server->channel_args;
}

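/* Returns nonzero while any channel remains linked into the server's channel
   list, i.e. while connections are still open; mu_global guards the list. */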
int grpc_server_has_open_connections(grpc_server *server) {
  int r;
  gpr_mu_lock(&server->mu_global);
  r = server->root_channel_data.next != &server->root_channel_data;
  gpr_mu_unlock(&server->mu_global);
  return r;
}