blob: 0bf7f8331fbbfb94afda9cfd387e17f25b14e047 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080047#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080048#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc/support/useful.h>
52
53typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
54
55typedef struct listener {
56 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080057 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
58 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080059 void (*destroy)(grpc_server *server, void *arg);
60 struct listener *next;
61} listener;
62
63typedef struct call_data call_data;
64typedef struct channel_data channel_data;
Craig Tiller24be0f72015-02-10 14:04:22 -080065typedef struct registered_method registered_method;
66
67typedef struct {
68 call_data *next;
69 call_data *prev;
70} call_link;
71
Craig Tiller0e919562015-04-28 14:03:47 -070072typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080073
74typedef struct {
75 requested_call_type type;
76 void *tag;
Craig Tillerf9e6adf2015-05-06 11:45:59 -070077 grpc_completion_queue *cq_bound_to_call;
78 grpc_completion_queue *cq_for_notification;
79 grpc_call **call;
Craig Tiller24be0f72015-02-10 14:04:22 -080080 union {
81 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080082 grpc_call_details *details;
83 grpc_metadata_array *initial_metadata;
84 } batch;
85 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080086 registered_method *registered_method;
87 gpr_timespec *deadline;
88 grpc_metadata_array *initial_metadata;
89 grpc_byte_buffer **optional_payload;
90 } registered;
91 } data;
92} requested_call;
93
94typedef struct {
95 requested_call *calls;
96 size_t count;
97 size_t capacity;
98} requested_call_array;
99
100struct registered_method {
101 char *method;
102 char *host;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800103 call_data *pending;
Craig Tiller24be0f72015-02-10 14:04:22 -0800104 requested_call_array requested;
105 registered_method *next;
106};
107
108typedef struct channel_registered_method {
109 registered_method *server_registered_method;
110 grpc_mdstr *method;
111 grpc_mdstr *host;
112} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800113
114struct channel_data {
115 grpc_server *server;
116 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800117 grpc_mdstr *path_key;
118 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800119 /* linked list of all channels on a server */
120 channel_data *next;
121 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800122 channel_registered_method *registered_methods;
123 gpr_uint32 registered_method_slots;
124 gpr_uint32 registered_method_max_probes;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800125};
126
Craig Tillerbce999f2015-05-27 09:55:51 -0700127typedef struct shutdown_tag {
128 void *tag;
129 grpc_completion_queue *cq;
130} shutdown_tag;
131
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800132struct grpc_server {
133 size_t channel_filter_count;
134 const grpc_channel_filter **channel_filters;
135 grpc_channel_args *channel_args;
Craig Tillerec3257c2015-02-12 15:59:43 -0800136
Craig Tiller20bc56d2015-02-12 09:02:56 -0800137 grpc_completion_queue **cqs;
138 grpc_pollset **pollsets;
139 size_t cq_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800140
141 gpr_mu mu;
142
Craig Tiller24be0f72015-02-10 14:04:22 -0800143 registered_method *registered_methods;
144 requested_call_array requested_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800145
146 gpr_uint8 shutdown;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800147 size_t num_shutdown_tags;
Craig Tillerbce999f2015-05-27 09:55:51 -0700148 shutdown_tag *shutdown_tags;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800149
150 call_data *lists[CALL_LIST_COUNT];
151 channel_data root_channel_data;
152
153 listener *listeners;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700154 int listeners_destroyed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800155 gpr_refcount internal_refcount;
156};
157
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800158typedef enum {
159 /* waiting for metadata */
160 NOT_STARTED,
161 /* inital metadata read, not flow controlled in yet */
162 PENDING,
163 /* flow controlled in, on completion queue */
164 ACTIVATED,
165 /* cancelled before being queued */
166 ZOMBIED
167} call_state;
168
169struct call_data {
170 grpc_call *call;
171
172 call_state state;
Craig Tillercce17ac2015-01-20 09:29:28 -0800173 grpc_mdstr *path;
174 grpc_mdstr *host;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700175 gpr_timespec deadline;
176 int got_initial_metadata;
Craig Tillercce17ac2015-01-20 09:29:28 -0800177
Craig Tiller20bc56d2015-02-12 09:02:56 -0800178 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800179
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700180 grpc_stream_op_buffer *recv_ops;
181 grpc_stream_state *recv_state;
182 void (*on_done_recv)(void *user_data, int success);
183 void *recv_user_data;
184
Craig Tiller04cc8be2015-02-10 16:11:22 -0800185 call_data **root[CALL_LIST_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800186 call_link links[CALL_LIST_COUNT];
187};
188
189#define SERVER_FROM_CALL_ELEM(elem) \
190 (((channel_data *)(elem)->channel_data)->server)
191
Craig Tiller24be0f72015-02-10 14:04:22 -0800192static void begin_call(grpc_server *server, call_data *calld,
193 requested_call *rc);
194static void fail_call(grpc_server *server, requested_call *rc);
195
Craig Tiller3b29b562015-02-11 12:58:46 -0800196static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800197 GPR_ASSERT(!call->root[list]);
198 call->root[list] = root;
199 if (!*root) {
200 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800201 call->links[list].next = call->links[list].prev = call;
202 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800203 call->links[list].next = *root;
204 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800205 call->links[list].next->links[list].prev =
206 call->links[list].prev->links[list].next = call;
207 }
208 return 1;
209}
210
Craig Tiller04cc8be2015-02-10 16:11:22 -0800211static call_data *call_list_remove_head(call_data **root, call_list list) {
212 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800214 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800215 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800216 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800217 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800218 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800219 out->links[list].next->links[list].prev = out->links[list].prev;
220 out->links[list].prev->links[list].next = out->links[list].next;
221 }
222 }
223 return out;
224}
225
Craig Tiller04cc8be2015-02-10 16:11:22 -0800226static int call_list_remove(call_data *call, call_list list) {
227 call_data **root = call->root[list];
228 if (root == NULL) return 0;
229 call->root[list] = NULL;
230 if (*root == call) {
231 *root = call->links[list].next;
232 if (*root == call) {
233 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800234 return 1;
235 }
236 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800237 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800238 call->links[list].next->links[list].prev = call->links[list].prev;
239 call->links[list].prev->links[list].next = call->links[list].next;
240 return 1;
241}
242
Craig Tiller24be0f72015-02-10 14:04:22 -0800243static void requested_call_array_destroy(requested_call_array *array) {
244 gpr_free(array->calls);
245}
246
247static requested_call *requested_call_array_add(requested_call_array *array) {
248 requested_call *rc;
249 if (array->count == array->capacity) {
250 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
251 array->calls =
252 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
253 }
254 rc = &array->calls[array->count++];
255 memset(rc, 0, sizeof(*rc));
256 return rc;
257}
258
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800259static void server_ref(grpc_server *server) {
260 gpr_ref(&server->internal_refcount);
261}
262
Craig Tilleree945e82015-05-26 16:15:34 -0700263static void server_delete(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800264 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700265 size_t i;
Craig Tilleree945e82015-05-26 16:15:34 -0700266 grpc_channel_args_destroy(server->channel_args);
267 gpr_mu_destroy(&server->mu);
268 gpr_free(server->channel_filters);
269 requested_call_array_destroy(&server->requested_calls);
270 while ((rm = server->registered_methods) != NULL) {
271 server->registered_methods = rm->next;
272 gpr_free(rm->method);
273 gpr_free(rm->host);
274 requested_call_array_destroy(&rm->requested);
275 gpr_free(rm);
276 }
277 for (i = 0; i < server->cq_count; i++) {
278 grpc_cq_internal_unref(server->cqs[i]);
279 }
280 gpr_free(server->cqs);
281 gpr_free(server->pollsets);
282 gpr_free(server->shutdown_tags);
283 gpr_free(server);
284}
285
286static void server_unref(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800287 if (gpr_unref(&server->internal_refcount)) {
Craig Tilleree945e82015-05-26 16:15:34 -0700288 server_delete(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800289 }
290}
291
292static int is_channel_orphaned(channel_data *chand) {
293 return chand->next == chand;
294}
295
296static void orphan_channel(channel_data *chand) {
297 chand->next->prev = chand->prev;
298 chand->prev->next = chand->next;
299 chand->next = chand->prev = chand;
300}
301
ctiller58393c22015-01-07 14:03:30 -0800302static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800303 channel_data *chand = cd;
304 grpc_server *server = chand->server;
Craig Tillerd75fe662015-02-21 07:30:49 -0800305 grpc_channel_internal_unref(chand->channel);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800306 server_unref(server);
307}
308
309static void destroy_channel(channel_data *chand) {
310 if (is_channel_orphaned(chand)) return;
311 GPR_ASSERT(chand->server != NULL);
312 orphan_channel(chand);
313 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800314 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800315}
316
Craig Tiller3b29b562015-02-11 12:58:46 -0800317static void finish_start_new_rpc_and_unlock(grpc_server *server,
318 grpc_call_element *elem,
319 call_data **pending_root,
320 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800321 requested_call rc;
322 call_data *calld = elem->call_data;
323 if (array->count == 0) {
324 calld->state = PENDING;
325 call_list_join(pending_root, calld, PENDING_START);
326 gpr_mu_unlock(&server->mu);
327 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800328 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800329 calld->state = ACTIVATED;
330 gpr_mu_unlock(&server->mu);
331 begin_call(server, calld, &rc);
332 }
333}
334
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800335static void start_new_rpc(grpc_call_element *elem) {
336 channel_data *chand = elem->channel_data;
337 call_data *calld = elem->call_data;
338 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800339 gpr_uint32 i;
340 gpr_uint32 hash;
341 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800342
343 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800344 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800345 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800346 /* check for an exact match with host */
347 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
348 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800349 rm = &chand->registered_methods[(hash + i) %
350 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800351 if (!rm) break;
352 if (rm->host != calld->host) continue;
353 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800354 finish_start_new_rpc_and_unlock(server, elem,
355 &rm->server_registered_method->pending,
356 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800357 return;
358 }
359 /* check for a wildcard method definition (no host set) */
360 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800361 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800362 rm = &chand->registered_methods[(hash + i) %
363 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800364 if (!rm) break;
365 if (rm->host != NULL) continue;
366 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800367 finish_start_new_rpc_and_unlock(server, elem,
368 &rm->server_registered_method->pending,
369 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800370 return;
371 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800372 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800373 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
374 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800375}
376
ctiller58393c22015-01-07 14:03:30 -0800377static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800378 grpc_call_destroy(grpc_call_from_top_element(elem));
379}
380
Craig Tilleree945e82015-05-26 16:15:34 -0700381static int num_listeners(grpc_server *server) {
382 listener *l;
383 int n = 0;
384 for (l = server->listeners; l; l = l->next) {
385 n++;
386 }
387 return n;
388}
389
Craig Tillerdc627722015-05-26 15:27:02 -0700390static void maybe_finish_shutdown(grpc_server *server) {
Craig Tillerbce999f2015-05-27 09:55:51 -0700391 size_t i;
Craig Tilleree945e82015-05-26 16:15:34 -0700392 if (server->shutdown && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
393 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tillerbce999f2015-05-27 09:55:51 -0700394 grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
395 NULL, 1);
Craig Tillerdc627722015-05-26 15:27:02 -0700396 }
397 }
398}
399
Craig Tiller6902ad22015-04-16 08:01:49 -0700400static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
401 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800402 channel_data *chand = elem->channel_data;
403 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700404 if (md->key == chand->path_key) {
405 calld->path = grpc_mdstr_ref(md->value);
406 return NULL;
407 } else if (md->key == chand->authority_key) {
408 calld->host = grpc_mdstr_ref(md->value);
409 return NULL;
410 }
411 return md;
412}
413
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700414static void server_on_recv(void *ptr, int success) {
415 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700416 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700417 channel_data *chand = elem->channel_data;
418
419 if (success && !calld->got_initial_metadata) {
420 size_t i;
421 size_t nops = calld->recv_ops->nops;
422 grpc_stream_op *ops = calld->recv_ops->ops;
423 for (i = 0; i < nops; i++) {
424 grpc_stream_op *op = &ops[i];
425 if (op->type != GRPC_OP_METADATA) continue;
Craig Tiller205aee12015-04-16 14:46:41 -0700426 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700427 if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700428 calld->deadline = op->data.metadata.deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800429 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700430 calld->got_initial_metadata = 1;
431 start_new_rpc(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800432 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700433 }
434 }
435
436 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700437 case GRPC_STREAM_OPEN:
438 break;
439 case GRPC_STREAM_SEND_CLOSED:
440 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700441 case GRPC_STREAM_RECV_CLOSED:
442 gpr_mu_lock(&chand->server->mu);
443 if (calld->state == NOT_STARTED) {
444 calld->state = ZOMBIED;
445 grpc_iomgr_add_callback(kill_zombie, elem);
446 }
447 gpr_mu_unlock(&chand->server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800448 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700449 case GRPC_STREAM_CLOSED:
450 gpr_mu_lock(&chand->server->mu);
451 if (calld->state == NOT_STARTED) {
452 calld->state = ZOMBIED;
453 grpc_iomgr_add_callback(kill_zombie, elem);
454 } else if (calld->state == PENDING) {
455 call_list_remove(calld, PENDING_START);
Craig Tillerc9d03822015-05-20 16:08:45 -0700456 calld->state = ZOMBIED;
457 grpc_iomgr_add_callback(kill_zombie, elem);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700458 }
Craig Tillerdc627722015-05-26 15:27:02 -0700459 call_list_remove(calld, ALL_CALLS);
460 maybe_finish_shutdown(chand->server);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700461 gpr_mu_unlock(&chand->server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800462 break;
463 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700464
465 calld->on_done_recv(calld->recv_user_data, success);
466}
467
Craig Tiller50d9db52015-04-23 10:52:14 -0700468static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700469 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700470
471 if (op->recv_ops) {
472 /* substitute our callback for the higher callback */
473 calld->recv_ops = op->recv_ops;
474 calld->recv_state = op->recv_state;
475 calld->on_done_recv = op->on_done_recv;
476 calld->recv_user_data = op->recv_user_data;
477 op->on_done_recv = server_on_recv;
478 op->recv_user_data = elem;
479 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700480}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700481
Craig Tiller06aeea72015-04-23 10:54:45 -0700482static void server_start_transport_op(grpc_call_element *elem,
483 grpc_transport_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -0700484 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
485 server_mutate_op(elem, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700486 grpc_call_next_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800487}
488
ctillerf962f522014-12-10 15:28:27 -0800489static void channel_op(grpc_channel_element *elem,
490 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800491 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800492 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800493
494 switch (op->type) {
495 case GRPC_ACCEPT_CALL:
496 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800497 grpc_call_create(chand->channel, NULL,
Craig Tiller87d5b192015-04-16 14:37:57 -0700498 op->data.accept_call.transport_server_data, NULL, 0,
499 gpr_inf_future);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800500 break;
501 case GRPC_TRANSPORT_CLOSED:
502 /* if the transport is closed for a server channel, we destroy the
503 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800504 gpr_mu_lock(&server->mu);
505 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800506 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800507 gpr_mu_unlock(&server->mu);
508 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800509 break;
nnoble0c475f02014-12-05 15:37:39 -0800510 case GRPC_TRANSPORT_GOAWAY:
511 gpr_slice_unref(op->data.goaway.message);
512 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800513 default:
514 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
515 grpc_channel_next_op(elem, op);
516 break;
517 }
518}
519
ctiller58393c22015-01-07 14:03:30 -0800520static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800521 channel_data *chand = cd;
522 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800523 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800524 op.dir = GRPC_CALL_DOWN;
525 channel_op(grpc_channel_stack_element(
526 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800527 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800528 grpc_channel_internal_unref(chand->channel);
529}
530
531static void shutdown_channel(channel_data *chand) {
532 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800533 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800534}
535
536static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700537 const void *server_transport_data,
538 grpc_transport_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800539 call_data *calld = elem->call_data;
540 channel_data *chand = elem->channel_data;
541 memset(calld, 0, sizeof(call_data));
542 calld->deadline = gpr_inf_future;
543 calld->call = grpc_call_from_top_element(elem);
544
545 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800546 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800547 gpr_mu_unlock(&chand->server->mu);
548
549 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700550
Craig Tiller482ef8b2015-04-23 11:38:20 -0700551 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800552}
553
554static void destroy_call_elem(grpc_call_element *elem) {
555 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800556 call_data *calld = elem->call_data;
Craig Tilleree945e82015-05-26 16:15:34 -0700557 int removed[CALL_LIST_COUNT];
Craig Tillerdc627722015-05-26 15:27:02 -0700558 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800559
560 gpr_mu_lock(&chand->server->mu);
561 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tilleree945e82015-05-26 16:15:34 -0700562 removed[i] = call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800563 }
Craig Tilleree945e82015-05-26 16:15:34 -0700564 if (removed[ALL_CALLS]) {
565 maybe_finish_shutdown(chand->server);
566 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800567 gpr_mu_unlock(&chand->server->mu);
568
Craig Tiller4df31a62015-01-30 09:44:31 -0800569 if (calld->host) {
570 grpc_mdstr_unref(calld->host);
571 }
572 if (calld->path) {
573 grpc_mdstr_unref(calld->path);
574 }
575
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800576 server_unref(chand->server);
577}
578
579static void init_channel_elem(grpc_channel_element *elem,
580 const grpc_channel_args *args,
581 grpc_mdctx *metadata_context, int is_first,
582 int is_last) {
583 channel_data *chand = elem->channel_data;
584 GPR_ASSERT(is_first);
585 GPR_ASSERT(!is_last);
586 chand->server = NULL;
587 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800588 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
589 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800590 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800591 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800592}
593
594static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800595 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800596 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800597 if (chand->registered_methods) {
598 for (i = 0; i < chand->registered_method_slots; i++) {
599 if (chand->registered_methods[i].method) {
600 grpc_mdstr_unref(chand->registered_methods[i].method);
601 }
602 if (chand->registered_methods[i].host) {
603 grpc_mdstr_unref(chand->registered_methods[i].host);
604 }
605 }
606 gpr_free(chand->registered_methods);
607 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800608 if (chand->server) {
609 gpr_mu_lock(&chand->server->mu);
610 chand->next->prev = chand->prev;
611 chand->prev->next = chand->next;
612 chand->next = chand->prev = chand;
613 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800614 grpc_mdstr_unref(chand->path_key);
615 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800616 server_unref(chand->server);
617 }
618}
619
620static const grpc_channel_filter server_surface_filter = {
Craig Tiller06aeea72015-04-23 10:54:45 -0700621 server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
622 destroy_call_elem, sizeof(channel_data), init_channel_elem,
623 destroy_channel_elem, "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800624};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800625
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700626void grpc_server_register_completion_queue(grpc_server *server,
627 grpc_completion_queue *cq) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800628 size_t i, n;
629 for (i = 0; i < server->cq_count; i++) {
630 if (server->cqs[i] == cq) return;
631 }
Craig Tiller89504612015-04-27 11:48:46 -0700632 grpc_cq_internal_ref(cq);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800633 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800634 server->cqs = gpr_realloc(server->cqs,
635 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800636 server->cqs[n] = cq;
637}
638
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700639grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800640 size_t filter_count,
641 const grpc_channel_args *args) {
642 size_t i;
643 int census_enabled = grpc_channel_args_is_census_enabled(args);
644
645 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800646
647 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
648
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800649 memset(server, 0, sizeof(grpc_server));
650
651 gpr_mu_init(&server->mu);
652
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800653 /* decremented by grpc_server_destroy */
654 gpr_ref_init(&server->internal_refcount, 1);
655 server->root_channel_data.next = server->root_channel_data.prev =
656 &server->root_channel_data;
657
658 /* Server filter stack is:
659
660 server_surface_filter - for making surface API calls
661 grpc_server_census_filter (optional) - for stats collection and tracing
662 {passed in filter stack}
663 grpc_connected_channel_filter - for interfacing with transports */
664 server->channel_filter_count = filter_count + 1 + census_enabled;
665 server->channel_filters =
666 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
667 server->channel_filters[0] = &server_surface_filter;
668 if (census_enabled) {
669 server->channel_filters[1] = &grpc_server_census_filter;
670 }
671 for (i = 0; i < filter_count; i++) {
672 server->channel_filters[i + 1 + census_enabled] = filters[i];
673 }
674
675 server->channel_args = grpc_channel_args_copy(args);
676
677 return server;
678}
679
Craig Tiller24be0f72015-02-10 14:04:22 -0800680static int streq(const char *a, const char *b) {
681 if (a == NULL && b == NULL) return 1;
682 if (a == NULL) return 0;
683 if (b == NULL) return 0;
684 return 0 == strcmp(a, b);
685}
686
687void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700688 const char *host) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800689 registered_method *m;
690 if (!method) {
Craig Tiller35696192015-05-24 15:00:37 -0700691 gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
Craig Tiller24be0f72015-02-10 14:04:22 -0800692 return NULL;
693 }
694 for (m = server->registered_methods; m; m = m->next) {
695 if (streq(m->method, method) && streq(m->host, host)) {
696 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
697 host ? host : "*");
698 return NULL;
699 }
700 }
701 m = gpr_malloc(sizeof(registered_method));
702 memset(m, 0, sizeof(*m));
703 m->method = gpr_strdup(method);
704 m->host = gpr_strdup(host);
705 m->next = server->registered_methods;
706 server->registered_methods = m;
707 return m;
708}
709
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800710void grpc_server_start(grpc_server *server) {
711 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800712 size_t i;
713
Craig Tillerec3257c2015-02-12 15:59:43 -0800714 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800715 for (i = 0; i < server->cq_count; i++) {
716 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
717 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800718
719 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800720 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800721 }
722}
723
724grpc_transport_setup_result grpc_server_setup_transport(
725 grpc_server *s, grpc_transport *transport,
726 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
727 grpc_mdctx *mdctx) {
728 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
729 grpc_channel_filter const **filters =
730 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
731 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800732 size_t num_registered_methods;
733 size_t alloc;
734 registered_method *rm;
735 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800736 grpc_channel *channel;
737 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800738 grpc_mdstr *host;
739 grpc_mdstr *method;
740 gpr_uint32 hash;
741 gpr_uint32 slots;
742 gpr_uint32 probes;
743 gpr_uint32 max_probes = 0;
Craig Tiller5d6bd442015-02-12 22:50:38 -0800744 grpc_transport_setup_result result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800745
746 for (i = 0; i < s->channel_filter_count; i++) {
747 filters[i] = s->channel_filters[i];
748 }
749 for (; i < s->channel_filter_count + num_extra_filters; i++) {
750 filters[i] = extra_filters[i - s->channel_filter_count];
751 }
752 filters[i] = &grpc_connected_channel_filter;
753
Craig Tiller20bc56d2015-02-12 09:02:56 -0800754 for (i = 0; i < s->cq_count; i++) {
755 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
756 }
ctillerd79b4862014-12-17 16:36:59 -0800757
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800758 channel = grpc_channel_create_from_filters(filters, num_filters,
759 s->channel_args, mdctx, 0);
760 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700761 grpc_channel_get_channel_stack(channel), 0)
762 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800763 chand->server = s;
764 server_ref(s);
765 chand->channel = channel;
766
Craig Tiller04cc8be2015-02-10 16:11:22 -0800767 num_registered_methods = 0;
768 for (rm = s->registered_methods; rm; rm = rm->next) {
769 num_registered_methods++;
770 }
771 /* build a lookup table phrased in terms of mdstr's in this channels context
772 to quickly find registered methods */
773 if (num_registered_methods > 0) {
774 slots = 2 * num_registered_methods;
775 alloc = sizeof(channel_registered_method) * slots;
776 chand->registered_methods = gpr_malloc(alloc);
777 memset(chand->registered_methods, 0, alloc);
778 for (rm = s->registered_methods; rm; rm = rm->next) {
779 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800780 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800781 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800782 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700783 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800784 probes++)
785 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800786 if (probes > max_probes) max_probes = probes;
787 crm = &chand->registered_methods[(hash + probes) % slots];
788 crm->server_registered_method = rm;
789 crm->host = host;
790 crm->method = method;
791 }
792 chand->registered_method_slots = slots;
793 chand->registered_method_max_probes = max_probes;
794 }
795
Craig Tiller5d6bd442015-02-12 22:50:38 -0800796 result = grpc_connected_channel_bind_transport(
797 grpc_channel_get_channel_stack(channel), transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800798
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799 gpr_mu_lock(&s->mu);
800 chand->next = &s->root_channel_data;
801 chand->prev = chand->next->prev;
802 chand->next->prev = chand->prev->next = chand;
803 gpr_mu_unlock(&s->mu);
804
805 gpr_free(filters);
806
Craig Tiller5d6bd442015-02-12 22:50:38 -0800807 return result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800808}
809
Craig Tillerbce999f2015-05-27 09:55:51 -0700810void grpc_server_shutdown_and_notify(grpc_server *server,
811 grpc_completion_queue *cq, void *tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800812 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800813 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800814 channel_data **channels;
815 channel_data *c;
816 size_t nchannels;
Craig Tillerdc627722015-05-26 15:27:02 -0700817 size_t i;
nnoble0c475f02014-12-05 15:37:39 -0800818 grpc_channel_op op;
819 grpc_channel_element *elem;
Craig Tillerbd217572015-02-11 18:10:56 -0800820 registered_method *rm;
Craig Tillerbce999f2015-05-27 09:55:51 -0700821 shutdown_tag *sdt;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800822
823 /* lock, and gather up some stuff to do */
824 gpr_mu_lock(&server->mu);
Craig Tilleree945e82015-05-26 16:15:34 -0700825 for (i = 0; i < server->cq_count; i++) {
826 grpc_cq_begin_op(server->cqs[i], NULL);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800827 }
Craig Tilleree945e82015-05-26 16:15:34 -0700828 server->shutdown_tags =
829 gpr_realloc(server->shutdown_tags,
830 sizeof(void *) * (server->num_shutdown_tags + 1));
Craig Tillerbce999f2015-05-27 09:55:51 -0700831 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
832 sdt->tag = tag;
833 sdt->cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800834 if (server->shutdown) {
835 gpr_mu_unlock(&server->mu);
836 return;
837 }
838
nnoble0c475f02014-12-05 15:37:39 -0800839 nchannels = 0;
840 for (c = server->root_channel_data.next; c != &server->root_channel_data;
841 c = c->next) {
842 nchannels++;
843 }
844 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
845 i = 0;
846 for (c = server->root_channel_data.next; c != &server->root_channel_data;
847 c = c->next) {
848 grpc_channel_internal_ref(c->channel);
849 channels[i] = c;
850 i++;
851 }
852
Craig Tillerbd217572015-02-11 18:10:56 -0800853 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800854 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800855 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800856 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800857 if (requested_calls.count + rm->requested.count >
858 requested_calls.capacity) {
859 requested_calls.capacity =
860 GPR_MAX(requested_calls.count + rm->requested.count,
861 2 * requested_calls.capacity);
862 requested_calls.calls =
863 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
864 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800865 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800866 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
867 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800868 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800869 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800870 memset(&rm->requested, 0, sizeof(rm->requested));
871 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800872
873 server->shutdown = 1;
Craig Tillerdc627722015-05-26 15:27:02 -0700874 maybe_finish_shutdown(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800875 gpr_mu_unlock(&server->mu);
876
nnoble0c475f02014-12-05 15:37:39 -0800877 for (i = 0; i < nchannels; i++) {
878 c = channels[i];
879 elem = grpc_channel_stack_element(
880 grpc_channel_get_channel_stack(c->channel), 0);
881
882 op.type = GRPC_CHANNEL_GOAWAY;
883 op.dir = GRPC_CALL_DOWN;
884 op.data.goaway.status = GRPC_STATUS_OK;
885 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800886 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800887
888 grpc_channel_internal_unref(c->channel);
889 }
890 gpr_free(channels);
891
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800892 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800893 for (i = 0; i < requested_calls.count; i++) {
894 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800895 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800896 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800897
898 /* Shutdown listeners */
899 for (l = server->listeners; l; l = l->next) {
900 l->destroy(server, l->arg);
901 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800902}
903
Craig Tilleraec96aa2015-04-07 14:32:15 -0700904void grpc_server_listener_destroy_done(void *s) {
905 grpc_server *server = s;
906 gpr_mu_lock(&server->mu);
907 server->listeners_destroyed++;
Craig Tilleree945e82015-05-26 16:15:34 -0700908 maybe_finish_shutdown(server);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700909 gpr_mu_unlock(&server->mu);
910}
911
Craig Tillerafa2d632015-05-26 16:39:13 -0700912void grpc_server_cancel_all_calls(grpc_server *server) {
913 call_data *calld;
914 grpc_call **calls;
915 size_t call_count;
916 size_t call_capacity;
917 int is_first = 1;
918 size_t i;
919
920 gpr_mu_lock(&server->mu);
921
922 GPR_ASSERT(server->shutdown);
923
924 if (!server->lists[ALL_CALLS]) {
925 gpr_mu_unlock(&server->mu);
926 return;
927 }
928
929 call_capacity = 8;
930 call_count = 0;
931 calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
932
933 for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) {
934 if (call_count == call_capacity) {
935 call_capacity *= 2;
936 calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
937 }
938 calls[call_count++] = calld->call;
939 GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
940 is_first = 0;
941 }
942
943 gpr_mu_unlock(&server->mu);
944
945 for (i = 0; i < call_count; i++) {
946 grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable");
947 GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
948 }
949
950 gpr_free(calls);
951}
952
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800953void grpc_server_destroy(grpc_server *server) {
954 channel_data *c;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700955 listener *l;
Craig Tiller872af022015-04-24 15:57:52 -0700956 call_data *calld;
957
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800958 gpr_mu_lock(&server->mu);
Craig Tilleree945e82015-05-26 16:15:34 -0700959 GPR_ASSERT(server->shutdown);
960 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -0700961
962 while (server->listeners) {
963 l = server->listeners;
964 server->listeners = l->next;
965 gpr_free(l);
966 }
967
Craig Tiller872af022015-04-24 15:57:52 -0700968 while ((calld = call_list_remove_head(&server->lists[PENDING_START],
969 PENDING_START)) != NULL) {
Craig Tiller872af022015-04-24 15:57:52 -0700970 calld->state = ZOMBIED;
971 grpc_iomgr_add_callback(
972 kill_zombie,
973 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
974 }
975
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800976 for (c = server->root_channel_data.next; c != &server->root_channel_data;
977 c = c->next) {
978 shutdown_channel(c);
979 }
980 gpr_mu_unlock(&server->mu);
981
982 server_unref(server);
983}
984
985void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800986 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -0800987 grpc_pollset **pollsets,
988 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800989 void (*destroy)(grpc_server *server, void *arg)) {
990 listener *l = gpr_malloc(sizeof(listener));
991 l->arg = arg;
992 l->start = start;
993 l->destroy = destroy;
994 l->next = server->listeners;
995 server->listeners = l;
996}
997
Craig Tiller9f28ac22015-01-27 17:01:29 -0800998static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -0800999 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001000 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -08001001 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001002 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001003 if (server->shutdown) {
1004 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -08001005 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001006 return GRPC_CALL_OK;
1007 }
Craig Tiller04cc8be2015-02-10 16:11:22 -08001008 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -08001009 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -08001010 calld =
1011 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -08001012 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001013 break;
1014 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -08001015 calld = call_list_remove_head(
1016 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -08001017 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001018 break;
1019 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001020 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -08001021 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -08001022 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -08001023 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -08001024 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -08001025 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001026 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -08001027 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -08001028 gpr_mu_unlock(&server->mu);
1029 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001030 }
Craig Tillercce17ac2015-01-20 09:29:28 -08001031}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001032
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001033grpc_call_error grpc_server_request_call(
1034 grpc_server *server, grpc_call **call, grpc_call_details *details,
1035 grpc_metadata_array *initial_metadata,
1036 grpc_completion_queue *cq_bound_to_call,
1037 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001038 requested_call rc;
Craig Tiller54478f82015-05-12 14:08:56 -07001039 grpc_cq_begin_op(cq_for_notification, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001040 rc.type = BATCH_CALL;
1041 rc.tag = tag;
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001042 rc.cq_bound_to_call = cq_bound_to_call;
1043 rc.cq_for_notification = cq_for_notification;
1044 rc.call = call;
Craig Tiller24be0f72015-02-10 14:04:22 -08001045 rc.data.batch.details = details;
1046 rc.data.batch.initial_metadata = initial_metadata;
1047 return queue_call_request(server, &rc);
1048}
1049
1050grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001051 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1052 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001053 grpc_completion_queue *cq_bound_to_call,
1054 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001055 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001056 registered_method *registered_method = rm;
Craig Tiller54478f82015-05-12 14:08:56 -07001057 grpc_cq_begin_op(cq_for_notification, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001058 rc.type = REGISTERED_CALL;
1059 rc.tag = tag;
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001060 rc.cq_bound_to_call = cq_bound_to_call;
1061 rc.cq_for_notification = cq_for_notification;
1062 rc.call = call;
Craig Tiller24be0f72015-02-10 14:04:22 -08001063 rc.data.registered.registered_method = registered_method;
1064 rc.data.registered.deadline = deadline;
1065 rc.data.registered.initial_metadata = initial_metadata;
1066 rc.data.registered.optional_payload = optional_payload;
1067 return queue_call_request(server, &rc);
1068}
1069
Craig Tiller64be9f72015-05-04 14:53:51 -07001070static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001071 void *tag);
Craig Tiller64be9f72015-05-04 14:53:51 -07001072static void publish_was_not_set(grpc_call *call, int success, void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001073 abort();
1074}
Craig Tiller24be0f72015-02-10 14:04:22 -08001075
Craig Tiller166e2502015-02-03 20:14:41 -08001076static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1077 gpr_slice slice = value->slice;
1078 size_t len = GPR_SLICE_LENGTH(slice);
1079
1080 if (len + 1 > *capacity) {
1081 *capacity = GPR_MAX(len + 1, *capacity * 2);
1082 *dest = gpr_realloc(*dest, *capacity);
1083 }
1084 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1085}
1086
Craig Tiller24be0f72015-02-10 14:04:22 -08001087static void begin_call(grpc_server *server, call_data *calld,
1088 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001089 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001090 grpc_ioreq req[2];
1091 grpc_ioreq *r = req;
1092
1093 /* called once initial metadata has been read by the call, but BEFORE
1094 the ioreq to fetch it out of the call has been executed.
1095 This means metadata related fields can be relied on in calld, but to
1096 fill in the metadata array passed by the client, we need to perform
1097 an ioreq op, that should complete immediately. */
1098
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001099 grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
1100 *rc->call = calld->call;
1101 calld->cq_new = rc->cq_for_notification;
Craig Tiller24be0f72015-02-10 14:04:22 -08001102 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001103 case BATCH_CALL:
1104 cpstr(&rc->data.batch.details->host,
1105 &rc->data.batch.details->host_capacity, calld->host);
1106 cpstr(&rc->data.batch.details->method,
1107 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001108 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001109 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1110 r->data.recv_metadata = rc->data.batch.initial_metadata;
1111 r++;
1112 publish = publish_registered_or_batch;
1113 break;
1114 case REGISTERED_CALL:
1115 *rc->data.registered.deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001116 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1117 r->data.recv_metadata = rc->data.registered.initial_metadata;
1118 r++;
1119 if (rc->data.registered.optional_payload) {
1120 r->op = GRPC_IOREQ_RECV_MESSAGE;
1121 r->data.recv_message = rc->data.registered.optional_payload;
1122 r++;
1123 }
1124 publish = publish_registered_or_batch;
1125 break;
1126 }
1127
Craig Tiller4df412b2015-04-28 07:57:54 -07001128 GRPC_CALL_INTERNAL_REF(calld->call, "server");
Craig Tiller24be0f72015-02-10 14:04:22 -08001129 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1130 rc->tag);
1131}
1132
1133static void fail_call(grpc_server *server, requested_call *rc) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001134 *rc->call = NULL;
Craig Tiller24be0f72015-02-10 14:04:22 -08001135 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001136 case BATCH_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001137 rc->data.batch.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001138 break;
1139 case REGISTERED_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001140 rc->data.registered.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001141 break;
1142 }
Craig Tiller54478f82015-05-12 14:08:56 -07001143 grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
Craig Tiller24be0f72015-02-10 14:04:22 -08001144}
1145
Craig Tiller64be9f72015-05-04 14:53:51 -07001146static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001147 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001148 grpc_call_element *elem =
1149 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001150 call_data *calld = elem->call_data;
Craig Tiller64be9f72015-05-04 14:53:51 -07001151 grpc_cq_end_op(calld->cq_new, tag, call, success);
Craig Tiller24be0f72015-02-10 14:04:22 -08001152}
1153
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001154const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1155 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001156}