blob: e7719298701c32a3c784e4662a43f09824eb8f2d [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080047#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080048#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc/support/useful.h>
52
53typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
54
55typedef struct listener {
56 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080057 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
58 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080059 void (*destroy)(grpc_server *server, void *arg);
60 struct listener *next;
61} listener;
62
63typedef struct call_data call_data;
64typedef struct channel_data channel_data;
Craig Tiller24be0f72015-02-10 14:04:22 -080065typedef struct registered_method registered_method;
66
67typedef struct {
68 call_data *next;
69 call_data *prev;
70} call_link;
71
72typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
73
74typedef struct {
75 requested_call_type type;
76 void *tag;
77 union {
78 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080079 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080080 grpc_call **call;
81 grpc_call_details *details;
82 grpc_metadata_array *initial_metadata;
83 } batch;
84 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080085 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080086 grpc_call **call;
87 registered_method *registered_method;
88 gpr_timespec *deadline;
89 grpc_metadata_array *initial_metadata;
90 grpc_byte_buffer **optional_payload;
91 } registered;
92 } data;
93} requested_call;
94
95typedef struct {
96 requested_call *calls;
97 size_t count;
98 size_t capacity;
99} requested_call_array;
100
101struct registered_method {
102 char *method;
103 char *host;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800104 call_data *pending;
Craig Tiller24be0f72015-02-10 14:04:22 -0800105 requested_call_array requested;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800106 grpc_completion_queue *cq;
Craig Tiller24be0f72015-02-10 14:04:22 -0800107 registered_method *next;
108};
109
110typedef struct channel_registered_method {
111 registered_method *server_registered_method;
112 grpc_mdstr *method;
113 grpc_mdstr *host;
114} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115
116struct channel_data {
117 grpc_server *server;
118 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800119 grpc_mdstr *path_key;
120 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800121 /* linked list of all channels on a server */
122 channel_data *next;
123 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800124 channel_registered_method *registered_methods;
125 gpr_uint32 registered_method_slots;
126 gpr_uint32 registered_method_max_probes;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800127};
128
129struct grpc_server {
130 size_t channel_filter_count;
131 const grpc_channel_filter **channel_filters;
132 grpc_channel_args *channel_args;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800133 grpc_completion_queue *unregistered_cq;
Craig Tillerec3257c2015-02-12 15:59:43 -0800134
Craig Tiller20bc56d2015-02-12 09:02:56 -0800135 grpc_completion_queue **cqs;
136 grpc_pollset **pollsets;
137 size_t cq_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800138
139 gpr_mu mu;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700140 gpr_cv cv;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800141
Craig Tiller24be0f72015-02-10 14:04:22 -0800142 registered_method *registered_methods;
143 requested_call_array requested_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800144
145 gpr_uint8 shutdown;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800146 size_t num_shutdown_tags;
147 void **shutdown_tags;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800148
149 call_data *lists[CALL_LIST_COUNT];
150 channel_data root_channel_data;
151
152 listener *listeners;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700153 int listeners_destroyed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154 gpr_refcount internal_refcount;
155};
156
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800157typedef enum {
158 /* waiting for metadata */
159 NOT_STARTED,
160 /* inital metadata read, not flow controlled in yet */
161 PENDING,
162 /* flow controlled in, on completion queue */
163 ACTIVATED,
164 /* cancelled before being queued */
165 ZOMBIED
166} call_state;
167
Craig Tillerfb189f82015-02-03 12:07:07 -0800168typedef struct legacy_data {
Craig Tiller24be0f72015-02-10 14:04:22 -0800169 grpc_metadata_array initial_metadata;
Craig Tillerfb189f82015-02-03 12:07:07 -0800170} legacy_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800171
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800172struct call_data {
173 grpc_call *call;
174
175 call_state state;
176 gpr_timespec deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800177 grpc_mdstr *path;
178 grpc_mdstr *host;
179
180 legacy_data *legacy;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800181 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800182
Craig Tiller04cc8be2015-02-10 16:11:22 -0800183 call_data **root[CALL_LIST_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800184 call_link links[CALL_LIST_COUNT];
185};
186
187#define SERVER_FROM_CALL_ELEM(elem) \
188 (((channel_data *)(elem)->channel_data)->server)
189
190static void do_nothing(void *unused, grpc_op_error ignored) {}
191
Craig Tiller24be0f72015-02-10 14:04:22 -0800192static void begin_call(grpc_server *server, call_data *calld,
193 requested_call *rc);
194static void fail_call(grpc_server *server, requested_call *rc);
195
Craig Tiller3b29b562015-02-11 12:58:46 -0800196static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800197 GPR_ASSERT(!call->root[list]);
198 call->root[list] = root;
199 if (!*root) {
200 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800201 call->links[list].next = call->links[list].prev = call;
202 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800203 call->links[list].next = *root;
204 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800205 call->links[list].next->links[list].prev =
206 call->links[list].prev->links[list].next = call;
207 }
208 return 1;
209}
210
Craig Tiller04cc8be2015-02-10 16:11:22 -0800211static call_data *call_list_remove_head(call_data **root, call_list list) {
212 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800214 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800215 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800216 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800217 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800218 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800219 out->links[list].next->links[list].prev = out->links[list].prev;
220 out->links[list].prev->links[list].next = out->links[list].next;
221 }
222 }
223 return out;
224}
225
Craig Tiller04cc8be2015-02-10 16:11:22 -0800226static int call_list_remove(call_data *call, call_list list) {
227 call_data **root = call->root[list];
228 if (root == NULL) return 0;
229 call->root[list] = NULL;
230 if (*root == call) {
231 *root = call->links[list].next;
232 if (*root == call) {
233 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800234 return 1;
235 }
236 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800237 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800238 call->links[list].next->links[list].prev = call->links[list].prev;
239 call->links[list].prev->links[list].next = call->links[list].next;
240 return 1;
241}
242
Craig Tiller24be0f72015-02-10 14:04:22 -0800243static void requested_call_array_destroy(requested_call_array *array) {
244 gpr_free(array->calls);
245}
246
247static requested_call *requested_call_array_add(requested_call_array *array) {
248 requested_call *rc;
249 if (array->count == array->capacity) {
250 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
251 array->calls =
252 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
253 }
254 rc = &array->calls[array->count++];
255 memset(rc, 0, sizeof(*rc));
256 return rc;
257}
258
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800259static void server_ref(grpc_server *server) {
260 gpr_ref(&server->internal_refcount);
261}
262
263static void server_unref(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800264 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800265 if (gpr_unref(&server->internal_refcount)) {
266 grpc_channel_args_destroy(server->channel_args);
267 gpr_mu_destroy(&server->mu);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700268 gpr_cv_destroy(&server->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269 gpr_free(server->channel_filters);
Craig Tiller24be0f72015-02-10 14:04:22 -0800270 requested_call_array_destroy(&server->requested_calls);
Craig Tillerec3257c2015-02-12 15:59:43 -0800271 while ((rm = server->registered_methods) != NULL) {
272 server->registered_methods = rm->next;
273 gpr_free(rm->method);
274 gpr_free(rm->host);
275 requested_call_array_destroy(&rm->requested);
276 gpr_free(rm);
277 }
278 gpr_free(server->cqs);
279 gpr_free(server->pollsets);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800280 gpr_free(server->shutdown_tags);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800281 gpr_free(server);
282 }
283}
284
285static int is_channel_orphaned(channel_data *chand) {
286 return chand->next == chand;
287}
288
289static void orphan_channel(channel_data *chand) {
290 chand->next->prev = chand->prev;
291 chand->prev->next = chand->next;
292 chand->next = chand->prev = chand;
293}
294
ctiller58393c22015-01-07 14:03:30 -0800295static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800296 channel_data *chand = cd;
297 grpc_server *server = chand->server;
Craig Tillerd75fe662015-02-21 07:30:49 -0800298 grpc_channel_internal_unref(chand->channel);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800299 server_unref(server);
300}
301
302static void destroy_channel(channel_data *chand) {
303 if (is_channel_orphaned(chand)) return;
304 GPR_ASSERT(chand->server != NULL);
305 orphan_channel(chand);
306 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800307 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800308}
309
Craig Tiller3b29b562015-02-11 12:58:46 -0800310static void finish_start_new_rpc_and_unlock(grpc_server *server,
311 grpc_call_element *elem,
312 call_data **pending_root,
313 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800314 requested_call rc;
315 call_data *calld = elem->call_data;
316 if (array->count == 0) {
317 calld->state = PENDING;
318 call_list_join(pending_root, calld, PENDING_START);
319 gpr_mu_unlock(&server->mu);
320 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800321 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800322 calld->state = ACTIVATED;
323 gpr_mu_unlock(&server->mu);
324 begin_call(server, calld, &rc);
325 }
326}
327
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328static void start_new_rpc(grpc_call_element *elem) {
329 channel_data *chand = elem->channel_data;
330 call_data *calld = elem->call_data;
331 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800332 gpr_uint32 i;
333 gpr_uint32 hash;
334 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800335
336 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800337 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800338 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800339 /* check for an exact match with host */
340 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
341 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800342 rm = &chand->registered_methods[(hash + i) %
343 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800344 if (!rm) break;
345 if (rm->host != calld->host) continue;
346 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800347 finish_start_new_rpc_and_unlock(server, elem,
348 &rm->server_registered_method->pending,
349 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800350 return;
351 }
352 /* check for a wildcard method definition (no host set) */
353 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800354 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800355 rm = &chand->registered_methods[(hash + i) %
356 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800357 if (!rm) break;
358 if (rm->host != NULL) continue;
359 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800360 finish_start_new_rpc_and_unlock(server, elem,
361 &rm->server_registered_method->pending,
362 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800363 return;
364 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800365 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800366 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
367 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800368}
369
ctiller58393c22015-01-07 14:03:30 -0800370static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800371 grpc_call_destroy(grpc_call_from_top_element(elem));
372}
373
Craig Tillercce17ac2015-01-20 09:29:28 -0800374static void stream_closed(grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800375 call_data *calld = elem->call_data;
376 channel_data *chand = elem->channel_data;
377 gpr_mu_lock(&chand->server->mu);
378 switch (calld->state) {
379 case ACTIVATED:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800380 break;
381 case PENDING:
Craig Tiller04cc8be2015-02-10 16:11:22 -0800382 call_list_remove(calld, PENDING_START);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800383 /* fallthrough intended */
384 case NOT_STARTED:
385 calld->state = ZOMBIED;
ctiller18b49ab2014-12-09 14:39:16 -0800386 grpc_iomgr_add_callback(kill_zombie, elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800387 break;
388 case ZOMBIED:
389 break;
390 }
391 gpr_mu_unlock(&chand->server->mu);
Craig Tiller0a927bf2015-02-05 10:52:53 -0800392 grpc_call_stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393}
394
Craig Tillercce17ac2015-01-20 09:29:28 -0800395static void read_closed(grpc_call_element *elem) {
396 call_data *calld = elem->call_data;
397 channel_data *chand = elem->channel_data;
398 gpr_mu_lock(&chand->server->mu);
399 switch (calld->state) {
400 case ACTIVATED:
401 case PENDING:
402 grpc_call_read_closed(elem);
403 break;
404 case NOT_STARTED:
405 calld->state = ZOMBIED;
406 grpc_iomgr_add_callback(kill_zombie, elem);
407 break;
408 case ZOMBIED:
409 break;
410 }
411 gpr_mu_unlock(&chand->server->mu);
412}
413
Craig Tiller6902ad22015-04-16 08:01:49 -0700414static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
415 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800416 channel_data *chand = elem->channel_data;
417 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700418 if (md->key == chand->path_key) {
419 calld->path = grpc_mdstr_ref(md->value);
420 return NULL;
421 } else if (md->key == chand->authority_key) {
422 calld->host = grpc_mdstr_ref(md->value);
423 return NULL;
424 }
425 return md;
426}
427
428static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
429 grpc_call_op *op) {
430 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800431 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
432 switch (op->type) {
433 case GRPC_RECV_METADATA:
Craig Tiller205aee12015-04-16 14:46:41 -0700434 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tiller6902ad22015-04-16 08:01:49 -0700435 if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
436 calld->deadline = op->data.metadata.deadline;
437 start_new_rpc(elem);
Craig Tillercce17ac2015-01-20 09:29:28 -0800438 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800439 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800440 case GRPC_RECV_MESSAGE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800441 grpc_call_recv_message(elem, op->data.message);
442 op->done_cb(op->user_data, GRPC_OP_OK);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800443 break;
444 case GRPC_RECV_HALF_CLOSE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800445 read_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800446 break;
447 case GRPC_RECV_FINISH:
Craig Tillercce17ac2015-01-20 09:29:28 -0800448 stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800449 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800450 default:
451 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
452 grpc_call_next_op(elem, op);
453 break;
454 }
455}
456
ctillerf962f522014-12-10 15:28:27 -0800457static void channel_op(grpc_channel_element *elem,
458 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800459 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800460 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800461
462 switch (op->type) {
463 case GRPC_ACCEPT_CALL:
464 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800465 grpc_call_create(chand->channel, NULL,
Craig Tiller87d5b192015-04-16 14:37:57 -0700466 op->data.accept_call.transport_server_data, NULL, 0,
467 gpr_inf_future);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800468 break;
469 case GRPC_TRANSPORT_CLOSED:
470 /* if the transport is closed for a server channel, we destroy the
471 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800472 gpr_mu_lock(&server->mu);
473 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800474 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800475 gpr_mu_unlock(&server->mu);
476 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800477 break;
nnoble0c475f02014-12-05 15:37:39 -0800478 case GRPC_TRANSPORT_GOAWAY:
479 gpr_slice_unref(op->data.goaway.message);
480 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800481 default:
482 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
483 grpc_channel_next_op(elem, op);
484 break;
485 }
486}
487
ctiller58393c22015-01-07 14:03:30 -0800488static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800489 channel_data *chand = cd;
490 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800491 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800492 op.dir = GRPC_CALL_DOWN;
493 channel_op(grpc_channel_stack_element(
494 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800495 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496 grpc_channel_internal_unref(chand->channel);
497}
498
499static void shutdown_channel(channel_data *chand) {
500 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800501 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800502}
503
504static void init_call_elem(grpc_call_element *elem,
505 const void *server_transport_data) {
506 call_data *calld = elem->call_data;
507 channel_data *chand = elem->channel_data;
508 memset(calld, 0, sizeof(call_data));
509 calld->deadline = gpr_inf_future;
510 calld->call = grpc_call_from_top_element(elem);
511
512 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800513 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800514 gpr_mu_unlock(&chand->server->mu);
515
516 server_ref(chand->server);
517}
518
519static void destroy_call_elem(grpc_call_element *elem) {
520 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800521 call_data *calld = elem->call_data;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800522 size_t i, j;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800523
524 gpr_mu_lock(&chand->server->mu);
525 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800526 call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800527 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800528 if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
529 for (i = 0; i < chand->server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800530 for (j = 0; j < chand->server->cq_count; j++) {
531 grpc_cq_end_server_shutdown(chand->server->cqs[j],
532 chand->server->shutdown_tags[i]);
533 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800534 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800535 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800536 gpr_mu_unlock(&chand->server->mu);
537
Craig Tiller4df31a62015-01-30 09:44:31 -0800538 if (calld->host) {
539 grpc_mdstr_unref(calld->host);
540 }
541 if (calld->path) {
542 grpc_mdstr_unref(calld->path);
543 }
544
Craig Tillerdb7db992015-01-29 11:19:01 -0800545 if (calld->legacy) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800546 gpr_free(calld->legacy->initial_metadata.metadata);
Craig Tillerdb7db992015-01-29 11:19:01 -0800547 gpr_free(calld->legacy);
548 }
549
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800550 server_unref(chand->server);
551}
552
553static void init_channel_elem(grpc_channel_element *elem,
554 const grpc_channel_args *args,
555 grpc_mdctx *metadata_context, int is_first,
556 int is_last) {
557 channel_data *chand = elem->channel_data;
558 GPR_ASSERT(is_first);
559 GPR_ASSERT(!is_last);
560 chand->server = NULL;
561 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800562 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
563 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800564 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800565 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800566}
567
568static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800569 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800570 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800571 if (chand->registered_methods) {
572 for (i = 0; i < chand->registered_method_slots; i++) {
573 if (chand->registered_methods[i].method) {
574 grpc_mdstr_unref(chand->registered_methods[i].method);
575 }
576 if (chand->registered_methods[i].host) {
577 grpc_mdstr_unref(chand->registered_methods[i].host);
578 }
579 }
580 gpr_free(chand->registered_methods);
581 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800582 if (chand->server) {
583 gpr_mu_lock(&chand->server->mu);
584 chand->next->prev = chand->prev;
585 chand->prev->next = chand->next;
586 chand->next = chand->prev = chand;
587 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800588 grpc_mdstr_unref(chand->path_key);
589 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800590 server_unref(chand->server);
591 }
592}
593
594static const grpc_channel_filter server_surface_filter = {
Craig Tillerc02c1d82015-04-07 16:21:55 -0700595 call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
596 sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800597};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800598
Craig Tiller20bc56d2015-02-12 09:02:56 -0800599static void addcq(grpc_server *server, grpc_completion_queue *cq) {
600 size_t i, n;
601 for (i = 0; i < server->cq_count; i++) {
602 if (server->cqs[i] == cq) return;
603 }
604 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800605 server->cqs = gpr_realloc(server->cqs,
606 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800607 server->cqs[n] = cq;
608}
609
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800610grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
611 grpc_channel_filter **filters,
612 size_t filter_count,
613 const grpc_channel_args *args) {
614 size_t i;
615 int census_enabled = grpc_channel_args_is_census_enabled(args);
616
617 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800618
619 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
620
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800621 memset(server, 0, sizeof(grpc_server));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800622 if (cq) addcq(server, cq);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800623
624 gpr_mu_init(&server->mu);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700625 gpr_cv_init(&server->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800626
Craig Tiller20bc56d2015-02-12 09:02:56 -0800627 server->unregistered_cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800628 /* decremented by grpc_server_destroy */
629 gpr_ref_init(&server->internal_refcount, 1);
630 server->root_channel_data.next = server->root_channel_data.prev =
631 &server->root_channel_data;
632
633 /* Server filter stack is:
634
635 server_surface_filter - for making surface API calls
636 grpc_server_census_filter (optional) - for stats collection and tracing
637 {passed in filter stack}
638 grpc_connected_channel_filter - for interfacing with transports */
639 server->channel_filter_count = filter_count + 1 + census_enabled;
640 server->channel_filters =
641 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
642 server->channel_filters[0] = &server_surface_filter;
643 if (census_enabled) {
644 server->channel_filters[1] = &grpc_server_census_filter;
645 }
646 for (i = 0; i < filter_count; i++) {
647 server->channel_filters[i + 1 + census_enabled] = filters[i];
648 }
649
650 server->channel_args = grpc_channel_args_copy(args);
651
652 return server;
653}
654
Craig Tiller24be0f72015-02-10 14:04:22 -0800655static int streq(const char *a, const char *b) {
656 if (a == NULL && b == NULL) return 1;
657 if (a == NULL) return 0;
658 if (b == NULL) return 0;
659 return 0 == strcmp(a, b);
660}
661
662void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerec3257c2015-02-12 15:59:43 -0800663 const char *host,
664 grpc_completion_queue *cq_new_rpc) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800665 registered_method *m;
666 if (!method) {
667 gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
668 return NULL;
669 }
670 for (m = server->registered_methods; m; m = m->next) {
671 if (streq(m->method, method) && streq(m->host, host)) {
672 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
673 host ? host : "*");
674 return NULL;
675 }
676 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800677 addcq(server, cq_new_rpc);
Craig Tiller24be0f72015-02-10 14:04:22 -0800678 m = gpr_malloc(sizeof(registered_method));
679 memset(m, 0, sizeof(*m));
680 m->method = gpr_strdup(method);
681 m->host = gpr_strdup(host);
682 m->next = server->registered_methods;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800683 m->cq = cq_new_rpc;
Craig Tiller24be0f72015-02-10 14:04:22 -0800684 server->registered_methods = m;
685 return m;
686}
687
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800688void grpc_server_start(grpc_server *server) {
689 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800690 size_t i;
691
Craig Tillerec3257c2015-02-12 15:59:43 -0800692 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800693 for (i = 0; i < server->cq_count; i++) {
694 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
695 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800696
697 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800698 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800699 }
700}
701
702grpc_transport_setup_result grpc_server_setup_transport(
703 grpc_server *s, grpc_transport *transport,
704 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
705 grpc_mdctx *mdctx) {
706 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
707 grpc_channel_filter const **filters =
708 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
709 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800710 size_t num_registered_methods;
711 size_t alloc;
712 registered_method *rm;
713 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800714 grpc_channel *channel;
715 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800716 grpc_mdstr *host;
717 grpc_mdstr *method;
718 gpr_uint32 hash;
719 gpr_uint32 slots;
720 gpr_uint32 probes;
721 gpr_uint32 max_probes = 0;
Craig Tiller5d6bd442015-02-12 22:50:38 -0800722 grpc_transport_setup_result result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800723
724 for (i = 0; i < s->channel_filter_count; i++) {
725 filters[i] = s->channel_filters[i];
726 }
727 for (; i < s->channel_filter_count + num_extra_filters; i++) {
728 filters[i] = extra_filters[i - s->channel_filter_count];
729 }
730 filters[i] = &grpc_connected_channel_filter;
731
Craig Tiller20bc56d2015-02-12 09:02:56 -0800732 for (i = 0; i < s->cq_count; i++) {
733 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
734 }
ctillerd79b4862014-12-17 16:36:59 -0800735
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800736 channel = grpc_channel_create_from_filters(filters, num_filters,
737 s->channel_args, mdctx, 0);
738 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700739 grpc_channel_get_channel_stack(channel), 0)
740 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800741 chand->server = s;
742 server_ref(s);
743 chand->channel = channel;
744
Craig Tiller04cc8be2015-02-10 16:11:22 -0800745 num_registered_methods = 0;
746 for (rm = s->registered_methods; rm; rm = rm->next) {
747 num_registered_methods++;
748 }
749 /* build a lookup table phrased in terms of mdstr's in this channels context
750 to quickly find registered methods */
751 if (num_registered_methods > 0) {
752 slots = 2 * num_registered_methods;
753 alloc = sizeof(channel_registered_method) * slots;
754 chand->registered_methods = gpr_malloc(alloc);
755 memset(chand->registered_methods, 0, alloc);
756 for (rm = s->registered_methods; rm; rm = rm->next) {
757 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800758 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800759 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800760 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700761 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800762 probes++)
763 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800764 if (probes > max_probes) max_probes = probes;
765 crm = &chand->registered_methods[(hash + probes) % slots];
766 crm->server_registered_method = rm;
767 crm->host = host;
768 crm->method = method;
769 }
770 chand->registered_method_slots = slots;
771 chand->registered_method_max_probes = max_probes;
772 }
773
Craig Tiller5d6bd442015-02-12 22:50:38 -0800774 result = grpc_connected_channel_bind_transport(
775 grpc_channel_get_channel_stack(channel), transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800776
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800777 gpr_mu_lock(&s->mu);
778 chand->next = &s->root_channel_data;
779 chand->prev = chand->next->prev;
780 chand->next->prev = chand->prev->next = chand;
781 gpr_mu_unlock(&s->mu);
782
783 gpr_free(filters);
784
Craig Tiller5d6bd442015-02-12 22:50:38 -0800785 return result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800786}
787
Craig Tilleraec96aa2015-04-07 14:32:15 -0700788static int num_listeners(grpc_server *server) {
789 listener *l;
790 int n = 0;
791 for (l = server->listeners; l; l = l->next) {
792 n++;
793 }
794 return n;
795}
796
Craig Tillerbd217572015-02-11 18:10:56 -0800797static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
Craig Tillerec3257c2015-02-12 15:59:43 -0800798 void *shutdown_tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800800 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800801 channel_data **channels;
802 channel_data *c;
803 size_t nchannels;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800804 size_t i, j;
nnoble0c475f02014-12-05 15:37:39 -0800805 grpc_channel_op op;
806 grpc_channel_element *elem;
Craig Tillerbd217572015-02-11 18:10:56 -0800807 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800808
809 /* lock, and gather up some stuff to do */
810 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800811 if (have_shutdown_tag) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800812 for (i = 0; i < server->cq_count; i++) {
813 grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
814 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800815 server->shutdown_tags =
816 gpr_realloc(server->shutdown_tags,
817 sizeof(void *) * (server->num_shutdown_tags + 1));
818 server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
819 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800820 if (server->shutdown) {
821 gpr_mu_unlock(&server->mu);
822 return;
823 }
824
nnoble0c475f02014-12-05 15:37:39 -0800825 nchannels = 0;
826 for (c = server->root_channel_data.next; c != &server->root_channel_data;
827 c = c->next) {
828 nchannels++;
829 }
830 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
831 i = 0;
832 for (c = server->root_channel_data.next; c != &server->root_channel_data;
833 c = c->next) {
834 grpc_channel_internal_ref(c->channel);
835 channels[i] = c;
836 i++;
837 }
838
Craig Tillerbd217572015-02-11 18:10:56 -0800839 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800840 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800841 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800842 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800843 if (requested_calls.count + rm->requested.count >
844 requested_calls.capacity) {
845 requested_calls.capacity =
846 GPR_MAX(requested_calls.count + rm->requested.count,
847 2 * requested_calls.capacity);
848 requested_calls.calls =
849 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
850 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800851 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800852 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
853 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800854 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800855 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800856 memset(&rm->requested, 0, sizeof(rm->requested));
857 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800858
859 server->shutdown = 1;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800860 if (server->lists[ALL_CALLS] == NULL) {
861 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800862 for (j = 0; j < server->cq_count; j++) {
863 grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800864 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800865 }
866 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800867 gpr_mu_unlock(&server->mu);
868
nnoble0c475f02014-12-05 15:37:39 -0800869 for (i = 0; i < nchannels; i++) {
870 c = channels[i];
871 elem = grpc_channel_stack_element(
872 grpc_channel_get_channel_stack(c->channel), 0);
873
874 op.type = GRPC_CHANNEL_GOAWAY;
875 op.dir = GRPC_CALL_DOWN;
876 op.data.goaway.status = GRPC_STATUS_OK;
877 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800878 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800879
880 grpc_channel_internal_unref(c->channel);
881 }
882 gpr_free(channels);
883
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800884 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800885 for (i = 0; i < requested_calls.count; i++) {
886 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800887 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800888 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800889
890 /* Shutdown listeners */
891 for (l = server->listeners; l; l = l->next) {
892 l->destroy(server, l->arg);
893 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800894}
895
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800896void grpc_server_shutdown(grpc_server *server) {
897 shutdown_internal(server, 0, NULL);
898}
899
900void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
901 shutdown_internal(server, 1, tag);
902}
903
Craig Tilleraec96aa2015-04-07 14:32:15 -0700904void grpc_server_listener_destroy_done(void *s) {
905 grpc_server *server = s;
906 gpr_mu_lock(&server->mu);
907 server->listeners_destroyed++;
908 gpr_cv_signal(&server->cv);
909 gpr_mu_unlock(&server->mu);
910}
911
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800912void grpc_server_destroy(grpc_server *server) {
913 channel_data *c;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700914 listener *l;
915 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800916 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800917 if (!server->shutdown) {
918 gpr_mu_unlock(&server->mu);
919 grpc_server_shutdown(server);
920 gpr_mu_lock(&server->mu);
921 }
922
Craig Tilleraec96aa2015-04-07 14:32:15 -0700923 while (server->listeners_destroyed != num_listeners(server)) {
924 for (i = 0; i < server->cq_count; i++) {
925 gpr_mu_unlock(&server->mu);
926 grpc_cq_hack_spin_pollset(server->cqs[i]);
927 gpr_mu_lock(&server->mu);
928 }
929
Craig Tillerc02c1d82015-04-07 16:21:55 -0700930 gpr_cv_wait(&server->cv, &server->mu,
931 gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
Craig Tilleraec96aa2015-04-07 14:32:15 -0700932 }
933
934 while (server->listeners) {
935 l = server->listeners;
936 server->listeners = l->next;
937 gpr_free(l);
938 }
939
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800940 for (c = server->root_channel_data.next; c != &server->root_channel_data;
941 c = c->next) {
942 shutdown_channel(c);
943 }
944 gpr_mu_unlock(&server->mu);
945
946 server_unref(server);
947}
948
949void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800950 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -0800951 grpc_pollset **pollsets,
952 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800953 void (*destroy)(grpc_server *server, void *arg)) {
954 listener *l = gpr_malloc(sizeof(listener));
955 l->arg = arg;
956 l->start = start;
957 l->destroy = destroy;
958 l->next = server->listeners;
959 server->listeners = l;
960}
961
Craig Tiller9f28ac22015-01-27 17:01:29 -0800962static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -0800963 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -0800964 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -0800965 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800966 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800967 if (server->shutdown) {
968 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800969 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800970 return GRPC_CALL_OK;
971 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800972 switch (rc->type) {
973 case LEGACY_CALL:
974 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800975 calld =
976 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800977 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800978 break;
979 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800980 calld = call_list_remove_head(
981 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800982 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800983 break;
984 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800985 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800986 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -0800987 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -0800988 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800989 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -0800990 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800991 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800992 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -0800993 gpr_mu_unlock(&server->mu);
994 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800995 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800996}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800997
Craig Tiller24be0f72015-02-10 14:04:22 -0800998grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
999 grpc_call_details *details,
1000 grpc_metadata_array *initial_metadata,
Craig Tiller3b29b562015-02-11 12:58:46 -08001001 grpc_completion_queue *cq_bind,
1002 void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001003 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001004 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -08001005 rc.type = BATCH_CALL;
1006 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001007 rc.data.batch.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -08001008 rc.data.batch.call = call;
1009 rc.data.batch.details = details;
1010 rc.data.batch.initial_metadata = initial_metadata;
1011 return queue_call_request(server, &rc);
1012}
1013
1014grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001015 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1016 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
1017 grpc_completion_queue *cq_bind, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001018 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001019 registered_method *registered_method = rm;
1020 grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -08001021 rc.type = REGISTERED_CALL;
1022 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001023 rc.data.registered.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -08001024 rc.data.registered.call = call;
1025 rc.data.registered.registered_method = registered_method;
1026 rc.data.registered.deadline = deadline;
1027 rc.data.registered.initial_metadata = initial_metadata;
1028 rc.data.registered.optional_payload = optional_payload;
1029 return queue_call_request(server, &rc);
1030}
1031
1032grpc_call_error grpc_server_request_call_old(grpc_server *server,
1033 void *tag_new) {
1034 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001035 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
Craig Tiller24be0f72015-02-10 14:04:22 -08001036 rc.type = LEGACY_CALL;
1037 rc.tag = tag_new;
1038 return queue_call_request(server, &rc);
1039}
1040
1041static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
1042static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1043 void *tag);
Craig Tiller3b29b562015-02-11 12:58:46 -08001044static void publish_was_not_set(grpc_call *call, grpc_op_error status,
1045 void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001046 abort();
1047}
Craig Tiller24be0f72015-02-10 14:04:22 -08001048
Craig Tiller166e2502015-02-03 20:14:41 -08001049static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1050 gpr_slice slice = value->slice;
1051 size_t len = GPR_SLICE_LENGTH(slice);
1052
1053 if (len + 1 > *capacity) {
1054 *capacity = GPR_MAX(len + 1, *capacity * 2);
1055 *dest = gpr_realloc(*dest, *capacity);
1056 }
1057 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1058}
1059
Craig Tiller24be0f72015-02-10 14:04:22 -08001060static void begin_call(grpc_server *server, call_data *calld,
1061 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001062 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001063 grpc_ioreq req[2];
1064 grpc_ioreq *r = req;
1065
1066 /* called once initial metadata has been read by the call, but BEFORE
1067 the ioreq to fetch it out of the call has been executed.
1068 This means metadata related fields can be relied on in calld, but to
1069 fill in the metadata array passed by the client, we need to perform
1070 an ioreq op, that should complete immediately. */
1071
1072 switch (rc->type) {
1073 case LEGACY_CALL:
1074 calld->legacy = gpr_malloc(sizeof(legacy_data));
1075 memset(calld->legacy, 0, sizeof(legacy_data));
1076 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1077 r->data.recv_metadata = &calld->legacy->initial_metadata;
1078 r++;
1079 publish = publish_legacy;
1080 break;
1081 case BATCH_CALL:
1082 cpstr(&rc->data.batch.details->host,
1083 &rc->data.batch.details->host_capacity, calld->host);
1084 cpstr(&rc->data.batch.details->method,
1085 &rc->data.batch.details->method_capacity, calld->path);
Craig Tiller8e8fd892015-02-10 17:02:08 -08001086 grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001087 *rc->data.batch.call = calld->call;
1088 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1089 r->data.recv_metadata = rc->data.batch.initial_metadata;
1090 r++;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001091 calld->cq_new = server->unregistered_cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001092 publish = publish_registered_or_batch;
1093 break;
1094 case REGISTERED_CALL:
1095 *rc->data.registered.deadline = calld->deadline;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001096 grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001097 *rc->data.registered.call = calld->call;
1098 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1099 r->data.recv_metadata = rc->data.registered.initial_metadata;
1100 r++;
1101 if (rc->data.registered.optional_payload) {
1102 r->op = GRPC_IOREQ_RECV_MESSAGE;
1103 r->data.recv_message = rc->data.registered.optional_payload;
1104 r++;
1105 }
Craig Tiller20bc56d2015-02-12 09:02:56 -08001106 calld->cq_new = rc->data.registered.registered_method->cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001107 publish = publish_registered_or_batch;
1108 break;
1109 }
1110
1111 grpc_call_internal_ref(calld->call);
1112 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1113 rc->tag);
1114}
1115
1116static void fail_call(grpc_server *server, requested_call *rc) {
1117 switch (rc->type) {
1118 case LEGACY_CALL:
Craig Tillerec3257c2015-02-12 15:59:43 -08001119 grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
1120 NULL, NULL, NULL, gpr_inf_past, 0, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001121 break;
1122 case BATCH_CALL:
1123 *rc->data.batch.call = NULL;
1124 rc->data.batch.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001125 grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
1126 do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001127 break;
1128 case REGISTERED_CALL:
1129 *rc->data.registered.call = NULL;
1130 rc->data.registered.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001131 grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
1132 rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001133 break;
1134 }
1135}
1136
1137static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
1138 grpc_call_element *elem =
1139 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1140 call_data *calld = elem->call_data;
1141 channel_data *chand = elem->channel_data;
1142 grpc_server *server = chand->server;
1143
1144 if (status == GRPC_OP_OK) {
Craig Tiller20bc56d2015-02-12 09:02:56 -08001145 grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
Craig Tiller24be0f72015-02-10 14:04:22 -08001146 grpc_mdstr_as_c_string(calld->path),
1147 grpc_mdstr_as_c_string(calld->host), calld->deadline,
1148 calld->legacy->initial_metadata.count,
1149 calld->legacy->initial_metadata.metadata);
1150 } else {
1151 gpr_log(GPR_ERROR, "should never reach here");
1152 abort();
1153 }
1154}
1155
1156static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1157 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001158 grpc_call_element *elem =
1159 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001160 call_data *calld = elem->call_data;
1161 grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
Craig Tiller24be0f72015-02-10 14:04:22 -08001162}
1163
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001164const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1165 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001166}