blob: a95215c5de99fff7521389209a1731d0896f22d9 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080047#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080048#include <grpc/support/alloc.h>
49#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050#include <grpc/support/useful.h>
51
52typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
53
54typedef struct listener {
55 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080056 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
57 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080058 void (*destroy)(grpc_server *server, void *arg);
59 struct listener *next;
60} listener;
61
62typedef struct call_data call_data;
63typedef struct channel_data channel_data;
Craig Tiller24be0f72015-02-10 14:04:22 -080064typedef struct registered_method registered_method;
65
66typedef struct {
67 call_data *next;
68 call_data *prev;
69} call_link;
70
71typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
72
73typedef struct {
74 requested_call_type type;
75 void *tag;
76 union {
77 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080078 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080079 grpc_call **call;
80 grpc_call_details *details;
81 grpc_metadata_array *initial_metadata;
82 } batch;
83 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080084 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080085 grpc_call **call;
86 registered_method *registered_method;
87 gpr_timespec *deadline;
88 grpc_metadata_array *initial_metadata;
89 grpc_byte_buffer **optional_payload;
90 } registered;
91 } data;
92} requested_call;
93
94typedef struct {
95 requested_call *calls;
96 size_t count;
97 size_t capacity;
98} requested_call_array;
99
100struct registered_method {
101 char *method;
102 char *host;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800103 call_data *pending;
Craig Tiller24be0f72015-02-10 14:04:22 -0800104 requested_call_array requested;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800105 grpc_completion_queue *cq;
Craig Tiller24be0f72015-02-10 14:04:22 -0800106 registered_method *next;
107};
108
109typedef struct channel_registered_method {
110 registered_method *server_registered_method;
111 grpc_mdstr *method;
112 grpc_mdstr *host;
113} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800114
115struct channel_data {
116 grpc_server *server;
117 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800118 grpc_mdstr *path_key;
119 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800120 /* linked list of all channels on a server */
121 channel_data *next;
122 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800123 channel_registered_method *registered_methods;
124 gpr_uint32 registered_method_slots;
125 gpr_uint32 registered_method_max_probes;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800126};
127
128struct grpc_server {
129 size_t channel_filter_count;
130 const grpc_channel_filter **channel_filters;
131 grpc_channel_args *channel_args;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800132 grpc_completion_queue *unregistered_cq;
Craig Tillerec3257c2015-02-12 15:59:43 -0800133
Craig Tiller20bc56d2015-02-12 09:02:56 -0800134 grpc_completion_queue **cqs;
135 grpc_pollset **pollsets;
136 size_t cq_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800137
138 gpr_mu mu;
139
Craig Tiller24be0f72015-02-10 14:04:22 -0800140 registered_method *registered_methods;
141 requested_call_array requested_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800142
143 gpr_uint8 shutdown;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800144 size_t num_shutdown_tags;
145 void **shutdown_tags;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800146
147 call_data *lists[CALL_LIST_COUNT];
148 channel_data root_channel_data;
149
150 listener *listeners;
151 gpr_refcount internal_refcount;
152};
153
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154typedef enum {
155 /* waiting for metadata */
156 NOT_STARTED,
157 /* inital metadata read, not flow controlled in yet */
158 PENDING,
159 /* flow controlled in, on completion queue */
160 ACTIVATED,
161 /* cancelled before being queued */
162 ZOMBIED
163} call_state;
164
Craig Tillerfb189f82015-02-03 12:07:07 -0800165typedef struct legacy_data {
Craig Tiller24be0f72015-02-10 14:04:22 -0800166 grpc_metadata_array initial_metadata;
Craig Tillerfb189f82015-02-03 12:07:07 -0800167} legacy_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800168
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800169struct call_data {
170 grpc_call *call;
171
172 call_state state;
173 gpr_timespec deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800174 grpc_mdstr *path;
175 grpc_mdstr *host;
176
177 legacy_data *legacy;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800178 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800179
Craig Tiller04cc8be2015-02-10 16:11:22 -0800180 call_data **root[CALL_LIST_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800181 call_link links[CALL_LIST_COUNT];
182};
183
184#define SERVER_FROM_CALL_ELEM(elem) \
185 (((channel_data *)(elem)->channel_data)->server)
186
187static void do_nothing(void *unused, grpc_op_error ignored) {}
188
Craig Tiller24be0f72015-02-10 14:04:22 -0800189static void begin_call(grpc_server *server, call_data *calld,
190 requested_call *rc);
191static void fail_call(grpc_server *server, requested_call *rc);
192
Craig Tiller3b29b562015-02-11 12:58:46 -0800193static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800194 GPR_ASSERT(!call->root[list]);
195 call->root[list] = root;
196 if (!*root) {
197 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800198 call->links[list].next = call->links[list].prev = call;
199 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800200 call->links[list].next = *root;
201 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202 call->links[list].next->links[list].prev =
203 call->links[list].prev->links[list].next = call;
204 }
205 return 1;
206}
207
Craig Tiller04cc8be2015-02-10 16:11:22 -0800208static call_data *call_list_remove_head(call_data **root, call_list list) {
209 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800210 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800211 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800212 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800213 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800214 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800215 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800216 out->links[list].next->links[list].prev = out->links[list].prev;
217 out->links[list].prev->links[list].next = out->links[list].next;
218 }
219 }
220 return out;
221}
222
Craig Tiller04cc8be2015-02-10 16:11:22 -0800223static int call_list_remove(call_data *call, call_list list) {
224 call_data **root = call->root[list];
225 if (root == NULL) return 0;
226 call->root[list] = NULL;
227 if (*root == call) {
228 *root = call->links[list].next;
229 if (*root == call) {
230 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800231 return 1;
232 }
233 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800234 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800235 call->links[list].next->links[list].prev = call->links[list].prev;
236 call->links[list].prev->links[list].next = call->links[list].next;
237 return 1;
238}
239
Craig Tiller24be0f72015-02-10 14:04:22 -0800240static void requested_call_array_destroy(requested_call_array *array) {
241 gpr_free(array->calls);
242}
243
244static requested_call *requested_call_array_add(requested_call_array *array) {
245 requested_call *rc;
246 if (array->count == array->capacity) {
247 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
248 array->calls =
249 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
250 }
251 rc = &array->calls[array->count++];
252 memset(rc, 0, sizeof(*rc));
253 return rc;
254}
255
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800256static void server_ref(grpc_server *server) {
257 gpr_ref(&server->internal_refcount);
258}
259
260static void server_unref(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800261 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800262 if (gpr_unref(&server->internal_refcount)) {
263 grpc_channel_args_destroy(server->channel_args);
264 gpr_mu_destroy(&server->mu);
265 gpr_free(server->channel_filters);
Craig Tiller24be0f72015-02-10 14:04:22 -0800266 requested_call_array_destroy(&server->requested_calls);
Craig Tillerec3257c2015-02-12 15:59:43 -0800267 while ((rm = server->registered_methods) != NULL) {
268 server->registered_methods = rm->next;
269 gpr_free(rm->method);
270 gpr_free(rm->host);
271 requested_call_array_destroy(&rm->requested);
272 gpr_free(rm);
273 }
274 gpr_free(server->cqs);
275 gpr_free(server->pollsets);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800276 gpr_free(server->shutdown_tags);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800277 gpr_free(server);
278 }
279}
280
281static int is_channel_orphaned(channel_data *chand) {
282 return chand->next == chand;
283}
284
285static void orphan_channel(channel_data *chand) {
286 chand->next->prev = chand->prev;
287 chand->prev->next = chand->next;
288 chand->next = chand->prev = chand;
289}
290
ctiller58393c22015-01-07 14:03:30 -0800291static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800292 channel_data *chand = cd;
293 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800294 grpc_channel_destroy(chand->channel);
295 server_unref(server);
296}
297
298static void destroy_channel(channel_data *chand) {
299 if (is_channel_orphaned(chand)) return;
300 GPR_ASSERT(chand->server != NULL);
301 orphan_channel(chand);
302 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800303 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800304}
305
Craig Tiller3b29b562015-02-11 12:58:46 -0800306static void finish_start_new_rpc_and_unlock(grpc_server *server,
307 grpc_call_element *elem,
308 call_data **pending_root,
309 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800310 requested_call rc;
311 call_data *calld = elem->call_data;
312 if (array->count == 0) {
313 calld->state = PENDING;
314 call_list_join(pending_root, calld, PENDING_START);
315 gpr_mu_unlock(&server->mu);
316 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800317 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800318 calld->state = ACTIVATED;
319 gpr_mu_unlock(&server->mu);
320 begin_call(server, calld, &rc);
321 }
322}
323
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800324static void start_new_rpc(grpc_call_element *elem) {
325 channel_data *chand = elem->channel_data;
326 call_data *calld = elem->call_data;
327 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800328 gpr_uint32 i;
329 gpr_uint32 hash;
330 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800331
332 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800333 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800334 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800335 /* check for an exact match with host */
336 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
337 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800338 rm = &chand->registered_methods[(hash + i) %
339 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800340 if (!rm) break;
341 if (rm->host != calld->host) continue;
342 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800343 finish_start_new_rpc_and_unlock(server, elem,
344 &rm->server_registered_method->pending,
345 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800346 return;
347 }
348 /* check for a wildcard method definition (no host set) */
349 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800350 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800351 rm = &chand->registered_methods[(hash + i) %
352 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800353 if (!rm) break;
354 if (rm->host != NULL) continue;
355 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800356 finish_start_new_rpc_and_unlock(server, elem,
357 &rm->server_registered_method->pending,
358 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800359 return;
360 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800361 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800362 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
363 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800364}
365
ctiller58393c22015-01-07 14:03:30 -0800366static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367 grpc_call_destroy(grpc_call_from_top_element(elem));
368}
369
Craig Tillercce17ac2015-01-20 09:29:28 -0800370static void stream_closed(grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800371 call_data *calld = elem->call_data;
372 channel_data *chand = elem->channel_data;
373 gpr_mu_lock(&chand->server->mu);
374 switch (calld->state) {
375 case ACTIVATED:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800376 break;
377 case PENDING:
Craig Tiller04cc8be2015-02-10 16:11:22 -0800378 call_list_remove(calld, PENDING_START);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800379 /* fallthrough intended */
380 case NOT_STARTED:
381 calld->state = ZOMBIED;
ctiller18b49ab2014-12-09 14:39:16 -0800382 grpc_iomgr_add_callback(kill_zombie, elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800383 break;
384 case ZOMBIED:
385 break;
386 }
387 gpr_mu_unlock(&chand->server->mu);
Craig Tiller0a927bf2015-02-05 10:52:53 -0800388 grpc_call_stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800389}
390
Craig Tillercce17ac2015-01-20 09:29:28 -0800391static void read_closed(grpc_call_element *elem) {
392 call_data *calld = elem->call_data;
393 channel_data *chand = elem->channel_data;
394 gpr_mu_lock(&chand->server->mu);
395 switch (calld->state) {
396 case ACTIVATED:
397 case PENDING:
398 grpc_call_read_closed(elem);
399 break;
400 case NOT_STARTED:
401 calld->state = ZOMBIED;
402 grpc_iomgr_add_callback(kill_zombie, elem);
403 break;
404 case ZOMBIED:
405 break;
406 }
407 gpr_mu_unlock(&chand->server->mu);
408}
409
ctillerf962f522014-12-10 15:28:27 -0800410static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
411 grpc_call_op *op) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800412 channel_data *chand = elem->channel_data;
413 call_data *calld = elem->call_data;
414 grpc_mdelem *md;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800415 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
416 switch (op->type) {
417 case GRPC_RECV_METADATA:
Craig Tillercce17ac2015-01-20 09:29:28 -0800418 md = op->data.metadata;
419 if (md->key == chand->path_key) {
420 calld->path = grpc_mdstr_ref(md->value);
421 grpc_mdelem_unref(md);
422 } else if (md->key == chand->authority_key) {
423 calld->host = grpc_mdstr_ref(md->value);
424 grpc_mdelem_unref(md);
425 } else {
426 grpc_call_recv_metadata(elem, md);
427 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800428 break;
429 case GRPC_RECV_END_OF_INITIAL_METADATA:
430 start_new_rpc(elem);
Craig Tiller4069b682015-01-29 14:01:19 -0800431 grpc_call_initial_metadata_complete(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800432 break;
433 case GRPC_RECV_MESSAGE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800434 grpc_call_recv_message(elem, op->data.message);
435 op->done_cb(op->user_data, GRPC_OP_OK);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800436 break;
437 case GRPC_RECV_HALF_CLOSE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800438 read_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800439 break;
440 case GRPC_RECV_FINISH:
Craig Tillercce17ac2015-01-20 09:29:28 -0800441 stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800442 break;
443 case GRPC_RECV_DEADLINE:
444 grpc_call_set_deadline(elem, op->data.deadline);
445 ((call_data *)elem->call_data)->deadline = op->data.deadline;
446 break;
447 default:
448 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
449 grpc_call_next_op(elem, op);
450 break;
451 }
452}
453
ctillerf962f522014-12-10 15:28:27 -0800454static void channel_op(grpc_channel_element *elem,
455 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800456 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800457 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800458
459 switch (op->type) {
460 case GRPC_ACCEPT_CALL:
461 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800462 grpc_call_create(chand->channel, NULL,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800463 op->data.accept_call.transport_server_data);
464 break;
465 case GRPC_TRANSPORT_CLOSED:
466 /* if the transport is closed for a server channel, we destroy the
467 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800468 gpr_mu_lock(&server->mu);
469 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800470 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800471 gpr_mu_unlock(&server->mu);
472 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800473 break;
nnoble0c475f02014-12-05 15:37:39 -0800474 case GRPC_TRANSPORT_GOAWAY:
475 gpr_slice_unref(op->data.goaway.message);
476 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800477 default:
478 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
479 grpc_channel_next_op(elem, op);
480 break;
481 }
482}
483
ctiller58393c22015-01-07 14:03:30 -0800484static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800485 channel_data *chand = cd;
486 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800487 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800488 op.dir = GRPC_CALL_DOWN;
489 channel_op(grpc_channel_stack_element(
490 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800491 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800492 grpc_channel_internal_unref(chand->channel);
493}
494
495static void shutdown_channel(channel_data *chand) {
496 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800497 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800498}
499
500static void init_call_elem(grpc_call_element *elem,
501 const void *server_transport_data) {
502 call_data *calld = elem->call_data;
503 channel_data *chand = elem->channel_data;
504 memset(calld, 0, sizeof(call_data));
505 calld->deadline = gpr_inf_future;
506 calld->call = grpc_call_from_top_element(elem);
507
508 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800509 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800510 gpr_mu_unlock(&chand->server->mu);
511
512 server_ref(chand->server);
513}
514
515static void destroy_call_elem(grpc_call_element *elem) {
516 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800517 call_data *calld = elem->call_data;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800518 size_t i, j;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800519
520 gpr_mu_lock(&chand->server->mu);
521 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800522 call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800523 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800524 if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
525 for (i = 0; i < chand->server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800526 for (j = 0; j < chand->server->cq_count; j++) {
527 grpc_cq_end_server_shutdown(chand->server->cqs[j],
528 chand->server->shutdown_tags[i]);
529 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800530 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800531 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800532 gpr_mu_unlock(&chand->server->mu);
533
Craig Tiller4df31a62015-01-30 09:44:31 -0800534 if (calld->host) {
535 grpc_mdstr_unref(calld->host);
536 }
537 if (calld->path) {
538 grpc_mdstr_unref(calld->path);
539 }
540
Craig Tillerdb7db992015-01-29 11:19:01 -0800541 if (calld->legacy) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800542 gpr_free(calld->legacy->initial_metadata.metadata);
Craig Tillerdb7db992015-01-29 11:19:01 -0800543 gpr_free(calld->legacy);
544 }
545
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800546 server_unref(chand->server);
547}
548
549static void init_channel_elem(grpc_channel_element *elem,
550 const grpc_channel_args *args,
551 grpc_mdctx *metadata_context, int is_first,
552 int is_last) {
553 channel_data *chand = elem->channel_data;
554 GPR_ASSERT(is_first);
555 GPR_ASSERT(!is_last);
556 chand->server = NULL;
557 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800558 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
559 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800560 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800561 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800562}
563
564static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800565 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800566 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800567 if (chand->registered_methods) {
568 for (i = 0; i < chand->registered_method_slots; i++) {
569 if (chand->registered_methods[i].method) {
570 grpc_mdstr_unref(chand->registered_methods[i].method);
571 }
572 if (chand->registered_methods[i].host) {
573 grpc_mdstr_unref(chand->registered_methods[i].host);
574 }
575 }
576 gpr_free(chand->registered_methods);
577 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800578 if (chand->server) {
579 gpr_mu_lock(&chand->server->mu);
580 chand->next->prev = chand->prev;
581 chand->prev->next = chand->next;
582 chand->next = chand->prev = chand;
583 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800584 grpc_mdstr_unref(chand->path_key);
585 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800586 server_unref(chand->server);
587 }
588}
589
590static const grpc_channel_filter server_surface_filter = {
Yang Gao5fd0d292015-01-26 00:19:48 -0800591 call_op, channel_op, sizeof(call_data),
592 init_call_elem, destroy_call_elem, sizeof(channel_data),
Craig Tiller9f28ac22015-01-27 17:01:29 -0800593 init_channel_elem, destroy_channel_elem, "server",
594};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800595
Craig Tiller20bc56d2015-02-12 09:02:56 -0800596static void addcq(grpc_server *server, grpc_completion_queue *cq) {
597 size_t i, n;
598 for (i = 0; i < server->cq_count; i++) {
599 if (server->cqs[i] == cq) return;
600 }
601 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800602 server->cqs = gpr_realloc(server->cqs,
603 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800604 server->cqs[n] = cq;
605}
606
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800607grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
608 grpc_channel_filter **filters,
609 size_t filter_count,
610 const grpc_channel_args *args) {
611 size_t i;
612 int census_enabled = grpc_channel_args_is_census_enabled(args);
613
614 grpc_server *server = gpr_malloc(sizeof(grpc_server));
615 memset(server, 0, sizeof(grpc_server));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800616 if (cq) addcq(server, cq);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800617
618 gpr_mu_init(&server->mu);
619
Craig Tiller20bc56d2015-02-12 09:02:56 -0800620 server->unregistered_cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800621 /* decremented by grpc_server_destroy */
622 gpr_ref_init(&server->internal_refcount, 1);
623 server->root_channel_data.next = server->root_channel_data.prev =
624 &server->root_channel_data;
625
626 /* Server filter stack is:
627
628 server_surface_filter - for making surface API calls
629 grpc_server_census_filter (optional) - for stats collection and tracing
630 {passed in filter stack}
631 grpc_connected_channel_filter - for interfacing with transports */
632 server->channel_filter_count = filter_count + 1 + census_enabled;
633 server->channel_filters =
634 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
635 server->channel_filters[0] = &server_surface_filter;
636 if (census_enabled) {
637 server->channel_filters[1] = &grpc_server_census_filter;
638 }
639 for (i = 0; i < filter_count; i++) {
640 server->channel_filters[i + 1 + census_enabled] = filters[i];
641 }
642
643 server->channel_args = grpc_channel_args_copy(args);
644
645 return server;
646}
647
Craig Tiller24be0f72015-02-10 14:04:22 -0800648static int streq(const char *a, const char *b) {
649 if (a == NULL && b == NULL) return 1;
650 if (a == NULL) return 0;
651 if (b == NULL) return 0;
652 return 0 == strcmp(a, b);
653}
654
655void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerec3257c2015-02-12 15:59:43 -0800656 const char *host,
657 grpc_completion_queue *cq_new_rpc) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800658 registered_method *m;
659 if (!method) {
660 gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
661 return NULL;
662 }
663 for (m = server->registered_methods; m; m = m->next) {
664 if (streq(m->method, method) && streq(m->host, host)) {
665 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
666 host ? host : "*");
667 return NULL;
668 }
669 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800670 addcq(server, cq_new_rpc);
Craig Tiller24be0f72015-02-10 14:04:22 -0800671 m = gpr_malloc(sizeof(registered_method));
672 memset(m, 0, sizeof(*m));
673 m->method = gpr_strdup(method);
674 m->host = gpr_strdup(host);
675 m->next = server->registered_methods;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800676 m->cq = cq_new_rpc;
Craig Tiller24be0f72015-02-10 14:04:22 -0800677 server->registered_methods = m;
678 return m;
679}
680
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800681void grpc_server_start(grpc_server *server) {
682 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800683 size_t i;
684
Craig Tillerec3257c2015-02-12 15:59:43 -0800685 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800686 for (i = 0; i < server->cq_count; i++) {
687 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
688 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800689
690 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800691 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800692 }
693}
694
695grpc_transport_setup_result grpc_server_setup_transport(
696 grpc_server *s, grpc_transport *transport,
697 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
698 grpc_mdctx *mdctx) {
699 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
700 grpc_channel_filter const **filters =
701 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
702 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800703 size_t num_registered_methods;
704 size_t alloc;
705 registered_method *rm;
706 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800707 grpc_channel *channel;
708 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800709 grpc_mdstr *host;
710 grpc_mdstr *method;
711 gpr_uint32 hash;
712 gpr_uint32 slots;
713 gpr_uint32 probes;
714 gpr_uint32 max_probes = 0;
Craig Tiller5d6bd442015-02-12 22:50:38 -0800715 grpc_transport_setup_result result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800716
717 for (i = 0; i < s->channel_filter_count; i++) {
718 filters[i] = s->channel_filters[i];
719 }
720 for (; i < s->channel_filter_count + num_extra_filters; i++) {
721 filters[i] = extra_filters[i - s->channel_filter_count];
722 }
723 filters[i] = &grpc_connected_channel_filter;
724
Craig Tiller20bc56d2015-02-12 09:02:56 -0800725 for (i = 0; i < s->cq_count; i++) {
726 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
727 }
ctillerd79b4862014-12-17 16:36:59 -0800728
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800729 channel = grpc_channel_create_from_filters(filters, num_filters,
730 s->channel_args, mdctx, 0);
731 chand = (channel_data *)grpc_channel_stack_element(
732 grpc_channel_get_channel_stack(channel), 0)->channel_data;
733 chand->server = s;
734 server_ref(s);
735 chand->channel = channel;
736
Craig Tiller04cc8be2015-02-10 16:11:22 -0800737 num_registered_methods = 0;
738 for (rm = s->registered_methods; rm; rm = rm->next) {
739 num_registered_methods++;
740 }
741 /* build a lookup table phrased in terms of mdstr's in this channels context
742 to quickly find registered methods */
743 if (num_registered_methods > 0) {
744 slots = 2 * num_registered_methods;
745 alloc = sizeof(channel_registered_method) * slots;
746 chand->registered_methods = gpr_malloc(alloc);
747 memset(chand->registered_methods, 0, alloc);
748 for (rm = s->registered_methods; rm; rm = rm->next) {
749 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800750 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800751 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800752 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
753 .server_registered_method != NULL;
754 probes++)
755 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800756 if (probes > max_probes) max_probes = probes;
757 crm = &chand->registered_methods[(hash + probes) % slots];
758 crm->server_registered_method = rm;
759 crm->host = host;
760 crm->method = method;
761 }
762 chand->registered_method_slots = slots;
763 chand->registered_method_max_probes = max_probes;
764 }
765
Craig Tiller5d6bd442015-02-12 22:50:38 -0800766 result = grpc_connected_channel_bind_transport(
767 grpc_channel_get_channel_stack(channel), transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800768
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800769 gpr_mu_lock(&s->mu);
770 chand->next = &s->root_channel_data;
771 chand->prev = chand->next->prev;
772 chand->next->prev = chand->prev->next = chand;
773 gpr_mu_unlock(&s->mu);
774
775 gpr_free(filters);
776
Craig Tiller5d6bd442015-02-12 22:50:38 -0800777 return result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800778}
779
Craig Tillerbd217572015-02-11 18:10:56 -0800780static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
Craig Tillerec3257c2015-02-12 15:59:43 -0800781 void *shutdown_tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800782 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800783 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800784 channel_data **channels;
785 channel_data *c;
786 size_t nchannels;
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800787 size_t i, j;
nnoble0c475f02014-12-05 15:37:39 -0800788 grpc_channel_op op;
789 grpc_channel_element *elem;
Craig Tillerbd217572015-02-11 18:10:56 -0800790 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800791
792 /* lock, and gather up some stuff to do */
793 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800794 if (have_shutdown_tag) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800795 for (i = 0; i < server->cq_count; i++) {
796 grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
797 }
Craig Tilleraea2fc02015-02-17 16:54:53 -0800798 server->shutdown_tags =
799 gpr_realloc(server->shutdown_tags,
800 sizeof(void *) * (server->num_shutdown_tags + 1));
801 server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
802 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800803 if (server->shutdown) {
804 gpr_mu_unlock(&server->mu);
805 return;
806 }
807
nnoble0c475f02014-12-05 15:37:39 -0800808 nchannels = 0;
809 for (c = server->root_channel_data.next; c != &server->root_channel_data;
810 c = c->next) {
811 nchannels++;
812 }
813 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
814 i = 0;
815 for (c = server->root_channel_data.next; c != &server->root_channel_data;
816 c = c->next) {
817 grpc_channel_internal_ref(c->channel);
818 channels[i] = c;
819 i++;
820 }
821
Craig Tillerbd217572015-02-11 18:10:56 -0800822 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800823 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800824 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800825 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800826 if (requested_calls.count + rm->requested.count >
827 requested_calls.capacity) {
828 requested_calls.capacity =
829 GPR_MAX(requested_calls.count + rm->requested.count,
830 2 * requested_calls.capacity);
831 requested_calls.calls =
832 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
833 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800834 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800835 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
836 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800837 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800838 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800839 memset(&rm->requested, 0, sizeof(rm->requested));
840 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800841
842 server->shutdown = 1;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800843 if (server->lists[ALL_CALLS] == NULL) {
844 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800845 for (j = 0; j < server->cq_count; j++) {
846 grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800847 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800848 }
849 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800850 gpr_mu_unlock(&server->mu);
851
nnoble0c475f02014-12-05 15:37:39 -0800852 for (i = 0; i < nchannels; i++) {
853 c = channels[i];
854 elem = grpc_channel_stack_element(
855 grpc_channel_get_channel_stack(c->channel), 0);
856
857 op.type = GRPC_CHANNEL_GOAWAY;
858 op.dir = GRPC_CALL_DOWN;
859 op.data.goaway.status = GRPC_STATUS_OK;
860 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800861 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800862
863 grpc_channel_internal_unref(c->channel);
864 }
865 gpr_free(channels);
866
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800867 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800868 for (i = 0; i < requested_calls.count; i++) {
869 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800870 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800871 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800872
873 /* Shutdown listeners */
874 for (l = server->listeners; l; l = l->next) {
875 l->destroy(server, l->arg);
876 }
877 while (server->listeners) {
878 l = server->listeners;
879 server->listeners = l->next;
880 gpr_free(l);
881 }
882}
883
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800884void grpc_server_shutdown(grpc_server *server) {
885 shutdown_internal(server, 0, NULL);
886}
887
888void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
889 shutdown_internal(server, 1, tag);
890}
891
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800892void grpc_server_destroy(grpc_server *server) {
893 channel_data *c;
894 gpr_mu_lock(&server->mu);
Craig Tilleraea2fc02015-02-17 16:54:53 -0800895 if (!server->shutdown) {
896 gpr_mu_unlock(&server->mu);
897 grpc_server_shutdown(server);
898 gpr_mu_lock(&server->mu);
899 }
900
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800901 for (c = server->root_channel_data.next; c != &server->root_channel_data;
902 c = c->next) {
903 shutdown_channel(c);
904 }
905 gpr_mu_unlock(&server->mu);
906
907 server_unref(server);
908}
909
910void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800911 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -0800912 grpc_pollset **pollsets,
913 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800914 void (*destroy)(grpc_server *server, void *arg)) {
915 listener *l = gpr_malloc(sizeof(listener));
916 l->arg = arg;
917 l->start = start;
918 l->destroy = destroy;
919 l->next = server->listeners;
920 server->listeners = l;
921}
922
Craig Tiller9f28ac22015-01-27 17:01:29 -0800923static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -0800924 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -0800925 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -0800926 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800927 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800928 if (server->shutdown) {
929 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800930 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800931 return GRPC_CALL_OK;
932 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800933 switch (rc->type) {
934 case LEGACY_CALL:
935 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800936 calld =
937 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800938 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800939 break;
940 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800941 calld = call_list_remove_head(
942 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800943 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800944 break;
945 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800946 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800947 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -0800948 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -0800949 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800950 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -0800951 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800952 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800953 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -0800954 gpr_mu_unlock(&server->mu);
955 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800956 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800957}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800958
Craig Tiller24be0f72015-02-10 14:04:22 -0800959grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
960 grpc_call_details *details,
961 grpc_metadata_array *initial_metadata,
Craig Tiller3b29b562015-02-11 12:58:46 -0800962 grpc_completion_queue *cq_bind,
963 void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800964 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800965 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -0800966 rc.type = BATCH_CALL;
967 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -0800968 rc.data.batch.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -0800969 rc.data.batch.call = call;
970 rc.data.batch.details = details;
971 rc.data.batch.initial_metadata = initial_metadata;
972 return queue_call_request(server, &rc);
973}
974
975grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -0800976 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
977 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
978 grpc_completion_queue *cq_bind, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800979 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800980 registered_method *registered_method = rm;
981 grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -0800982 rc.type = REGISTERED_CALL;
983 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -0800984 rc.data.registered.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -0800985 rc.data.registered.call = call;
986 rc.data.registered.registered_method = registered_method;
987 rc.data.registered.deadline = deadline;
988 rc.data.registered.initial_metadata = initial_metadata;
989 rc.data.registered.optional_payload = optional_payload;
990 return queue_call_request(server, &rc);
991}
992
993grpc_call_error grpc_server_request_call_old(grpc_server *server,
994 void *tag_new) {
995 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800996 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
Craig Tiller24be0f72015-02-10 14:04:22 -0800997 rc.type = LEGACY_CALL;
998 rc.tag = tag_new;
999 return queue_call_request(server, &rc);
1000}
1001
1002static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
1003static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1004 void *tag);
Craig Tiller3b29b562015-02-11 12:58:46 -08001005static void publish_was_not_set(grpc_call *call, grpc_op_error status,
1006 void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001007 abort();
1008}
Craig Tiller24be0f72015-02-10 14:04:22 -08001009
Craig Tiller166e2502015-02-03 20:14:41 -08001010static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1011 gpr_slice slice = value->slice;
1012 size_t len = GPR_SLICE_LENGTH(slice);
1013
1014 if (len + 1 > *capacity) {
1015 *capacity = GPR_MAX(len + 1, *capacity * 2);
1016 *dest = gpr_realloc(*dest, *capacity);
1017 }
1018 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1019}
1020
Craig Tiller24be0f72015-02-10 14:04:22 -08001021static void begin_call(grpc_server *server, call_data *calld,
1022 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001023 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001024 grpc_ioreq req[2];
1025 grpc_ioreq *r = req;
1026
1027 /* called once initial metadata has been read by the call, but BEFORE
1028 the ioreq to fetch it out of the call has been executed.
1029 This means metadata related fields can be relied on in calld, but to
1030 fill in the metadata array passed by the client, we need to perform
1031 an ioreq op, that should complete immediately. */
1032
1033 switch (rc->type) {
1034 case LEGACY_CALL:
1035 calld->legacy = gpr_malloc(sizeof(legacy_data));
1036 memset(calld->legacy, 0, sizeof(legacy_data));
1037 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1038 r->data.recv_metadata = &calld->legacy->initial_metadata;
1039 r++;
1040 publish = publish_legacy;
1041 break;
1042 case BATCH_CALL:
1043 cpstr(&rc->data.batch.details->host,
1044 &rc->data.batch.details->host_capacity, calld->host);
1045 cpstr(&rc->data.batch.details->method,
1046 &rc->data.batch.details->method_capacity, calld->path);
Craig Tiller8e8fd892015-02-10 17:02:08 -08001047 grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001048 *rc->data.batch.call = calld->call;
1049 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1050 r->data.recv_metadata = rc->data.batch.initial_metadata;
1051 r++;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001052 calld->cq_new = server->unregistered_cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001053 publish = publish_registered_or_batch;
1054 break;
1055 case REGISTERED_CALL:
1056 *rc->data.registered.deadline = calld->deadline;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001057 grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001058 *rc->data.registered.call = calld->call;
1059 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1060 r->data.recv_metadata = rc->data.registered.initial_metadata;
1061 r++;
1062 if (rc->data.registered.optional_payload) {
1063 r->op = GRPC_IOREQ_RECV_MESSAGE;
1064 r->data.recv_message = rc->data.registered.optional_payload;
1065 r++;
1066 }
Craig Tiller20bc56d2015-02-12 09:02:56 -08001067 calld->cq_new = rc->data.registered.registered_method->cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001068 publish = publish_registered_or_batch;
1069 break;
1070 }
1071
1072 grpc_call_internal_ref(calld->call);
1073 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1074 rc->tag);
1075}
1076
1077static void fail_call(grpc_server *server, requested_call *rc) {
1078 switch (rc->type) {
1079 case LEGACY_CALL:
Craig Tillerec3257c2015-02-12 15:59:43 -08001080 grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
1081 NULL, NULL, NULL, gpr_inf_past, 0, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001082 break;
1083 case BATCH_CALL:
1084 *rc->data.batch.call = NULL;
1085 rc->data.batch.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001086 grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
1087 do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001088 break;
1089 case REGISTERED_CALL:
1090 *rc->data.registered.call = NULL;
1091 rc->data.registered.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001092 grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
1093 rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001094 break;
1095 }
1096}
1097
1098static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
1099 grpc_call_element *elem =
1100 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1101 call_data *calld = elem->call_data;
1102 channel_data *chand = elem->channel_data;
1103 grpc_server *server = chand->server;
1104
1105 if (status == GRPC_OP_OK) {
Craig Tiller20bc56d2015-02-12 09:02:56 -08001106 grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
Craig Tiller24be0f72015-02-10 14:04:22 -08001107 grpc_mdstr_as_c_string(calld->path),
1108 grpc_mdstr_as_c_string(calld->host), calld->deadline,
1109 calld->legacy->initial_metadata.count,
1110 calld->legacy->initial_metadata.metadata);
1111 } else {
1112 gpr_log(GPR_ERROR, "should never reach here");
1113 abort();
1114 }
1115}
1116
1117static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1118 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001119 grpc_call_element *elem =
1120 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001121 call_data *calld = elem->call_data;
1122 grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
Craig Tiller24be0f72015-02-10 14:04:22 -08001123}
1124
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001125const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1126 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001127}