blob: 01644b447186f7a5b5f616d013c507b92a864a81 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080047#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080048#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc/support/useful.h>
52
/* Selects one of the intrusive lists a call can be linked into.
   CALL_LIST_COUNT is not a list: it is the number of lists and sizes the
   per-call root[] and links[] arrays. */
typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
54
/* One node in the server's singly linked list of listeners. */
typedef struct listener {
  /* opaque pointer passed back to start and destroy */
  void *arg;
  /* called from grpc_server_start with the server's pollset array */
  void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
                size_t pollset_count);
  void (*destroy)(grpc_server *server, void *arg);
  /* next listener, or NULL at the end of the list */
  struct listener *next;
} listener;
62
/* Forward declarations; the structs themselves are defined further down. */
typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;
66
/* Neighbour pointers for one circular doubly linked list of calls; each call
   embeds one of these per call_list value. */
typedef struct {
  call_data *next;
  call_data *prev;
} call_link;
71
/* Tag stored in requested_call.type. Presumably BATCH_CALL selects
   data.batch and REGISTERED_CALL selects data.registered - TODO confirm
   against the code that fills in and consumes requested_call. */
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080073
/* An outstanding request for an incoming call. Entries are stored by value in
   a requested_call_array and handed to begin_call once an incoming call is
   matched against them. */
typedef struct {
  /* discriminator for the data union below */
  requested_call_type type;
  void *tag;
  union {
    /* parameters of a request not tied to a registered method */
    struct {
      grpc_completion_queue *cq_bind;
      grpc_call **call;
      grpc_call_details *details;
      grpc_metadata_array *initial_metadata;
    } batch;
    /* parameters of a request for one specific registered method */
    struct {
      grpc_completion_queue *cq_bind;
      grpc_call **call;
      registered_method *registered_method;
      gpr_timespec *deadline;
      grpc_metadata_array *initial_metadata;
      grpc_byte_buffer **optional_payload;
    } registered;
  } data;
} requested_call;
94
/* Growable array of requested_call values, used as a stack: entries are
   appended by requested_call_array_add and popped from the end. */
typedef struct {
  /* heap storage holding capacity elements */
  requested_call *calls;
  /* number of elements currently in use */
  size_t count;
  /* number of elements allocated */
  size_t capacity;
} requested_call_array;
100
/* Server-wide record for a method registered through
   grpc_server_register_method. */
struct registered_method {
  /* heap copy of the method name */
  char *method;
  /* heap copy of the host, or NULL when the registration has no host */
  char *host;
  /* root of the list of incoming calls waiting for a request */
  call_data *pending;
  /* requests waiting for an incoming call */
  requested_call_array requested;
  /* completion queue supplied at registration */
  grpc_completion_queue *cq;
  /* next entry in the server's list of registered methods */
  registered_method *next;
};
109
/* One slot of a channel's registered-method lookup table. method and host are
   compared by pointer against the strings captured from an incoming call. */
typedef struct channel_registered_method {
  registered_method *server_registered_method;
  grpc_mdstr *method;
  /* NULL for a wildcard entry that matches any host */
  grpc_mdstr *host;
} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115
/* Per-channel state of the server surface filter. */
struct channel_data {
  /* owning server; NULL until the channel is attached to one */
  grpc_server *server;
  grpc_channel *channel;
  /* interned ":path" metadata key */
  grpc_mdstr *path_key;
  /* interned ":authority" metadata key */
  grpc_mdstr *authority_key;
  /* linked list of all channels on a server */
  channel_data *next;
  channel_data *prev;
  /* hash table of registered methods, or NULL when there is none */
  channel_registered_method *registered_methods;
  /* number of slots in registered_methods */
  gpr_uint32 registered_method_slots;
  /* bound on how many consecutive slots a lookup probes */
  gpr_uint32 registered_method_max_probes;
};
128
/* Server object; destroyed by server_unref when internal_refcount reaches
   zero. */
struct grpc_server {
  /* filter stack assembled in grpc_server_create_from_filters */
  size_t channel_filter_count;
  const grpc_channel_filter **channel_filters;
  /* private copy of the creation-time channel args */
  grpc_channel_args *channel_args;
  /* completion queue passed at creation time (may be NULL) */
  grpc_completion_queue *unregistered_cq;

  /* distinct completion queues known to the server, each holding an internal
     ref; pollsets is filled from them in grpc_server_start */
  grpc_completion_queue **cqs;
  grpc_pollset **pollsets;
  size_t cq_count;

  gpr_mu mu;
  gpr_cv cv;

  /* head of the list of registered methods */
  registered_method *registered_methods;
  /* requests usable by calls that match no registered method */
  requested_call_array requested_calls;

  /* once set, the shutdown tags are reported to every cq when the last call
     is destroyed */
  gpr_uint8 shutdown;
  size_t num_shutdown_tags;
  void **shutdown_tags;

  /* list roots, indexed by call_list */
  call_data *lists[CALL_LIST_COUNT];
  /* sentinel node of the circular list of channels */
  channel_data root_channel_data;

  listener *listeners;
  int listeners_destroyed;
  gpr_refcount internal_refcount;
};
156
/* Lifecycle of an incoming call as tracked by this filter. */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;
167
/* Per-call state of the server surface filter. */
struct call_data {
  grpc_call *call;

  call_state state;
  /* refs on the :path and :authority values taken in server_filter and
     released in destroy_call_elem; NULL until seen */
  grpc_mdstr *path;
  grpc_mdstr *host;
  /* gpr_inf_future unless the initial metadata carried a deadline */
  gpr_timespec deadline;
  /* non-zero once the first metadata op has been processed */
  int got_initial_metadata;

  grpc_completion_queue *cq_new;

  /* receive parameters and upper-layer callback saved by server_mutate_op */
  grpc_stream_op_buffer *recv_ops;
  grpc_stream_state *recv_state;
  void (*on_done_recv)(void *user_data, int success);
  void *recv_user_data;

  /* per list: the root pointer of the list this call is on (NULL when it is
     on none) and its neighbour links */
  call_data **root[CALL_LIST_COUNT];
  call_link links[CALL_LIST_COUNT];
};
187
/* Yields the grpc_server that owns the channel a call element belongs to. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)
190
191static void do_nothing(void *unused, grpc_op_error ignored) {}
192
/* Forward declarations for helpers defined later in this file. */
static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
196
Craig Tiller3b29b562015-02-11 12:58:46 -0800197static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800198 GPR_ASSERT(!call->root[list]);
199 call->root[list] = root;
200 if (!*root) {
201 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202 call->links[list].next = call->links[list].prev = call;
203 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800204 call->links[list].next = *root;
205 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800206 call->links[list].next->links[list].prev =
207 call->links[list].prev->links[list].next = call;
208 }
209 return 1;
210}
211
Craig Tiller04cc8be2015-02-10 16:11:22 -0800212static call_data *call_list_remove_head(call_data **root, call_list list) {
213 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800214 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800215 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800216 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800217 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800218 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800219 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800220 out->links[list].next->links[list].prev = out->links[list].prev;
221 out->links[list].prev->links[list].next = out->links[list].next;
222 }
223 }
224 return out;
225}
226
Craig Tiller04cc8be2015-02-10 16:11:22 -0800227static int call_list_remove(call_data *call, call_list list) {
228 call_data **root = call->root[list];
229 if (root == NULL) return 0;
230 call->root[list] = NULL;
231 if (*root == call) {
232 *root = call->links[list].next;
233 if (*root == call) {
234 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800235 return 1;
236 }
237 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800238 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800239 call->links[list].next->links[list].prev = call->links[list].prev;
240 call->links[list].prev->links[list].next = call->links[list].next;
241 return 1;
242}
243
Craig Tiller24be0f72015-02-10 14:04:22 -0800244static void requested_call_array_destroy(requested_call_array *array) {
245 gpr_free(array->calls);
246}
247
248static requested_call *requested_call_array_add(requested_call_array *array) {
249 requested_call *rc;
250 if (array->count == array->capacity) {
251 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
252 array->calls =
253 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
254 }
255 rc = &array->calls[array->count++];
256 memset(rc, 0, sizeof(*rc));
257 return rc;
258}
259
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800260static void server_ref(grpc_server *server) {
261 gpr_ref(&server->internal_refcount);
262}
263
264static void server_unref(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800265 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700266 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800267 if (gpr_unref(&server->internal_refcount)) {
268 grpc_channel_args_destroy(server->channel_args);
269 gpr_mu_destroy(&server->mu);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700270 gpr_cv_destroy(&server->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800271 gpr_free(server->channel_filters);
Craig Tiller24be0f72015-02-10 14:04:22 -0800272 requested_call_array_destroy(&server->requested_calls);
Craig Tillerec3257c2015-02-12 15:59:43 -0800273 while ((rm = server->registered_methods) != NULL) {
274 server->registered_methods = rm->next;
275 gpr_free(rm->method);
276 gpr_free(rm->host);
277 requested_call_array_destroy(&rm->requested);
278 gpr_free(rm);
279 }
Craig Tiller89504612015-04-27 11:48:46 -0700280 for (i = 0; i < server->cq_count; i++) {
281 grpc_cq_internal_unref(server->cqs[i]);
282 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800283 gpr_free(server->cqs);
284 gpr_free(server->pollsets);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800285 gpr_free(server->shutdown_tags);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800286 gpr_free(server);
287 }
288}
289
290static int is_channel_orphaned(channel_data *chand) {
291 return chand->next == chand;
292}
293
294static void orphan_channel(channel_data *chand) {
295 chand->next->prev = chand->prev;
296 chand->prev->next = chand->next;
297 chand->next = chand->prev = chand;
298}
299
ctiller58393c22015-01-07 14:03:30 -0800300static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800301 channel_data *chand = cd;
302 grpc_server *server = chand->server;
Craig Tillerd75fe662015-02-21 07:30:49 -0800303 grpc_channel_internal_unref(chand->channel);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800304 server_unref(server);
305}
306
307static void destroy_channel(channel_data *chand) {
308 if (is_channel_orphaned(chand)) return;
309 GPR_ASSERT(chand->server != NULL);
310 orphan_channel(chand);
311 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800312 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313}
314
Craig Tiller3b29b562015-02-11 12:58:46 -0800315static void finish_start_new_rpc_and_unlock(grpc_server *server,
316 grpc_call_element *elem,
317 call_data **pending_root,
318 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800319 requested_call rc;
320 call_data *calld = elem->call_data;
321 if (array->count == 0) {
322 calld->state = PENDING;
323 call_list_join(pending_root, calld, PENDING_START);
324 gpr_mu_unlock(&server->mu);
325 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800326 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800327 calld->state = ACTIVATED;
328 gpr_mu_unlock(&server->mu);
329 begin_call(server, calld, &rc);
330 }
331}
332
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800333static void start_new_rpc(grpc_call_element *elem) {
334 channel_data *chand = elem->channel_data;
335 call_data *calld = elem->call_data;
336 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800337 gpr_uint32 i;
338 gpr_uint32 hash;
339 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800340
341 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800342 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800343 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800344 /* check for an exact match with host */
345 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
346 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800347 rm = &chand->registered_methods[(hash + i) %
348 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800349 if (!rm) break;
350 if (rm->host != calld->host) continue;
351 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800352 finish_start_new_rpc_and_unlock(server, elem,
353 &rm->server_registered_method->pending,
354 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800355 return;
356 }
357 /* check for a wildcard method definition (no host set) */
358 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800359 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800360 rm = &chand->registered_methods[(hash + i) %
361 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800362 if (!rm) break;
363 if (rm->host != NULL) continue;
364 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800365 finish_start_new_rpc_and_unlock(server, elem,
366 &rm->server_registered_method->pending,
367 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800368 return;
369 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800370 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800371 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
372 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800373}
374
ctiller58393c22015-01-07 14:03:30 -0800375static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800376 grpc_call_destroy(grpc_call_from_top_element(elem));
377}
378
Craig Tiller6902ad22015-04-16 08:01:49 -0700379static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
380 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800381 channel_data *chand = elem->channel_data;
382 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700383 if (md->key == chand->path_key) {
384 calld->path = grpc_mdstr_ref(md->value);
385 return NULL;
386 } else if (md->key == chand->authority_key) {
387 calld->host = grpc_mdstr_ref(md->value);
388 return NULL;
389 }
390 return md;
391}
392
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700393static void server_on_recv(void *ptr, int success) {
394 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700395 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700396 channel_data *chand = elem->channel_data;
397
398 if (success && !calld->got_initial_metadata) {
399 size_t i;
400 size_t nops = calld->recv_ops->nops;
401 grpc_stream_op *ops = calld->recv_ops->ops;
402 for (i = 0; i < nops; i++) {
403 grpc_stream_op *op = &ops[i];
404 if (op->type != GRPC_OP_METADATA) continue;
Craig Tiller205aee12015-04-16 14:46:41 -0700405 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700406 if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700407 calld->deadline = op->data.metadata.deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800408 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700409 calld->got_initial_metadata = 1;
410 start_new_rpc(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800411 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700412 }
413 }
414
415 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700416 case GRPC_STREAM_OPEN:
417 break;
418 case GRPC_STREAM_SEND_CLOSED:
419 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700420 case GRPC_STREAM_RECV_CLOSED:
421 gpr_mu_lock(&chand->server->mu);
422 if (calld->state == NOT_STARTED) {
423 calld->state = ZOMBIED;
424 grpc_iomgr_add_callback(kill_zombie, elem);
425 }
426 gpr_mu_unlock(&chand->server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800427 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700428 case GRPC_STREAM_CLOSED:
429 gpr_mu_lock(&chand->server->mu);
430 if (calld->state == NOT_STARTED) {
431 calld->state = ZOMBIED;
432 grpc_iomgr_add_callback(kill_zombie, elem);
433 } else if (calld->state == PENDING) {
434 call_list_remove(calld, PENDING_START);
435 }
436 gpr_mu_unlock(&chand->server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800437 break;
438 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700439
440 calld->on_done_recv(calld->recv_user_data, success);
441}
442
Craig Tiller50d9db52015-04-23 10:52:14 -0700443static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700444 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700445
446 if (op->recv_ops) {
447 /* substitute our callback for the higher callback */
448 calld->recv_ops = op->recv_ops;
449 calld->recv_state = op->recv_state;
450 calld->on_done_recv = op->on_done_recv;
451 calld->recv_user_data = op->recv_user_data;
452 op->on_done_recv = server_on_recv;
453 op->recv_user_data = elem;
454 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700455}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700456
Craig Tiller06aeea72015-04-23 10:54:45 -0700457static void server_start_transport_op(grpc_call_element *elem,
458 grpc_transport_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -0700459 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
460 server_mutate_op(elem, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700461 grpc_call_next_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800462}
463
ctillerf962f522014-12-10 15:28:27 -0800464static void channel_op(grpc_channel_element *elem,
465 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800466 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800467 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800468
469 switch (op->type) {
470 case GRPC_ACCEPT_CALL:
471 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800472 grpc_call_create(chand->channel, NULL,
Craig Tiller87d5b192015-04-16 14:37:57 -0700473 op->data.accept_call.transport_server_data, NULL, 0,
474 gpr_inf_future);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800475 break;
476 case GRPC_TRANSPORT_CLOSED:
477 /* if the transport is closed for a server channel, we destroy the
478 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800479 gpr_mu_lock(&server->mu);
480 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800481 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800482 gpr_mu_unlock(&server->mu);
483 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800484 break;
nnoble0c475f02014-12-05 15:37:39 -0800485 case GRPC_TRANSPORT_GOAWAY:
486 gpr_slice_unref(op->data.goaway.message);
487 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800488 default:
489 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
490 grpc_channel_next_op(elem, op);
491 break;
492 }
493}
494
ctiller58393c22015-01-07 14:03:30 -0800495static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496 channel_data *chand = cd;
497 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800498 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800499 op.dir = GRPC_CALL_DOWN;
500 channel_op(grpc_channel_stack_element(
501 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800502 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800503 grpc_channel_internal_unref(chand->channel);
504}
505
506static void shutdown_channel(channel_data *chand) {
507 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800508 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800509}
510
511static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700512 const void *server_transport_data,
513 grpc_transport_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800514 call_data *calld = elem->call_data;
515 channel_data *chand = elem->channel_data;
516 memset(calld, 0, sizeof(call_data));
517 calld->deadline = gpr_inf_future;
518 calld->call = grpc_call_from_top_element(elem);
519
520 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800521 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800522 gpr_mu_unlock(&chand->server->mu);
523
524 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700525
Craig Tiller482ef8b2015-04-23 11:38:20 -0700526 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800527}
528
529static void destroy_call_elem(grpc_call_element *elem) {
530 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800531 call_data *calld = elem->call_data;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800532 size_t i, j;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800533
534 gpr_mu_lock(&chand->server->mu);
535 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800536 call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800537 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800538 if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
539 for (i = 0; i < chand->server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800540 for (j = 0; j < chand->server->cq_count; j++) {
541 grpc_cq_end_server_shutdown(chand->server->cqs[j],
542 chand->server->shutdown_tags[i]);
543 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800544 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800545 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800546 gpr_mu_unlock(&chand->server->mu);
547
Craig Tiller4df31a62015-01-30 09:44:31 -0800548 if (calld->host) {
549 grpc_mdstr_unref(calld->host);
550 }
551 if (calld->path) {
552 grpc_mdstr_unref(calld->path);
553 }
554
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800555 server_unref(chand->server);
556}
557
558static void init_channel_elem(grpc_channel_element *elem,
559 const grpc_channel_args *args,
560 grpc_mdctx *metadata_context, int is_first,
561 int is_last) {
562 channel_data *chand = elem->channel_data;
563 GPR_ASSERT(is_first);
564 GPR_ASSERT(!is_last);
565 chand->server = NULL;
566 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800567 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
568 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800569 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800570 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800571}
572
573static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800574 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800575 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800576 if (chand->registered_methods) {
577 for (i = 0; i < chand->registered_method_slots; i++) {
578 if (chand->registered_methods[i].method) {
579 grpc_mdstr_unref(chand->registered_methods[i].method);
580 }
581 if (chand->registered_methods[i].host) {
582 grpc_mdstr_unref(chand->registered_methods[i].host);
583 }
584 }
585 gpr_free(chand->registered_methods);
586 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800587 if (chand->server) {
588 gpr_mu_lock(&chand->server->mu);
589 chand->next->prev = chand->prev;
590 chand->prev->next = chand->next;
591 chand->next = chand->prev = chand;
592 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800593 grpc_mdstr_unref(chand->path_key);
594 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800595 server_unref(chand->server);
596 }
597}
598
/* Filter table for the server surface filter, named "server". It wires up
   the call/channel op handlers, the per-call and per-channel data sizes, and
   the element constructors/destructors defined above. The initializer is
   positional, so the order must match the grpc_channel_filter declaration. */
static const grpc_channel_filter server_surface_filter = {
    server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
    destroy_call_elem, sizeof(channel_data), init_channel_elem,
    destroy_channel_elem, "server",
};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800604
Craig Tiller20bc56d2015-02-12 09:02:56 -0800605static void addcq(grpc_server *server, grpc_completion_queue *cq) {
606 size_t i, n;
607 for (i = 0; i < server->cq_count; i++) {
608 if (server->cqs[i] == cq) return;
609 }
Craig Tiller89504612015-04-27 11:48:46 -0700610 grpc_cq_internal_ref(cq);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800611 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800612 server->cqs = gpr_realloc(server->cqs,
613 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800614 server->cqs[n] = cq;
615}
616
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800617grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
618 grpc_channel_filter **filters,
619 size_t filter_count,
620 const grpc_channel_args *args) {
621 size_t i;
622 int census_enabled = grpc_channel_args_is_census_enabled(args);
623
624 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800625
626 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
627
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800628 memset(server, 0, sizeof(grpc_server));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800629 if (cq) addcq(server, cq);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800630
631 gpr_mu_init(&server->mu);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700632 gpr_cv_init(&server->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800633
Craig Tiller20bc56d2015-02-12 09:02:56 -0800634 server->unregistered_cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800635 /* decremented by grpc_server_destroy */
636 gpr_ref_init(&server->internal_refcount, 1);
637 server->root_channel_data.next = server->root_channel_data.prev =
638 &server->root_channel_data;
639
640 /* Server filter stack is:
641
642 server_surface_filter - for making surface API calls
643 grpc_server_census_filter (optional) - for stats collection and tracing
644 {passed in filter stack}
645 grpc_connected_channel_filter - for interfacing with transports */
646 server->channel_filter_count = filter_count + 1 + census_enabled;
647 server->channel_filters =
648 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
649 server->channel_filters[0] = &server_surface_filter;
650 if (census_enabled) {
651 server->channel_filters[1] = &grpc_server_census_filter;
652 }
653 for (i = 0; i < filter_count; i++) {
654 server->channel_filters[i + 1 + census_enabled] = filters[i];
655 }
656
657 server->channel_args = grpc_channel_args_copy(args);
658
659 return server;
660}
661
/* NULL-tolerant string equality. Returns 1 when both arguments are NULL or
   both are non-NULL with identical contents, 0 otherwise. */
static int streq(const char *a, const char *b) {
  if (a == NULL || b == NULL) {
    /* at least one side is NULL: equal only when both are */
    return a == b;
  }
  return strcmp(a, b) == 0;
}
668
669void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerec3257c2015-02-12 15:59:43 -0800670 const char *host,
671 grpc_completion_queue *cq_new_rpc) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800672 registered_method *m;
673 if (!method) {
674 gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
675 return NULL;
676 }
677 for (m = server->registered_methods; m; m = m->next) {
678 if (streq(m->method, method) && streq(m->host, host)) {
679 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
680 host ? host : "*");
681 return NULL;
682 }
683 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800684 addcq(server, cq_new_rpc);
Craig Tiller24be0f72015-02-10 14:04:22 -0800685 m = gpr_malloc(sizeof(registered_method));
686 memset(m, 0, sizeof(*m));
687 m->method = gpr_strdup(method);
688 m->host = gpr_strdup(host);
689 m->next = server->registered_methods;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800690 m->cq = cq_new_rpc;
Craig Tiller24be0f72015-02-10 14:04:22 -0800691 server->registered_methods = m;
692 return m;
693}
694
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800695void grpc_server_start(grpc_server *server) {
696 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800697 size_t i;
698
Craig Tillerec3257c2015-02-12 15:59:43 -0800699 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800700 for (i = 0; i < server->cq_count; i++) {
701 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
702 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800703
704 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800705 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800706 }
707}
708
709grpc_transport_setup_result grpc_server_setup_transport(
710 grpc_server *s, grpc_transport *transport,
711 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
712 grpc_mdctx *mdctx) {
713 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
714 grpc_channel_filter const **filters =
715 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
716 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800717 size_t num_registered_methods;
718 size_t alloc;
719 registered_method *rm;
720 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800721 grpc_channel *channel;
722 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800723 grpc_mdstr *host;
724 grpc_mdstr *method;
725 gpr_uint32 hash;
726 gpr_uint32 slots;
727 gpr_uint32 probes;
728 gpr_uint32 max_probes = 0;
Craig Tiller5d6bd442015-02-12 22:50:38 -0800729 grpc_transport_setup_result result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800730
731 for (i = 0; i < s->channel_filter_count; i++) {
732 filters[i] = s->channel_filters[i];
733 }
734 for (; i < s->channel_filter_count + num_extra_filters; i++) {
735 filters[i] = extra_filters[i - s->channel_filter_count];
736 }
737 filters[i] = &grpc_connected_channel_filter;
738
Craig Tiller20bc56d2015-02-12 09:02:56 -0800739 for (i = 0; i < s->cq_count; i++) {
740 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
741 }
ctillerd79b4862014-12-17 16:36:59 -0800742
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800743 channel = grpc_channel_create_from_filters(filters, num_filters,
744 s->channel_args, mdctx, 0);
745 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700746 grpc_channel_get_channel_stack(channel), 0)
747 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800748 chand->server = s;
749 server_ref(s);
750 chand->channel = channel;
751
Craig Tiller04cc8be2015-02-10 16:11:22 -0800752 num_registered_methods = 0;
753 for (rm = s->registered_methods; rm; rm = rm->next) {
754 num_registered_methods++;
755 }
756 /* build a lookup table phrased in terms of mdstr's in this channels context
757 to quickly find registered methods */
758 if (num_registered_methods > 0) {
759 slots = 2 * num_registered_methods;
760 alloc = sizeof(channel_registered_method) * slots;
761 chand->registered_methods = gpr_malloc(alloc);
762 memset(chand->registered_methods, 0, alloc);
763 for (rm = s->registered_methods; rm; rm = rm->next) {
764 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800765 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800766 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800767 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700768 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800769 probes++)
770 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800771 if (probes > max_probes) max_probes = probes;
772 crm = &chand->registered_methods[(hash + probes) % slots];
773 crm->server_registered_method = rm;
774 crm->host = host;
775 crm->method = method;
776 }
777 chand->registered_method_slots = slots;
778 chand->registered_method_max_probes = max_probes;
779 }
780
Craig Tiller5d6bd442015-02-12 22:50:38 -0800781 result = grpc_connected_channel_bind_transport(
782 grpc_channel_get_channel_stack(channel), transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800783
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800784 gpr_mu_lock(&s->mu);
785 chand->next = &s->root_channel_data;
786 chand->prev = chand->next->prev;
787 chand->next->prev = chand->prev->next = chand;
788 gpr_mu_unlock(&s->mu);
789
790 gpr_free(filters);
791
Craig Tiller5d6bd442015-02-12 22:50:38 -0800792 return result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800793}
794
Craig Tilleraec96aa2015-04-07 14:32:15 -0700795static int num_listeners(grpc_server *server) {
796 listener *l;
797 int n = 0;
798 for (l = server->listeners; l; l = l->next) {
799 n++;
800 }
801 return n;
802}
803
Craig Tillerbd217572015-02-11 18:10:56 -0800804static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
Craig Tillerec3257c2015-02-12 15:59:43 -0800805 void *shutdown_tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800806 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800807 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800808 channel_data **channels;
809 channel_data *c;
810 size_t nchannels;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800811 size_t i, j;
nnoble0c475f02014-12-05 15:37:39 -0800812 grpc_channel_op op;
813 grpc_channel_element *elem;
Craig Tillerbd217572015-02-11 18:10:56 -0800814 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800815
816 /* lock, and gather up some stuff to do */
817 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800818 if (have_shutdown_tag) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800819 for (i = 0; i < server->cq_count; i++) {
820 grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
821 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800822 server->shutdown_tags =
823 gpr_realloc(server->shutdown_tags,
824 sizeof(void *) * (server->num_shutdown_tags + 1));
825 server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
826 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800827 if (server->shutdown) {
828 gpr_mu_unlock(&server->mu);
829 return;
830 }
831
nnoble0c475f02014-12-05 15:37:39 -0800832 nchannels = 0;
833 for (c = server->root_channel_data.next; c != &server->root_channel_data;
834 c = c->next) {
835 nchannels++;
836 }
837 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
838 i = 0;
839 for (c = server->root_channel_data.next; c != &server->root_channel_data;
840 c = c->next) {
841 grpc_channel_internal_ref(c->channel);
842 channels[i] = c;
843 i++;
844 }
845
Craig Tillerbd217572015-02-11 18:10:56 -0800846 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800847 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800848 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800849 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800850 if (requested_calls.count + rm->requested.count >
851 requested_calls.capacity) {
852 requested_calls.capacity =
853 GPR_MAX(requested_calls.count + rm->requested.count,
854 2 * requested_calls.capacity);
855 requested_calls.calls =
856 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
857 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800858 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800859 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
860 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800861 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800862 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800863 memset(&rm->requested, 0, sizeof(rm->requested));
864 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800865
866 server->shutdown = 1;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800867 if (server->lists[ALL_CALLS] == NULL) {
868 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800869 for (j = 0; j < server->cq_count; j++) {
870 grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800871 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800872 }
873 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800874 gpr_mu_unlock(&server->mu);
875
nnoble0c475f02014-12-05 15:37:39 -0800876 for (i = 0; i < nchannels; i++) {
877 c = channels[i];
878 elem = grpc_channel_stack_element(
879 grpc_channel_get_channel_stack(c->channel), 0);
880
881 op.type = GRPC_CHANNEL_GOAWAY;
882 op.dir = GRPC_CALL_DOWN;
883 op.data.goaway.status = GRPC_STATUS_OK;
884 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800885 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800886
887 grpc_channel_internal_unref(c->channel);
888 }
889 gpr_free(channels);
890
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800891 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800892 for (i = 0; i < requested_calls.count; i++) {
893 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800894 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800895 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800896
897 /* Shutdown listeners */
898 for (l = server->listeners; l; l = l->next) {
899 l->destroy(server, l->arg);
900 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800901}
902
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800903void grpc_server_shutdown(grpc_server *server) {
904 shutdown_internal(server, 0, NULL);
905}
906
907void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
908 shutdown_internal(server, 1, tag);
909}
910
Craig Tilleraec96aa2015-04-07 14:32:15 -0700911void grpc_server_listener_destroy_done(void *s) {
912 grpc_server *server = s;
913 gpr_mu_lock(&server->mu);
914 server->listeners_destroyed++;
915 gpr_cv_signal(&server->cv);
916 gpr_mu_unlock(&server->mu);
917}
918
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800919void grpc_server_destroy(grpc_server *server) {
920 channel_data *c;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700921 listener *l;
922 size_t i;
Craig Tiller872af022015-04-24 15:57:52 -0700923 call_data *calld;
924
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800925 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800926 if (!server->shutdown) {
927 gpr_mu_unlock(&server->mu);
928 grpc_server_shutdown(server);
929 gpr_mu_lock(&server->mu);
930 }
931
Craig Tilleraec96aa2015-04-07 14:32:15 -0700932 while (server->listeners_destroyed != num_listeners(server)) {
933 for (i = 0; i < server->cq_count; i++) {
934 gpr_mu_unlock(&server->mu);
935 grpc_cq_hack_spin_pollset(server->cqs[i]);
936 gpr_mu_lock(&server->mu);
937 }
938
Craig Tillerc02c1d82015-04-07 16:21:55 -0700939 gpr_cv_wait(&server->cv, &server->mu,
940 gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
Craig Tilleraec96aa2015-04-07 14:32:15 -0700941 }
942
943 while (server->listeners) {
944 l = server->listeners;
945 server->listeners = l->next;
946 gpr_free(l);
947 }
948
Craig Tiller872af022015-04-24 15:57:52 -0700949 while ((calld = call_list_remove_head(&server->lists[PENDING_START],
950 PENDING_START)) != NULL) {
951 gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
952 calld->state = ZOMBIED;
953 grpc_iomgr_add_callback(
954 kill_zombie,
955 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
956 }
957
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800958 for (c = server->root_channel_data.next; c != &server->root_channel_data;
959 c = c->next) {
960 shutdown_channel(c);
961 }
962 gpr_mu_unlock(&server->mu);
963
964 server_unref(server);
965}
966
967void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800968 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -0800969 grpc_pollset **pollsets,
970 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800971 void (*destroy)(grpc_server *server, void *arg)) {
972 listener *l = gpr_malloc(sizeof(listener));
973 l->arg = arg;
974 l->start = start;
975 l->destroy = destroy;
976 l->next = server->listeners;
977 server->listeners = l;
978}
979
Craig Tiller9f28ac22015-01-27 17:01:29 -0800980static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -0800981 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -0800982 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -0800983 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800984 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800985 if (server->shutdown) {
986 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800987 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800988 return GRPC_CALL_OK;
989 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800990 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800991 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800992 calld =
993 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800994 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800995 break;
996 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800997 calld = call_list_remove_head(
998 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800999 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001000 break;
1001 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001002 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -08001003 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -08001004 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -08001005 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -08001006 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -08001007 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001008 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -08001009 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -08001010 gpr_mu_unlock(&server->mu);
1011 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001012 }
Craig Tillercce17ac2015-01-20 09:29:28 -08001013}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001014
Craig Tiller24be0f72015-02-10 14:04:22 -08001015grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
1016 grpc_call_details *details,
1017 grpc_metadata_array *initial_metadata,
Craig Tiller3b29b562015-02-11 12:58:46 -08001018 grpc_completion_queue *cq_bind,
1019 void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001020 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001021 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -08001022 rc.type = BATCH_CALL;
1023 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001024 rc.data.batch.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -08001025 rc.data.batch.call = call;
1026 rc.data.batch.details = details;
1027 rc.data.batch.initial_metadata = initial_metadata;
1028 return queue_call_request(server, &rc);
1029}
1030
1031grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001032 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1033 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
1034 grpc_completion_queue *cq_bind, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001035 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001036 registered_method *registered_method = rm;
1037 grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -08001038 rc.type = REGISTERED_CALL;
1039 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001040 rc.data.registered.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -08001041 rc.data.registered.call = call;
1042 rc.data.registered.registered_method = registered_method;
1043 rc.data.registered.deadline = deadline;
1044 rc.data.registered.initial_metadata = initial_metadata;
1045 rc.data.registered.optional_payload = optional_payload;
1046 return queue_call_request(server, &rc);
1047}
1048
Craig Tiller24be0f72015-02-10 14:04:22 -08001049static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1050 void *tag);
Craig Tiller3b29b562015-02-11 12:58:46 -08001051static void publish_was_not_set(grpc_call *call, grpc_op_error status,
1052 void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001053 abort();
1054}
Craig Tiller24be0f72015-02-10 14:04:22 -08001055
Craig Tiller166e2502015-02-03 20:14:41 -08001056static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1057 gpr_slice slice = value->slice;
1058 size_t len = GPR_SLICE_LENGTH(slice);
1059
1060 if (len + 1 > *capacity) {
1061 *capacity = GPR_MAX(len + 1, *capacity * 2);
1062 *dest = gpr_realloc(*dest, *capacity);
1063 }
1064 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1065}
1066
Craig Tiller24be0f72015-02-10 14:04:22 -08001067static void begin_call(grpc_server *server, call_data *calld,
1068 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001069 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001070 grpc_ioreq req[2];
1071 grpc_ioreq *r = req;
1072
1073 /* called once initial metadata has been read by the call, but BEFORE
1074 the ioreq to fetch it out of the call has been executed.
1075 This means metadata related fields can be relied on in calld, but to
1076 fill in the metadata array passed by the client, we need to perform
1077 an ioreq op, that should complete immediately. */
1078
1079 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001080 case BATCH_CALL:
1081 cpstr(&rc->data.batch.details->host,
1082 &rc->data.batch.details->host_capacity, calld->host);
1083 cpstr(&rc->data.batch.details->method,
1084 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001085 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001086 grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001087 *rc->data.batch.call = calld->call;
1088 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1089 r->data.recv_metadata = rc->data.batch.initial_metadata;
1090 r++;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001091 calld->cq_new = server->unregistered_cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001092 publish = publish_registered_or_batch;
1093 break;
1094 case REGISTERED_CALL:
1095 *rc->data.registered.deadline = calld->deadline;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001096 grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001097 *rc->data.registered.call = calld->call;
1098 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1099 r->data.recv_metadata = rc->data.registered.initial_metadata;
1100 r++;
1101 if (rc->data.registered.optional_payload) {
1102 r->op = GRPC_IOREQ_RECV_MESSAGE;
1103 r->data.recv_message = rc->data.registered.optional_payload;
1104 r++;
1105 }
Craig Tiller20bc56d2015-02-12 09:02:56 -08001106 calld->cq_new = rc->data.registered.registered_method->cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001107 publish = publish_registered_or_batch;
1108 break;
1109 }
1110
Craig Tiller4df412b2015-04-28 07:57:54 -07001111 GRPC_CALL_INTERNAL_REF(calld->call, "server");
Craig Tiller24be0f72015-02-10 14:04:22 -08001112 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1113 rc->tag);
1114}
1115
1116static void fail_call(grpc_server *server, requested_call *rc) {
1117 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001118 case BATCH_CALL:
1119 *rc->data.batch.call = NULL;
1120 rc->data.batch.initial_metadata->count = 0;
Craig Tiller900e4512015-04-29 14:17:10 -07001121 grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
1122 GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001123 break;
1124 case REGISTERED_CALL:
1125 *rc->data.registered.call = NULL;
1126 rc->data.registered.initial_metadata->count = 0;
Craig Tiller900e4512015-04-29 14:17:10 -07001127 grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
1128 do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001129 break;
1130 }
1131}
1132
Craig Tiller24be0f72015-02-10 14:04:22 -08001133static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1134 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001135 grpc_call_element *elem =
1136 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001137 call_data *calld = elem->call_data;
Craig Tiller900e4512015-04-29 14:17:10 -07001138 grpc_cq_end_op(calld->cq_new, tag, call, do_nothing, NULL, status);
Craig Tiller24be0f72015-02-10 14:04:22 -08001139}
1140
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001141const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1142 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001143}