blob: 2e013ea74215ce601cc3432822efc576e6537737 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080047#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080048#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc/support/useful.h>
52
53typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
54
55typedef struct listener {
56 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080057 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
58 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080059 void (*destroy)(grpc_server *server, void *arg);
60 struct listener *next;
61} listener;
62
63typedef struct call_data call_data;
64typedef struct channel_data channel_data;
Craig Tiller24be0f72015-02-10 14:04:22 -080065typedef struct registered_method registered_method;
66
67typedef struct {
68 call_data *next;
69 call_data *prev;
70} call_link;
71
72typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
73
74typedef struct {
75 requested_call_type type;
76 void *tag;
77 union {
78 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080079 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080080 grpc_call **call;
81 grpc_call_details *details;
82 grpc_metadata_array *initial_metadata;
83 } batch;
84 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080085 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080086 grpc_call **call;
87 registered_method *registered_method;
88 gpr_timespec *deadline;
89 grpc_metadata_array *initial_metadata;
90 grpc_byte_buffer **optional_payload;
91 } registered;
92 } data;
93} requested_call;
94
95typedef struct {
96 requested_call *calls;
97 size_t count;
98 size_t capacity;
99} requested_call_array;
100
101struct registered_method {
102 char *method;
103 char *host;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800104 call_data *pending;
Craig Tiller24be0f72015-02-10 14:04:22 -0800105 requested_call_array requested;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800106 grpc_completion_queue *cq;
Craig Tiller24be0f72015-02-10 14:04:22 -0800107 registered_method *next;
108};
109
110typedef struct channel_registered_method {
111 registered_method *server_registered_method;
112 grpc_mdstr *method;
113 grpc_mdstr *host;
114} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115
116struct channel_data {
117 grpc_server *server;
118 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800119 grpc_mdstr *path_key;
120 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800121 /* linked list of all channels on a server */
122 channel_data *next;
123 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800124 channel_registered_method *registered_methods;
125 gpr_uint32 registered_method_slots;
126 gpr_uint32 registered_method_max_probes;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800127};
128
129struct grpc_server {
130 size_t channel_filter_count;
131 const grpc_channel_filter **channel_filters;
132 grpc_channel_args *channel_args;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800133 grpc_completion_queue *unregistered_cq;
Craig Tillerec3257c2015-02-12 15:59:43 -0800134
Craig Tiller20bc56d2015-02-12 09:02:56 -0800135 grpc_completion_queue **cqs;
136 grpc_pollset **pollsets;
137 size_t cq_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800138
139 gpr_mu mu;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700140 gpr_cv cv;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800141
Craig Tiller24be0f72015-02-10 14:04:22 -0800142 registered_method *registered_methods;
143 requested_call_array requested_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800144
145 gpr_uint8 shutdown;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800146 size_t num_shutdown_tags;
147 void **shutdown_tags;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800148
149 call_data *lists[CALL_LIST_COUNT];
150 channel_data root_channel_data;
151
152 listener *listeners;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700153 int listeners_destroyed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154 gpr_refcount internal_refcount;
155};
156
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800157typedef enum {
158 /* waiting for metadata */
159 NOT_STARTED,
160 /* inital metadata read, not flow controlled in yet */
161 PENDING,
162 /* flow controlled in, on completion queue */
163 ACTIVATED,
164 /* cancelled before being queued */
165 ZOMBIED
166} call_state;
167
Craig Tillerfb189f82015-02-03 12:07:07 -0800168typedef struct legacy_data {
Craig Tiller24be0f72015-02-10 14:04:22 -0800169 grpc_metadata_array initial_metadata;
Craig Tillerfb189f82015-02-03 12:07:07 -0800170} legacy_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800171
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800172struct call_data {
173 grpc_call *call;
174
175 call_state state;
176 gpr_timespec deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800177 grpc_mdstr *path;
178 grpc_mdstr *host;
179
180 legacy_data *legacy;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800181 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800182
Craig Tiller04cc8be2015-02-10 16:11:22 -0800183 call_data **root[CALL_LIST_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800184 call_link links[CALL_LIST_COUNT];
185};
186
187#define SERVER_FROM_CALL_ELEM(elem) \
188 (((channel_data *)(elem)->channel_data)->server)
189
190static void do_nothing(void *unused, grpc_op_error ignored) {}
191
Craig Tiller24be0f72015-02-10 14:04:22 -0800192static void begin_call(grpc_server *server, call_data *calld,
193 requested_call *rc);
194static void fail_call(grpc_server *server, requested_call *rc);
195
Craig Tiller3b29b562015-02-11 12:58:46 -0800196static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800197 GPR_ASSERT(!call->root[list]);
198 call->root[list] = root;
199 if (!*root) {
200 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800201 call->links[list].next = call->links[list].prev = call;
202 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800203 call->links[list].next = *root;
204 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800205 call->links[list].next->links[list].prev =
206 call->links[list].prev->links[list].next = call;
207 }
208 return 1;
209}
210
Craig Tiller04cc8be2015-02-10 16:11:22 -0800211static call_data *call_list_remove_head(call_data **root, call_list list) {
212 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800214 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800215 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800216 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800217 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800218 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800219 out->links[list].next->links[list].prev = out->links[list].prev;
220 out->links[list].prev->links[list].next = out->links[list].next;
221 }
222 }
223 return out;
224}
225
Craig Tiller04cc8be2015-02-10 16:11:22 -0800226static int call_list_remove(call_data *call, call_list list) {
227 call_data **root = call->root[list];
228 if (root == NULL) return 0;
229 call->root[list] = NULL;
230 if (*root == call) {
231 *root = call->links[list].next;
232 if (*root == call) {
233 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800234 return 1;
235 }
236 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800237 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800238 call->links[list].next->links[list].prev = call->links[list].prev;
239 call->links[list].prev->links[list].next = call->links[list].next;
240 return 1;
241}
242
Craig Tiller24be0f72015-02-10 14:04:22 -0800243static void requested_call_array_destroy(requested_call_array *array) {
244 gpr_free(array->calls);
245}
246
247static requested_call *requested_call_array_add(requested_call_array *array) {
248 requested_call *rc;
249 if (array->count == array->capacity) {
250 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
251 array->calls =
252 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
253 }
254 rc = &array->calls[array->count++];
255 memset(rc, 0, sizeof(*rc));
256 return rc;
257}
258
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800259static void server_ref(grpc_server *server) {
260 gpr_ref(&server->internal_refcount);
261}
262
263static void server_unref(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800264 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800265 if (gpr_unref(&server->internal_refcount)) {
266 grpc_channel_args_destroy(server->channel_args);
267 gpr_mu_destroy(&server->mu);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700268 gpr_cv_destroy(&server->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269 gpr_free(server->channel_filters);
Craig Tiller24be0f72015-02-10 14:04:22 -0800270 requested_call_array_destroy(&server->requested_calls);
Craig Tillerec3257c2015-02-12 15:59:43 -0800271 while ((rm = server->registered_methods) != NULL) {
272 server->registered_methods = rm->next;
273 gpr_free(rm->method);
274 gpr_free(rm->host);
275 requested_call_array_destroy(&rm->requested);
276 gpr_free(rm);
277 }
278 gpr_free(server->cqs);
279 gpr_free(server->pollsets);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800280 gpr_free(server->shutdown_tags);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800281 gpr_free(server);
282 }
283}
284
285static int is_channel_orphaned(channel_data *chand) {
286 return chand->next == chand;
287}
288
289static void orphan_channel(channel_data *chand) {
290 chand->next->prev = chand->prev;
291 chand->prev->next = chand->next;
292 chand->next = chand->prev = chand;
293}
294
ctiller58393c22015-01-07 14:03:30 -0800295static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800296 channel_data *chand = cd;
297 grpc_server *server = chand->server;
Craig Tillerd75fe662015-02-21 07:30:49 -0800298 grpc_channel_internal_unref(chand->channel);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800299 server_unref(server);
300}
301
302static void destroy_channel(channel_data *chand) {
303 if (is_channel_orphaned(chand)) return;
304 GPR_ASSERT(chand->server != NULL);
305 orphan_channel(chand);
306 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800307 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800308}
309
Craig Tiller3b29b562015-02-11 12:58:46 -0800310static void finish_start_new_rpc_and_unlock(grpc_server *server,
311 grpc_call_element *elem,
312 call_data **pending_root,
313 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800314 requested_call rc;
315 call_data *calld = elem->call_data;
316 if (array->count == 0) {
317 calld->state = PENDING;
318 call_list_join(pending_root, calld, PENDING_START);
319 gpr_mu_unlock(&server->mu);
320 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800321 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800322 calld->state = ACTIVATED;
323 gpr_mu_unlock(&server->mu);
324 begin_call(server, calld, &rc);
325 }
326}
327
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328static void start_new_rpc(grpc_call_element *elem) {
329 channel_data *chand = elem->channel_data;
330 call_data *calld = elem->call_data;
331 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800332 gpr_uint32 i;
333 gpr_uint32 hash;
334 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800335
336 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800337 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800338 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800339 /* check for an exact match with host */
340 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
341 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800342 rm = &chand->registered_methods[(hash + i) %
343 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800344 if (!rm) break;
345 if (rm->host != calld->host) continue;
346 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800347 finish_start_new_rpc_and_unlock(server, elem,
348 &rm->server_registered_method->pending,
349 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800350 return;
351 }
352 /* check for a wildcard method definition (no host set) */
353 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800354 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800355 rm = &chand->registered_methods[(hash + i) %
356 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800357 if (!rm) break;
358 if (rm->host != NULL) continue;
359 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800360 finish_start_new_rpc_and_unlock(server, elem,
361 &rm->server_registered_method->pending,
362 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800363 return;
364 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800365 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800366 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
367 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800368}
369
ctiller58393c22015-01-07 14:03:30 -0800370static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800371 grpc_call_destroy(grpc_call_from_top_element(elem));
372}
373
Craig Tillercce17ac2015-01-20 09:29:28 -0800374static void stream_closed(grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800375 call_data *calld = elem->call_data;
376 channel_data *chand = elem->channel_data;
377 gpr_mu_lock(&chand->server->mu);
378 switch (calld->state) {
379 case ACTIVATED:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800380 break;
381 case PENDING:
Craig Tiller04cc8be2015-02-10 16:11:22 -0800382 call_list_remove(calld, PENDING_START);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800383 /* fallthrough intended */
384 case NOT_STARTED:
385 calld->state = ZOMBIED;
ctiller18b49ab2014-12-09 14:39:16 -0800386 grpc_iomgr_add_callback(kill_zombie, elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800387 break;
388 case ZOMBIED:
389 break;
390 }
391 gpr_mu_unlock(&chand->server->mu);
Craig Tiller0a927bf2015-02-05 10:52:53 -0800392 grpc_call_stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393}
394
Craig Tillercce17ac2015-01-20 09:29:28 -0800395static void read_closed(grpc_call_element *elem) {
396 call_data *calld = elem->call_data;
397 channel_data *chand = elem->channel_data;
398 gpr_mu_lock(&chand->server->mu);
399 switch (calld->state) {
400 case ACTIVATED:
401 case PENDING:
402 grpc_call_read_closed(elem);
403 break;
404 case NOT_STARTED:
405 calld->state = ZOMBIED;
406 grpc_iomgr_add_callback(kill_zombie, elem);
407 break;
408 case ZOMBIED:
409 break;
410 }
411 gpr_mu_unlock(&chand->server->mu);
412}
413
ctillerf962f522014-12-10 15:28:27 -0800414static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
415 grpc_call_op *op) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800416 channel_data *chand = elem->channel_data;
417 call_data *calld = elem->call_data;
418 grpc_mdelem *md;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800419 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
420 switch (op->type) {
421 case GRPC_RECV_METADATA:
Craig Tillercce17ac2015-01-20 09:29:28 -0800422 md = op->data.metadata;
423 if (md->key == chand->path_key) {
424 calld->path = grpc_mdstr_ref(md->value);
425 grpc_mdelem_unref(md);
426 } else if (md->key == chand->authority_key) {
427 calld->host = grpc_mdstr_ref(md->value);
428 grpc_mdelem_unref(md);
429 } else {
430 grpc_call_recv_metadata(elem, md);
431 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800432 break;
433 case GRPC_RECV_END_OF_INITIAL_METADATA:
434 start_new_rpc(elem);
Craig Tiller4069b682015-01-29 14:01:19 -0800435 grpc_call_initial_metadata_complete(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800436 break;
437 case GRPC_RECV_MESSAGE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800438 grpc_call_recv_message(elem, op->data.message);
439 op->done_cb(op->user_data, GRPC_OP_OK);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800440 break;
441 case GRPC_RECV_HALF_CLOSE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800442 read_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800443 break;
444 case GRPC_RECV_FINISH:
Craig Tillercce17ac2015-01-20 09:29:28 -0800445 stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800446 break;
447 case GRPC_RECV_DEADLINE:
448 grpc_call_set_deadline(elem, op->data.deadline);
449 ((call_data *)elem->call_data)->deadline = op->data.deadline;
450 break;
451 default:
452 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
453 grpc_call_next_op(elem, op);
454 break;
455 }
456}
457
ctillerf962f522014-12-10 15:28:27 -0800458static void channel_op(grpc_channel_element *elem,
459 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800460 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800461 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800462
463 switch (op->type) {
464 case GRPC_ACCEPT_CALL:
465 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800466 grpc_call_create(chand->channel, NULL,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800467 op->data.accept_call.transport_server_data);
468 break;
469 case GRPC_TRANSPORT_CLOSED:
470 /* if the transport is closed for a server channel, we destroy the
471 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800472 gpr_mu_lock(&server->mu);
473 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800474 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800475 gpr_mu_unlock(&server->mu);
476 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800477 break;
nnoble0c475f02014-12-05 15:37:39 -0800478 case GRPC_TRANSPORT_GOAWAY:
479 gpr_slice_unref(op->data.goaway.message);
480 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800481 default:
482 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
483 grpc_channel_next_op(elem, op);
484 break;
485 }
486}
487
ctiller58393c22015-01-07 14:03:30 -0800488static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800489 channel_data *chand = cd;
490 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800491 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800492 op.dir = GRPC_CALL_DOWN;
493 channel_op(grpc_channel_stack_element(
494 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800495 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496 grpc_channel_internal_unref(chand->channel);
497}
498
499static void shutdown_channel(channel_data *chand) {
500 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800501 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800502}
503
504static void init_call_elem(grpc_call_element *elem,
505 const void *server_transport_data) {
506 call_data *calld = elem->call_data;
507 channel_data *chand = elem->channel_data;
508 memset(calld, 0, sizeof(call_data));
509 calld->deadline = gpr_inf_future;
510 calld->call = grpc_call_from_top_element(elem);
511
512 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800513 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800514 gpr_mu_unlock(&chand->server->mu);
515
516 server_ref(chand->server);
517}
518
519static void destroy_call_elem(grpc_call_element *elem) {
520 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800521 call_data *calld = elem->call_data;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800522 size_t i, j;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800523
524 gpr_mu_lock(&chand->server->mu);
525 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800526 call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800527 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800528 if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
529 for (i = 0; i < chand->server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800530 for (j = 0; j < chand->server->cq_count; j++) {
531 grpc_cq_end_server_shutdown(chand->server->cqs[j],
532 chand->server->shutdown_tags[i]);
533 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800534 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800535 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800536 gpr_mu_unlock(&chand->server->mu);
537
Craig Tiller4df31a62015-01-30 09:44:31 -0800538 if (calld->host) {
539 grpc_mdstr_unref(calld->host);
540 }
541 if (calld->path) {
542 grpc_mdstr_unref(calld->path);
543 }
544
Craig Tillerdb7db992015-01-29 11:19:01 -0800545 if (calld->legacy) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800546 gpr_free(calld->legacy->initial_metadata.metadata);
Craig Tillerdb7db992015-01-29 11:19:01 -0800547 gpr_free(calld->legacy);
548 }
549
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800550 server_unref(chand->server);
551}
552
553static void init_channel_elem(grpc_channel_element *elem,
554 const grpc_channel_args *args,
555 grpc_mdctx *metadata_context, int is_first,
556 int is_last) {
557 channel_data *chand = elem->channel_data;
558 GPR_ASSERT(is_first);
559 GPR_ASSERT(!is_last);
560 chand->server = NULL;
561 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800562 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
563 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800564 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800565 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800566}
567
568static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800569 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800570 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800571 if (chand->registered_methods) {
572 for (i = 0; i < chand->registered_method_slots; i++) {
573 if (chand->registered_methods[i].method) {
574 grpc_mdstr_unref(chand->registered_methods[i].method);
575 }
576 if (chand->registered_methods[i].host) {
577 grpc_mdstr_unref(chand->registered_methods[i].host);
578 }
579 }
580 gpr_free(chand->registered_methods);
581 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800582 if (chand->server) {
583 gpr_mu_lock(&chand->server->mu);
584 chand->next->prev = chand->prev;
585 chand->prev->next = chand->next;
586 chand->next = chand->prev = chand;
587 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800588 grpc_mdstr_unref(chand->path_key);
589 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800590 server_unref(chand->server);
591 }
592}
593
594static const grpc_channel_filter server_surface_filter = {
Yang Gao5fd0d292015-01-26 00:19:48 -0800595 call_op, channel_op, sizeof(call_data),
596 init_call_elem, destroy_call_elem, sizeof(channel_data),
Craig Tiller9f28ac22015-01-27 17:01:29 -0800597 init_channel_elem, destroy_channel_elem, "server",
598};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800599
Craig Tiller20bc56d2015-02-12 09:02:56 -0800600static void addcq(grpc_server *server, grpc_completion_queue *cq) {
601 size_t i, n;
602 for (i = 0; i < server->cq_count; i++) {
603 if (server->cqs[i] == cq) return;
604 }
605 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800606 server->cqs = gpr_realloc(server->cqs,
607 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800608 server->cqs[n] = cq;
609}
610
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800611grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
612 grpc_channel_filter **filters,
613 size_t filter_count,
614 const grpc_channel_args *args) {
615 size_t i;
616 int census_enabled = grpc_channel_args_is_census_enabled(args);
617
618 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800619
620 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
621
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800622 memset(server, 0, sizeof(grpc_server));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800623 if (cq) addcq(server, cq);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800624
625 gpr_mu_init(&server->mu);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700626 gpr_cv_init(&server->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800627
Craig Tiller20bc56d2015-02-12 09:02:56 -0800628 server->unregistered_cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800629 /* decremented by grpc_server_destroy */
630 gpr_ref_init(&server->internal_refcount, 1);
631 server->root_channel_data.next = server->root_channel_data.prev =
632 &server->root_channel_data;
633
634 /* Server filter stack is:
635
636 server_surface_filter - for making surface API calls
637 grpc_server_census_filter (optional) - for stats collection and tracing
638 {passed in filter stack}
639 grpc_connected_channel_filter - for interfacing with transports */
640 server->channel_filter_count = filter_count + 1 + census_enabled;
641 server->channel_filters =
642 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
643 server->channel_filters[0] = &server_surface_filter;
644 if (census_enabled) {
645 server->channel_filters[1] = &grpc_server_census_filter;
646 }
647 for (i = 0; i < filter_count; i++) {
648 server->channel_filters[i + 1 + census_enabled] = filters[i];
649 }
650
651 server->channel_args = grpc_channel_args_copy(args);
652
653 return server;
654}
655
Craig Tiller24be0f72015-02-10 14:04:22 -0800656static int streq(const char *a, const char *b) {
657 if (a == NULL && b == NULL) return 1;
658 if (a == NULL) return 0;
659 if (b == NULL) return 0;
660 return 0 == strcmp(a, b);
661}
662
663void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerec3257c2015-02-12 15:59:43 -0800664 const char *host,
665 grpc_completion_queue *cq_new_rpc) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800666 registered_method *m;
667 if (!method) {
668 gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
669 return NULL;
670 }
671 for (m = server->registered_methods; m; m = m->next) {
672 if (streq(m->method, method) && streq(m->host, host)) {
673 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
674 host ? host : "*");
675 return NULL;
676 }
677 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800678 addcq(server, cq_new_rpc);
Craig Tiller24be0f72015-02-10 14:04:22 -0800679 m = gpr_malloc(sizeof(registered_method));
680 memset(m, 0, sizeof(*m));
681 m->method = gpr_strdup(method);
682 m->host = gpr_strdup(host);
683 m->next = server->registered_methods;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800684 m->cq = cq_new_rpc;
Craig Tiller24be0f72015-02-10 14:04:22 -0800685 server->registered_methods = m;
686 return m;
687}
688
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800689void grpc_server_start(grpc_server *server) {
690 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800691 size_t i;
692
Craig Tillerec3257c2015-02-12 15:59:43 -0800693 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800694 for (i = 0; i < server->cq_count; i++) {
695 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
696 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800697
698 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800699 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800700 }
701}
702
703grpc_transport_setup_result grpc_server_setup_transport(
704 grpc_server *s, grpc_transport *transport,
705 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
706 grpc_mdctx *mdctx) {
707 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
708 grpc_channel_filter const **filters =
709 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
710 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800711 size_t num_registered_methods;
712 size_t alloc;
713 registered_method *rm;
714 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800715 grpc_channel *channel;
716 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800717 grpc_mdstr *host;
718 grpc_mdstr *method;
719 gpr_uint32 hash;
720 gpr_uint32 slots;
721 gpr_uint32 probes;
722 gpr_uint32 max_probes = 0;
Craig Tiller5d6bd442015-02-12 22:50:38 -0800723 grpc_transport_setup_result result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800724
725 for (i = 0; i < s->channel_filter_count; i++) {
726 filters[i] = s->channel_filters[i];
727 }
728 for (; i < s->channel_filter_count + num_extra_filters; i++) {
729 filters[i] = extra_filters[i - s->channel_filter_count];
730 }
731 filters[i] = &grpc_connected_channel_filter;
732
Craig Tiller20bc56d2015-02-12 09:02:56 -0800733 for (i = 0; i < s->cq_count; i++) {
734 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
735 }
ctillerd79b4862014-12-17 16:36:59 -0800736
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800737 channel = grpc_channel_create_from_filters(filters, num_filters,
738 s->channel_args, mdctx, 0);
739 chand = (channel_data *)grpc_channel_stack_element(
740 grpc_channel_get_channel_stack(channel), 0)->channel_data;
741 chand->server = s;
742 server_ref(s);
743 chand->channel = channel;
744
Craig Tiller04cc8be2015-02-10 16:11:22 -0800745 num_registered_methods = 0;
746 for (rm = s->registered_methods; rm; rm = rm->next) {
747 num_registered_methods++;
748 }
749 /* build a lookup table phrased in terms of mdstr's in this channels context
750 to quickly find registered methods */
751 if (num_registered_methods > 0) {
752 slots = 2 * num_registered_methods;
753 alloc = sizeof(channel_registered_method) * slots;
754 chand->registered_methods = gpr_malloc(alloc);
755 memset(chand->registered_methods, 0, alloc);
756 for (rm = s->registered_methods; rm; rm = rm->next) {
757 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800758 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800759 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800760 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
761 .server_registered_method != NULL;
762 probes++)
763 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800764 if (probes > max_probes) max_probes = probes;
765 crm = &chand->registered_methods[(hash + probes) % slots];
766 crm->server_registered_method = rm;
767 crm->host = host;
768 crm->method = method;
769 }
770 chand->registered_method_slots = slots;
771 chand->registered_method_max_probes = max_probes;
772 }
773
Craig Tiller5d6bd442015-02-12 22:50:38 -0800774 result = grpc_connected_channel_bind_transport(
775 grpc_channel_get_channel_stack(channel), transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800776
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800777 gpr_mu_lock(&s->mu);
778 chand->next = &s->root_channel_data;
779 chand->prev = chand->next->prev;
780 chand->next->prev = chand->prev->next = chand;
781 gpr_mu_unlock(&s->mu);
782
783 gpr_free(filters);
784
Craig Tiller5d6bd442015-02-12 22:50:38 -0800785 return result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800786}
787
Craig Tilleraec96aa2015-04-07 14:32:15 -0700788static int num_listeners(grpc_server *server) {
789 listener *l;
790 int n = 0;
791 for (l = server->listeners; l; l = l->next) {
792 n++;
793 }
794 return n;
795}
796
Craig Tillerbd217572015-02-11 18:10:56 -0800797static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
Craig Tillerec3257c2015-02-12 15:59:43 -0800798 void *shutdown_tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800800 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800801 channel_data **channels;
802 channel_data *c;
803 size_t nchannels;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800804 size_t i, j;
nnoble0c475f02014-12-05 15:37:39 -0800805 grpc_channel_op op;
806 grpc_channel_element *elem;
Craig Tillerbd217572015-02-11 18:10:56 -0800807 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800808
809 /* lock, and gather up some stuff to do */
810 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800811 if (have_shutdown_tag) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800812 for (i = 0; i < server->cq_count; i++) {
813 grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
814 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800815 server->shutdown_tags =
816 gpr_realloc(server->shutdown_tags,
817 sizeof(void *) * (server->num_shutdown_tags + 1));
818 server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
819 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800820 if (server->shutdown) {
821 gpr_mu_unlock(&server->mu);
822 return;
823 }
824
nnoble0c475f02014-12-05 15:37:39 -0800825 nchannels = 0;
826 for (c = server->root_channel_data.next; c != &server->root_channel_data;
827 c = c->next) {
828 nchannels++;
829 }
830 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
831 i = 0;
832 for (c = server->root_channel_data.next; c != &server->root_channel_data;
833 c = c->next) {
834 grpc_channel_internal_ref(c->channel);
835 channels[i] = c;
836 i++;
837 }
838
Craig Tillerbd217572015-02-11 18:10:56 -0800839 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800840 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800841 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800842 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800843 if (requested_calls.count + rm->requested.count >
844 requested_calls.capacity) {
845 requested_calls.capacity =
846 GPR_MAX(requested_calls.count + rm->requested.count,
847 2 * requested_calls.capacity);
848 requested_calls.calls =
849 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
850 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800851 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800852 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
853 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800854 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800855 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800856 memset(&rm->requested, 0, sizeof(rm->requested));
857 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800858
859 server->shutdown = 1;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800860 if (server->lists[ALL_CALLS] == NULL) {
861 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800862 for (j = 0; j < server->cq_count; j++) {
863 grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800864 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800865 }
866 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800867 gpr_mu_unlock(&server->mu);
868
nnoble0c475f02014-12-05 15:37:39 -0800869 for (i = 0; i < nchannels; i++) {
870 c = channels[i];
871 elem = grpc_channel_stack_element(
872 grpc_channel_get_channel_stack(c->channel), 0);
873
874 op.type = GRPC_CHANNEL_GOAWAY;
875 op.dir = GRPC_CALL_DOWN;
876 op.data.goaway.status = GRPC_STATUS_OK;
877 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800878 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800879
880 grpc_channel_internal_unref(c->channel);
881 }
882 gpr_free(channels);
883
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800884 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800885 for (i = 0; i < requested_calls.count; i++) {
886 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800887 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800888 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800889
890 /* Shutdown listeners */
891 for (l = server->listeners; l; l = l->next) {
892 l->destroy(server, l->arg);
893 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800894}
895
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800896void grpc_server_shutdown(grpc_server *server) {
897 shutdown_internal(server, 0, NULL);
898}
899
900void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
901 shutdown_internal(server, 1, tag);
902}
903
Craig Tilleraec96aa2015-04-07 14:32:15 -0700904void grpc_server_listener_destroy_done(void *s) {
905 grpc_server *server = s;
906 gpr_mu_lock(&server->mu);
907 server->listeners_destroyed++;
908 gpr_cv_signal(&server->cv);
909 gpr_mu_unlock(&server->mu);
910}
911
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800912void grpc_server_destroy(grpc_server *server) {
913 channel_data *c;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700914 listener *l;
915 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800916 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800917 if (!server->shutdown) {
918 gpr_mu_unlock(&server->mu);
919 grpc_server_shutdown(server);
920 gpr_mu_lock(&server->mu);
921 }
922
Craig Tilleraec96aa2015-04-07 14:32:15 -0700923 while (server->listeners_destroyed != num_listeners(server)) {
924 for (i = 0; i < server->cq_count; i++) {
925 gpr_mu_unlock(&server->mu);
926 grpc_cq_hack_spin_pollset(server->cqs[i]);
927 gpr_mu_lock(&server->mu);
928 }
929
930 gpr_cv_wait(&server->cv, &server->mu, gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
931 }
932
933 while (server->listeners) {
934 l = server->listeners;
935 server->listeners = l->next;
936 gpr_free(l);
937 }
938
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800939 for (c = server->root_channel_data.next; c != &server->root_channel_data;
940 c = c->next) {
941 shutdown_channel(c);
942 }
943 gpr_mu_unlock(&server->mu);
944
945 server_unref(server);
946}
947
948void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800949 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -0800950 grpc_pollset **pollsets,
951 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800952 void (*destroy)(grpc_server *server, void *arg)) {
953 listener *l = gpr_malloc(sizeof(listener));
954 l->arg = arg;
955 l->start = start;
956 l->destroy = destroy;
957 l->next = server->listeners;
958 server->listeners = l;
959}
960
Craig Tiller9f28ac22015-01-27 17:01:29 -0800961static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -0800962 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -0800963 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -0800964 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800965 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800966 if (server->shutdown) {
967 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800968 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800969 return GRPC_CALL_OK;
970 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800971 switch (rc->type) {
972 case LEGACY_CALL:
973 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800974 calld =
975 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800976 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800977 break;
978 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800979 calld = call_list_remove_head(
980 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800981 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800982 break;
983 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800984 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800985 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -0800986 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -0800987 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800988 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -0800989 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800990 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800991 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -0800992 gpr_mu_unlock(&server->mu);
993 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800994 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800995}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800996
Craig Tiller24be0f72015-02-10 14:04:22 -0800997grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
998 grpc_call_details *details,
999 grpc_metadata_array *initial_metadata,
Craig Tiller3b29b562015-02-11 12:58:46 -08001000 grpc_completion_queue *cq_bind,
1001 void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001002 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001003 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -08001004 rc.type = BATCH_CALL;
1005 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001006 rc.data.batch.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -08001007 rc.data.batch.call = call;
1008 rc.data.batch.details = details;
1009 rc.data.batch.initial_metadata = initial_metadata;
1010 return queue_call_request(server, &rc);
1011}
1012
1013grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001014 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1015 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
1016 grpc_completion_queue *cq_bind, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001017 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001018 registered_method *registered_method = rm;
1019 grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -08001020 rc.type = REGISTERED_CALL;
1021 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001022 rc.data.registered.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -08001023 rc.data.registered.call = call;
1024 rc.data.registered.registered_method = registered_method;
1025 rc.data.registered.deadline = deadline;
1026 rc.data.registered.initial_metadata = initial_metadata;
1027 rc.data.registered.optional_payload = optional_payload;
1028 return queue_call_request(server, &rc);
1029}
1030
1031grpc_call_error grpc_server_request_call_old(grpc_server *server,
1032 void *tag_new) {
1033 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001034 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
Craig Tiller24be0f72015-02-10 14:04:22 -08001035 rc.type = LEGACY_CALL;
1036 rc.tag = tag_new;
1037 return queue_call_request(server, &rc);
1038}
1039
1040static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
1041static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1042 void *tag);
Craig Tiller3b29b562015-02-11 12:58:46 -08001043static void publish_was_not_set(grpc_call *call, grpc_op_error status,
1044 void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001045 abort();
1046}
Craig Tiller24be0f72015-02-10 14:04:22 -08001047
Craig Tiller166e2502015-02-03 20:14:41 -08001048static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1049 gpr_slice slice = value->slice;
1050 size_t len = GPR_SLICE_LENGTH(slice);
1051
1052 if (len + 1 > *capacity) {
1053 *capacity = GPR_MAX(len + 1, *capacity * 2);
1054 *dest = gpr_realloc(*dest, *capacity);
1055 }
1056 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1057}
1058
Craig Tiller24be0f72015-02-10 14:04:22 -08001059static void begin_call(grpc_server *server, call_data *calld,
1060 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001061 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001062 grpc_ioreq req[2];
1063 grpc_ioreq *r = req;
1064
1065 /* called once initial metadata has been read by the call, but BEFORE
1066 the ioreq to fetch it out of the call has been executed.
1067 This means metadata related fields can be relied on in calld, but to
1068 fill in the metadata array passed by the client, we need to perform
1069 an ioreq op, that should complete immediately. */
1070
1071 switch (rc->type) {
1072 case LEGACY_CALL:
1073 calld->legacy = gpr_malloc(sizeof(legacy_data));
1074 memset(calld->legacy, 0, sizeof(legacy_data));
1075 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1076 r->data.recv_metadata = &calld->legacy->initial_metadata;
1077 r++;
1078 publish = publish_legacy;
1079 break;
1080 case BATCH_CALL:
1081 cpstr(&rc->data.batch.details->host,
1082 &rc->data.batch.details->host_capacity, calld->host);
1083 cpstr(&rc->data.batch.details->method,
1084 &rc->data.batch.details->method_capacity, calld->path);
Craig Tiller8e8fd892015-02-10 17:02:08 -08001085 grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001086 *rc->data.batch.call = calld->call;
1087 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1088 r->data.recv_metadata = rc->data.batch.initial_metadata;
1089 r++;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001090 calld->cq_new = server->unregistered_cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001091 publish = publish_registered_or_batch;
1092 break;
1093 case REGISTERED_CALL:
1094 *rc->data.registered.deadline = calld->deadline;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001095 grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001096 *rc->data.registered.call = calld->call;
1097 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1098 r->data.recv_metadata = rc->data.registered.initial_metadata;
1099 r++;
1100 if (rc->data.registered.optional_payload) {
1101 r->op = GRPC_IOREQ_RECV_MESSAGE;
1102 r->data.recv_message = rc->data.registered.optional_payload;
1103 r++;
1104 }
Craig Tiller20bc56d2015-02-12 09:02:56 -08001105 calld->cq_new = rc->data.registered.registered_method->cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001106 publish = publish_registered_or_batch;
1107 break;
1108 }
1109
1110 grpc_call_internal_ref(calld->call);
1111 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1112 rc->tag);
1113}
1114
1115static void fail_call(grpc_server *server, requested_call *rc) {
1116 switch (rc->type) {
1117 case LEGACY_CALL:
Craig Tillerec3257c2015-02-12 15:59:43 -08001118 grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
1119 NULL, NULL, NULL, gpr_inf_past, 0, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001120 break;
1121 case BATCH_CALL:
1122 *rc->data.batch.call = NULL;
1123 rc->data.batch.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001124 grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
1125 do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001126 break;
1127 case REGISTERED_CALL:
1128 *rc->data.registered.call = NULL;
1129 rc->data.registered.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001130 grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
1131 rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001132 break;
1133 }
1134}
1135
1136static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
1137 grpc_call_element *elem =
1138 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1139 call_data *calld = elem->call_data;
1140 channel_data *chand = elem->channel_data;
1141 grpc_server *server = chand->server;
1142
1143 if (status == GRPC_OP_OK) {
Craig Tiller20bc56d2015-02-12 09:02:56 -08001144 grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
Craig Tiller24be0f72015-02-10 14:04:22 -08001145 grpc_mdstr_as_c_string(calld->path),
1146 grpc_mdstr_as_c_string(calld->host), calld->deadline,
1147 calld->legacy->initial_metadata.count,
1148 calld->legacy->initial_metadata.metadata);
1149 } else {
1150 gpr_log(GPR_ERROR, "should never reach here");
1151 abort();
1152 }
1153}
1154
1155static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1156 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001157 grpc_call_element *elem =
1158 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001159 call_data *calld = elem->call_data;
1160 grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
Craig Tiller24be0f72015-02-10 14:04:22 -08001161}
1162
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001163const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1164 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001165}