blob: bb50419488e1ecd9b66ff75caf0fa7ac37cbb5b6 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080047#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080048#include <grpc/support/alloc.h>
49#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050#include <grpc/support/useful.h>
51
/* Names the intrusive call lists kept by a server. CALL_LIST_COUNT is not a
   list: it is the number of lists and sizes the per-server and per-call
   arrays indexed by this enum. */
typedef enum {
  /* calls queued by start_new_rpc because no request was waiting for them */
  PENDING_START,
  /* calls joined in init_call_elem and removed in destroy_call_elem */
  ALL_CALLS,
  CALL_LIST_COUNT
} call_list;
53
54typedef struct listener {
55 void *arg;
ctiller58393c22015-01-07 14:03:30 -080056 void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080057 void (*destroy)(grpc_server *server, void *arg);
58 struct listener *next;
59} listener;
60
/* Forward declarations so the structures below can point at each other
   before their full definitions appear. */
typedef struct channel_data channel_data;
typedef struct call_data call_data;
63
64struct channel_data {
65 grpc_server *server;
66 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -080067 grpc_mdstr *path_key;
68 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080069 /* linked list of all channels on a server */
70 channel_data *next;
71 channel_data *prev;
72};
73
Craig Tiller9f28ac22015-01-27 17:01:29 -080074typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
75 grpc_metadata_array *initial_metadata,
76 call_data *calld, void *user_data);
Craig Tillercce17ac2015-01-20 09:29:28 -080077
78typedef struct {
79 void *user_data;
80 grpc_completion_queue *cq;
81 grpc_metadata_array *initial_metadata;
82 new_call_cb cb;
83} requested_call;
84
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080085struct grpc_server {
86 size_t channel_filter_count;
87 const grpc_channel_filter **channel_filters;
88 grpc_channel_args *channel_args;
89 grpc_completion_queue *cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080090
91 gpr_mu mu;
92
Craig Tillercce17ac2015-01-20 09:29:28 -080093 requested_call *requested_calls;
94 size_t requested_call_count;
95 size_t requested_call_capacity;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080096
97 gpr_uint8 shutdown;
Craig Tiller4ffdcd52015-01-16 11:34:55 -080098 gpr_uint8 have_shutdown_tag;
99 void *shutdown_tag;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800100
101 call_data *lists[CALL_LIST_COUNT];
102 channel_data root_channel_data;
103
104 listener *listeners;
105 gpr_refcount internal_refcount;
106};
107
108typedef struct {
109 call_data *next;
110 call_data *prev;
111} call_link;
112
/* Lifecycle of a server-side call as tracked by this filter. NOT_STARTED is
   value 0 and therefore the state left by the memset in init_call_elem. */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;
123
Craig Tiller9f28ac22015-01-27 17:01:29 -0800124typedef struct legacy_data { grpc_metadata_array client_metadata; } legacy_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800125
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800126struct call_data {
127 grpc_call *call;
128
129 call_state state;
130 gpr_timespec deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800131 grpc_mdstr *path;
132 grpc_mdstr *host;
133
134 legacy_data *legacy;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800135
136 gpr_uint8 included[CALL_LIST_COUNT];
137 call_link links[CALL_LIST_COUNT];
138};
139
/* Expands to the grpc_server pointer stored in the channel_data that `elem`
   (a pointer to a call element of this filter) refers to. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)
142
143static void do_nothing(void *unused, grpc_op_error ignored) {}
144
145static int call_list_join(grpc_server *server, call_data *call,
146 call_list list) {
147 if (call->included[list]) return 0;
148 call->included[list] = 1;
149 if (!server->lists[list]) {
150 server->lists[list] = call;
151 call->links[list].next = call->links[list].prev = call;
152 } else {
153 call->links[list].next = server->lists[list];
154 call->links[list].prev = server->lists[list]->links[list].prev;
155 call->links[list].next->links[list].prev =
156 call->links[list].prev->links[list].next = call;
157 }
158 return 1;
159}
160
161static call_data *call_list_remove_head(grpc_server *server, call_list list) {
162 call_data *out = server->lists[list];
163 if (out) {
164 out->included[list] = 0;
165 if (out->links[list].next == out) {
166 server->lists[list] = NULL;
167 } else {
168 server->lists[list] = out->links[list].next;
169 out->links[list].next->links[list].prev = out->links[list].prev;
170 out->links[list].prev->links[list].next = out->links[list].next;
171 }
172 }
173 return out;
174}
175
176static int call_list_remove(grpc_server *server, call_data *call,
177 call_list list) {
178 if (!call->included[list]) return 0;
179 call->included[list] = 0;
180 if (server->lists[list] == call) {
181 server->lists[list] = call->links[list].next;
182 if (server->lists[list] == call) {
183 server->lists[list] = NULL;
184 return 1;
185 }
186 }
187 GPR_ASSERT(server->lists[list] != call);
188 call->links[list].next->links[list].prev = call->links[list].prev;
189 call->links[list].prev->links[list].next = call->links[list].next;
190 return 1;
191}
192
193static void server_ref(grpc_server *server) {
194 gpr_ref(&server->internal_refcount);
195}
196
197static void server_unref(grpc_server *server) {
198 if (gpr_unref(&server->internal_refcount)) {
199 grpc_channel_args_destroy(server->channel_args);
200 gpr_mu_destroy(&server->mu);
201 gpr_free(server->channel_filters);
Craig Tillercce17ac2015-01-20 09:29:28 -0800202 gpr_free(server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800203 gpr_free(server);
204 }
205}
206
207static int is_channel_orphaned(channel_data *chand) {
208 return chand->next == chand;
209}
210
211static void orphan_channel(channel_data *chand) {
212 chand->next->prev = chand->prev;
213 chand->prev->next = chand->next;
214 chand->next = chand->prev = chand;
215}
216
ctiller58393c22015-01-07 14:03:30 -0800217static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800218 channel_data *chand = cd;
219 grpc_server *server = chand->server;
220 /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
221 grpc_channel_destroy(chand->channel);
222 server_unref(server);
223}
224
225static void destroy_channel(channel_data *chand) {
226 if (is_channel_orphaned(chand)) return;
227 GPR_ASSERT(chand->server != NULL);
228 orphan_channel(chand);
229 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800230 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800231}
232
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800233static void start_new_rpc(grpc_call_element *elem) {
234 channel_data *chand = elem->channel_data;
235 call_data *calld = elem->call_data;
236 grpc_server *server = chand->server;
237
238 gpr_mu_lock(&server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800239 if (server->requested_call_count > 0) {
240 requested_call rc = server->requested_calls[--server->requested_call_count];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800241 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -0800242 gpr_mu_unlock(&server->mu);
243 rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800244 } else {
245 calld->state = PENDING;
246 call_list_join(server, calld, PENDING_START);
Craig Tillercce17ac2015-01-20 09:29:28 -0800247 gpr_mu_unlock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800248 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800249}
250
ctiller58393c22015-01-07 14:03:30 -0800251static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800252 grpc_call_destroy(grpc_call_from_top_element(elem));
253}
254
Craig Tillercce17ac2015-01-20 09:29:28 -0800255static void stream_closed(grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800256 call_data *calld = elem->call_data;
257 channel_data *chand = elem->channel_data;
258 gpr_mu_lock(&chand->server->mu);
259 switch (calld->state) {
260 case ACTIVATED:
Craig Tillercce17ac2015-01-20 09:29:28 -0800261 grpc_call_stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800262 break;
263 case PENDING:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800264 call_list_remove(chand->server, calld, PENDING_START);
265 /* fallthrough intended */
266 case NOT_STARTED:
267 calld->state = ZOMBIED;
ctiller18b49ab2014-12-09 14:39:16 -0800268 grpc_iomgr_add_callback(kill_zombie, elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269 break;
270 case ZOMBIED:
271 break;
272 }
273 gpr_mu_unlock(&chand->server->mu);
274}
275
Craig Tillercce17ac2015-01-20 09:29:28 -0800276static void read_closed(grpc_call_element *elem) {
277 call_data *calld = elem->call_data;
278 channel_data *chand = elem->channel_data;
279 gpr_mu_lock(&chand->server->mu);
280 switch (calld->state) {
281 case ACTIVATED:
282 case PENDING:
283 grpc_call_read_closed(elem);
284 break;
285 case NOT_STARTED:
286 calld->state = ZOMBIED;
287 grpc_iomgr_add_callback(kill_zombie, elem);
288 break;
289 case ZOMBIED:
290 break;
291 }
292 gpr_mu_unlock(&chand->server->mu);
293}
294
ctillerf962f522014-12-10 15:28:27 -0800295static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
296 grpc_call_op *op) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800297 channel_data *chand = elem->channel_data;
298 call_data *calld = elem->call_data;
299 grpc_mdelem *md;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800300 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
301 switch (op->type) {
302 case GRPC_RECV_METADATA:
Craig Tillercce17ac2015-01-20 09:29:28 -0800303 md = op->data.metadata;
304 if (md->key == chand->path_key) {
305 calld->path = grpc_mdstr_ref(md->value);
306 grpc_mdelem_unref(md);
307 } else if (md->key == chand->authority_key) {
308 calld->host = grpc_mdstr_ref(md->value);
309 grpc_mdelem_unref(md);
310 } else {
311 grpc_call_recv_metadata(elem, md);
312 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313 break;
314 case GRPC_RECV_END_OF_INITIAL_METADATA:
315 start_new_rpc(elem);
316 break;
317 case GRPC_RECV_MESSAGE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800318 grpc_call_recv_message(elem, op->data.message);
319 op->done_cb(op->user_data, GRPC_OP_OK);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800320 break;
321 case GRPC_RECV_HALF_CLOSE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800322 read_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800323 break;
324 case GRPC_RECV_FINISH:
Craig Tillercce17ac2015-01-20 09:29:28 -0800325 stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800326 break;
327 case GRPC_RECV_DEADLINE:
328 grpc_call_set_deadline(elem, op->data.deadline);
329 ((call_data *)elem->call_data)->deadline = op->data.deadline;
330 break;
331 default:
332 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
333 grpc_call_next_op(elem, op);
334 break;
335 }
336}
337
ctillerf962f522014-12-10 15:28:27 -0800338static void channel_op(grpc_channel_element *elem,
339 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800340 channel_data *chand = elem->channel_data;
341
342 switch (op->type) {
343 case GRPC_ACCEPT_CALL:
344 /* create a call */
345 grpc_call_create(chand->channel,
346 op->data.accept_call.transport_server_data);
347 break;
348 case GRPC_TRANSPORT_CLOSED:
349 /* if the transport is closed for a server channel, we destroy the
350 channel */
351 gpr_mu_lock(&chand->server->mu);
352 server_ref(chand->server);
353 destroy_channel(chand);
354 gpr_mu_unlock(&chand->server->mu);
355 server_unref(chand->server);
356 break;
nnoble0c475f02014-12-05 15:37:39 -0800357 case GRPC_TRANSPORT_GOAWAY:
358 gpr_slice_unref(op->data.goaway.message);
359 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800360 default:
361 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
362 grpc_channel_next_op(elem, op);
363 break;
364 }
365}
366
ctiller58393c22015-01-07 14:03:30 -0800367static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800368 channel_data *chand = cd;
369 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800370 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800371 op.dir = GRPC_CALL_DOWN;
372 channel_op(grpc_channel_stack_element(
373 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800374 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800375 grpc_channel_internal_unref(chand->channel);
376}
377
378static void shutdown_channel(channel_data *chand) {
379 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800380 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800381}
382
383static void init_call_elem(grpc_call_element *elem,
384 const void *server_transport_data) {
385 call_data *calld = elem->call_data;
386 channel_data *chand = elem->channel_data;
387 memset(calld, 0, sizeof(call_data));
388 calld->deadline = gpr_inf_future;
389 calld->call = grpc_call_from_top_element(elem);
390
391 gpr_mu_lock(&chand->server->mu);
392 call_list_join(chand->server, calld, ALL_CALLS);
393 gpr_mu_unlock(&chand->server->mu);
394
395 server_ref(chand->server);
396}
397
398static void destroy_call_elem(grpc_call_element *elem) {
399 channel_data *chand = elem->channel_data;
400 int i;
401
402 gpr_mu_lock(&chand->server->mu);
403 for (i = 0; i < CALL_LIST_COUNT; i++) {
404 call_list_remove(chand->server, elem->call_data, i);
405 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800406 if (chand->server->shutdown && chand->server->have_shutdown_tag &&
407 chand->server->lists[ALL_CALLS] == NULL) {
408 grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
409 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800410 gpr_mu_unlock(&chand->server->mu);
411
412 server_unref(chand->server);
413}
414
415static void init_channel_elem(grpc_channel_element *elem,
416 const grpc_channel_args *args,
417 grpc_mdctx *metadata_context, int is_first,
418 int is_last) {
419 channel_data *chand = elem->channel_data;
420 GPR_ASSERT(is_first);
421 GPR_ASSERT(!is_last);
422 chand->server = NULL;
423 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800424 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
425 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800426 chand->next = chand->prev = chand;
427}
428
429static void destroy_channel_elem(grpc_channel_element *elem) {
430 channel_data *chand = elem->channel_data;
431 if (chand->server) {
432 gpr_mu_lock(&chand->server->mu);
433 chand->next->prev = chand->prev;
434 chand->prev->next = chand->next;
435 chand->next = chand->prev = chand;
436 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800437 grpc_mdstr_unref(chand->path_key);
438 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800439 server_unref(chand->server);
440 }
441}
442
443static const grpc_channel_filter server_surface_filter = {
Yang Gao5fd0d292015-01-26 00:19:48 -0800444 call_op, channel_op, sizeof(call_data),
445 init_call_elem, destroy_call_elem, sizeof(channel_data),
Craig Tiller9f28ac22015-01-27 17:01:29 -0800446 init_channel_elem, destroy_channel_elem, "server",
447};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800448
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800449grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
450 grpc_channel_filter **filters,
451 size_t filter_count,
452 const grpc_channel_args *args) {
453 size_t i;
454 int census_enabled = grpc_channel_args_is_census_enabled(args);
455
456 grpc_server *server = gpr_malloc(sizeof(grpc_server));
457 memset(server, 0, sizeof(grpc_server));
458
459 gpr_mu_init(&server->mu);
460
461 server->cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800462 /* decremented by grpc_server_destroy */
463 gpr_ref_init(&server->internal_refcount, 1);
464 server->root_channel_data.next = server->root_channel_data.prev =
465 &server->root_channel_data;
466
467 /* Server filter stack is:
468
469 server_surface_filter - for making surface API calls
470 grpc_server_census_filter (optional) - for stats collection and tracing
471 {passed in filter stack}
472 grpc_connected_channel_filter - for interfacing with transports */
473 server->channel_filter_count = filter_count + 1 + census_enabled;
474 server->channel_filters =
475 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
476 server->channel_filters[0] = &server_surface_filter;
477 if (census_enabled) {
478 server->channel_filters[1] = &grpc_server_census_filter;
479 }
480 for (i = 0; i < filter_count; i++) {
481 server->channel_filters[i + 1 + census_enabled] = filters[i];
482 }
483
484 server->channel_args = grpc_channel_args_copy(args);
485
486 return server;
487}
488
489void grpc_server_start(grpc_server *server) {
490 listener *l;
491
492 for (l = server->listeners; l; l = l->next) {
ctiller58393c22015-01-07 14:03:30 -0800493 l->start(server, l->arg, grpc_cq_pollset(server->cq));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800494 }
495}
496
497grpc_transport_setup_result grpc_server_setup_transport(
498 grpc_server *s, grpc_transport *transport,
499 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
500 grpc_mdctx *mdctx) {
501 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
502 grpc_channel_filter const **filters =
503 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
504 size_t i;
505 grpc_channel *channel;
506 channel_data *chand;
507
508 for (i = 0; i < s->channel_filter_count; i++) {
509 filters[i] = s->channel_filters[i];
510 }
511 for (; i < s->channel_filter_count + num_extra_filters; i++) {
512 filters[i] = extra_filters[i - s->channel_filter_count];
513 }
514 filters[i] = &grpc_connected_channel_filter;
515
ctillerd79b4862014-12-17 16:36:59 -0800516 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
517
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800518 channel = grpc_channel_create_from_filters(filters, num_filters,
519 s->channel_args, mdctx, 0);
520 chand = (channel_data *)grpc_channel_stack_element(
521 grpc_channel_get_channel_stack(channel), 0)->channel_data;
522 chand->server = s;
523 server_ref(s);
524 chand->channel = channel;
525
526 gpr_mu_lock(&s->mu);
527 chand->next = &s->root_channel_data;
528 chand->prev = chand->next->prev;
529 chand->next->prev = chand->prev->next = chand;
530 gpr_mu_unlock(&s->mu);
531
532 gpr_free(filters);
533
534 return grpc_connected_channel_bind_transport(
535 grpc_channel_get_channel_stack(channel), transport);
536}
537
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800538void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
539 void *shutdown_tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800540 listener *l;
Craig Tillercce17ac2015-01-20 09:29:28 -0800541 requested_call *requested_calls;
542 size_t requested_call_count;
nnoble0c475f02014-12-05 15:37:39 -0800543 channel_data **channels;
544 channel_data *c;
545 size_t nchannels;
546 size_t i;
547 grpc_channel_op op;
548 grpc_channel_element *elem;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800549
550 /* lock, and gather up some stuff to do */
551 gpr_mu_lock(&server->mu);
552 if (server->shutdown) {
553 gpr_mu_unlock(&server->mu);
554 return;
555 }
556
nnoble0c475f02014-12-05 15:37:39 -0800557 nchannels = 0;
558 for (c = server->root_channel_data.next; c != &server->root_channel_data;
559 c = c->next) {
560 nchannels++;
561 }
562 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
563 i = 0;
564 for (c = server->root_channel_data.next; c != &server->root_channel_data;
565 c = c->next) {
566 grpc_channel_internal_ref(c->channel);
567 channels[i] = c;
568 i++;
569 }
570
Craig Tillercce17ac2015-01-20 09:29:28 -0800571 requested_calls = server->requested_calls;
572 requested_call_count = server->requested_call_count;
573 server->requested_calls = NULL;
574 server->requested_call_count = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800575
576 server->shutdown = 1;
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800577 server->have_shutdown_tag = have_shutdown_tag;
578 server->shutdown_tag = shutdown_tag;
579 if (have_shutdown_tag) {
580 grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
581 if (server->lists[ALL_CALLS] == NULL) {
582 grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
583 }
584 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800585 gpr_mu_unlock(&server->mu);
586
nnoble0c475f02014-12-05 15:37:39 -0800587 for (i = 0; i < nchannels; i++) {
588 c = channels[i];
589 elem = grpc_channel_stack_element(
590 grpc_channel_get_channel_stack(c->channel), 0);
591
592 op.type = GRPC_CHANNEL_GOAWAY;
593 op.dir = GRPC_CALL_DOWN;
594 op.data.goaway.status = GRPC_STATUS_OK;
595 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800596 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800597
598 grpc_channel_internal_unref(c->channel);
599 }
600 gpr_free(channels);
601
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800602 /* terminate all the requested calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800603 for (i = 0; i < requested_call_count; i++) {
Craig Tiller9f28ac22015-01-27 17:01:29 -0800604 requested_calls[i].cb(server, requested_calls[i].cq,
605 requested_calls[i].initial_metadata, NULL,
606 requested_calls[i].user_data);
Craig Tillercce17ac2015-01-20 09:29:28 -0800607 }
608 gpr_free(requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800609
610 /* Shutdown listeners */
611 for (l = server->listeners; l; l = l->next) {
612 l->destroy(server, l->arg);
613 }
614 while (server->listeners) {
615 l = server->listeners;
616 server->listeners = l->next;
617 gpr_free(l);
618 }
619}
620
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800621void grpc_server_shutdown(grpc_server *server) {
622 shutdown_internal(server, 0, NULL);
623}
624
625void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
626 shutdown_internal(server, 1, tag);
627}
628
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800629void grpc_server_destroy(grpc_server *server) {
630 channel_data *c;
631 gpr_mu_lock(&server->mu);
632 for (c = server->root_channel_data.next; c != &server->root_channel_data;
633 c = c->next) {
634 shutdown_channel(c);
635 }
636 gpr_mu_unlock(&server->mu);
637
638 server_unref(server);
639}
640
641void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800642 void (*start)(grpc_server *server, void *arg,
643 grpc_pollset *pollset),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800644 void (*destroy)(grpc_server *server, void *arg)) {
645 listener *l = gpr_malloc(sizeof(listener));
646 l->arg = arg;
647 l->start = start;
648 l->destroy = destroy;
649 l->next = server->listeners;
650 server->listeners = l;
651}
652
Craig Tiller9f28ac22015-01-27 17:01:29 -0800653static grpc_call_error queue_call_request(grpc_server *server,
654 grpc_completion_queue *cq,
655 grpc_metadata_array *initial_metadata,
656 new_call_cb cb, void *user_data) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800657 call_data *calld;
Craig Tillercce17ac2015-01-20 09:29:28 -0800658 requested_call *rc;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800659 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800660 if (server->shutdown) {
661 gpr_mu_unlock(&server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800662 cb(server, cq, initial_metadata, NULL, user_data);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800663 return GRPC_CALL_OK;
664 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800665 calld = call_list_remove_head(server, PENDING_START);
666 if (calld) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800667 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -0800668 GPR_ASSERT(calld->state == PENDING);
669 gpr_mu_unlock(&server->mu);
670 cb(server, cq, initial_metadata, calld, user_data);
671 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800672 } else {
Craig Tillercce17ac2015-01-20 09:29:28 -0800673 if (server->requested_call_count == server->requested_call_capacity) {
Craig Tiller9f28ac22015-01-27 17:01:29 -0800674 server->requested_call_capacity =
675 GPR_MAX(server->requested_call_capacity + 8,
676 server->requested_call_capacity * 2);
677 server->requested_calls =
678 gpr_realloc(server->requested_calls,
679 sizeof(requested_call) * server->requested_call_capacity);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800680 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800681 rc = &server->requested_calls[server->requested_call_count++];
682 rc->cb = cb;
683 rc->cq = cq;
684 rc->user_data = user_data;
685 rc->initial_metadata = initial_metadata;
686 gpr_mu_unlock(&server->mu);
687 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800688 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800689}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800690
Craig Tiller9f28ac22015-01-27 17:01:29 -0800691static void begin_request(grpc_server *server, grpc_completion_queue *cq,
692 grpc_metadata_array *initial_metadata,
693 call_data *call_data, void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800694 abort();
695}
696
Craig Tiller9f28ac22015-01-27 17:01:29 -0800697grpc_call_error grpc_server_request_call(grpc_server *server,
698 grpc_completion_queue *cq,
699 grpc_call_details *details,
700 grpc_metadata_array *initial_metadata,
701 void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800702 grpc_cq_begin_op(cq, NULL, GRPC_IOREQ);
703 return queue_call_request(server, cq, initial_metadata, begin_request, tag);
704}
705
Craig Tiller9f28ac22015-01-27 17:01:29 -0800706static void publish_legacy_request(grpc_call *call, grpc_op_error status,
707 void *tag) {
708 grpc_call_element *elem =
709 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tillercce17ac2015-01-20 09:29:28 -0800710 call_data *calld = elem->call_data;
711 channel_data *chand = elem->channel_data;
712 grpc_server *server = chand->server;
713
714 if (status == GRPC_OP_OK) {
Craig Tiller9f28ac22015-01-27 17:01:29 -0800715 grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
716 grpc_mdstr_as_c_string(calld->path),
717 grpc_mdstr_as_c_string(calld->host), calld->deadline,
718 calld->legacy->client_metadata.count,
719 calld->legacy->client_metadata.metadata);
Craig Tillercce17ac2015-01-20 09:29:28 -0800720 }
721}
722
Craig Tiller9f28ac22015-01-27 17:01:29 -0800723static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
724 grpc_metadata_array *initial_metadata,
725 call_data *calld, void *tag) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800726 grpc_ioreq req;
727 if (!calld) {
728 gpr_free(initial_metadata);
729 grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
730 gpr_inf_past, 0, NULL);
731 return;
732 }
733 req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
734 req.data.recv_metadata = initial_metadata;
Craig Tiller9f28ac22015-01-27 17:01:29 -0800735 grpc_call_start_ioreq_and_call_back(calld->call, &req, 1,
736 publish_legacy_request, tag);
Craig Tillercce17ac2015-01-20 09:29:28 -0800737}
738
Craig Tiller9f28ac22015-01-27 17:01:29 -0800739grpc_call_error grpc_server_request_call_old(grpc_server *server,
740 void *tag_new) {
741 grpc_metadata_array *client_metadata =
742 gpr_malloc(sizeof(grpc_metadata_array));
Craig Tillercce17ac2015-01-20 09:29:28 -0800743 memset(client_metadata, 0, sizeof(*client_metadata));
744 grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
Craig Tiller9f28ac22015-01-27 17:01:29 -0800745 return queue_call_request(server, server->cq, client_metadata,
746 begin_legacy_request, tag_new);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800747}
748
749const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
750 return server->channel_args;
751}