/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/surface/server.h"

#include <stdlib.h>
#include <string.h>

#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

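/* Call lists tracked by the server: calls parked while waiting for a matching
   request_call (PENDING_START) and the set of all live calls (ALL_CALLS). */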
typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;

typedef struct listener {
  void *arg;
  void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
  void (*destroy)(grpc_server *server, void *arg);
  struct listener *next;
} listener;

typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;

typedef struct {
  call_data *next;
  call_data *prev;
} call_link;

typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;

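/* A single outstanding request for an incoming call: records the tag to
   complete and where to write the call, metadata, and (for registered
   methods) deadline and optional payload once a new RPC arrives. */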
typedef struct {
  requested_call_type type;
  void *tag;
  union {
    struct {
      grpc_completion_queue *cq_bind;
      grpc_call **call;
      grpc_call_details *details;
      grpc_metadata_array *initial_metadata;
    } batch;
    struct {
      grpc_completion_queue *cq_bind;
      grpc_call **call;
      registered_method *registered_method;
      gpr_timespec *deadline;
      grpc_metadata_array *initial_metadata;
      grpc_byte_buffer **optional_payload;
    } registered;
  } data;
} requested_call;

typedef struct {
  requested_call *calls;
  size_t count;
  size_t capacity;
} requested_call_array;

struct registered_method {
  char *method;
  char *host;
  call_data *pending;
  requested_call_array requested;
  registered_method *next;
};

typedef struct channel_registered_method {
  registered_method *server_registered_method;
  grpc_mdstr *method;
  grpc_mdstr *host;
} channel_registered_method;

struct channel_data {
  grpc_server *server;
  grpc_channel *channel;
  grpc_mdstr *path_key;
  grpc_mdstr *authority_key;
  /* linked list of all channels on a server */
  channel_data *next;
  channel_data *prev;
  channel_registered_method *registered_methods;
  gpr_uint32 registered_method_slots;
  gpr_uint32 registered_method_max_probes;
};

struct grpc_server {
  size_t channel_filter_count;
  const grpc_channel_filter **channel_filters;
  grpc_channel_args *channel_args;
  grpc_completion_queue *cq;

  gpr_mu mu;

  registered_method *registered_methods;
  requested_call_array requested_calls;

  gpr_uint8 shutdown;
  gpr_uint8 have_shutdown_tag;
  void *shutdown_tag;

  call_data *lists[CALL_LIST_COUNT];
  channel_data root_channel_data;

  listener *listeners;
  gpr_refcount internal_refcount;
};

typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

typedef struct legacy_data {
  grpc_metadata_array initial_metadata;
} legacy_data;

struct call_data {
  grpc_call *call;

  call_state state;
  gpr_timespec deadline;
  grpc_mdstr *path;
  grpc_mdstr *host;

  legacy_data *legacy;

  call_data **root[CALL_LIST_COUNT];
  call_link links[CALL_LIST_COUNT];
};

#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)

static void do_nothing(void *unused, grpc_op_error ignored) {}

static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);

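/* Each call list is an intrusive circular doubly-linked list threaded through
   call_data.links; call->root[list] remembers which list head the call is on
   so it can be unlinked later. */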
static int call_list_join(call_data **root, call_data *call, call_list list) {
  GPR_ASSERT(!call->root[list]);
  call->root[list] = root;
  if (!*root) {
    *root = call;
    call->links[list].next = call->links[list].prev = call;
  } else {
    call->links[list].next = *root;
    call->links[list].prev = (*root)->links[list].prev;
    call->links[list].next->links[list].prev =
        call->links[list].prev->links[list].next = call;
  }
  return 1;
}

static call_data *call_list_remove_head(call_data **root, call_list list) {
  call_data *out = *root;
  if (out) {
    out->root[list] = NULL;
    if (out->links[list].next == out) {
      *root = NULL;
    } else {
      *root = out->links[list].next;
      out->links[list].next->links[list].prev = out->links[list].prev;
      out->links[list].prev->links[list].next = out->links[list].next;
    }
  }
  return out;
}

static int call_list_remove(call_data *call, call_list list) {
  call_data **root = call->root[list];
  if (root == NULL) return 0;
  call->root[list] = NULL;
  if (*root == call) {
    *root = call->links[list].next;
    if (*root == call) {
      *root = NULL;
      return 1;
    }
  }
  GPR_ASSERT(*root != call);
  call->links[list].next->links[list].prev = call->links[list].prev;
  call->links[list].prev->links[list].next = call->links[list].next;
  return 1;
}

static void requested_call_array_destroy(requested_call_array *array) {
  gpr_free(array->calls);
}

static requested_call *requested_call_array_add(requested_call_array *array) {
  requested_call *rc;
  if (array->count == array->capacity) {
    array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
    array->calls =
        gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
  }
  rc = &array->calls[array->count++];
  memset(rc, 0, sizeof(*rc));
  return rc;
}

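/* The server is internally reference counted: channels, calls, and pending
   iomgr callbacks each hold a ref; the final unref releases the server's
   resources. */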
static void server_ref(grpc_server *server) {
  gpr_ref(&server->internal_refcount);
}

static void server_unref(grpc_server *server) {
  if (gpr_unref(&server->internal_refcount)) {
    grpc_channel_args_destroy(server->channel_args);
    gpr_mu_destroy(&server->mu);
    gpr_free(server->channel_filters);
    requested_call_array_destroy(&server->requested_calls);
    gpr_free(server);
  }
}

static int is_channel_orphaned(channel_data *chand) {
  return chand->next == chand;
}

static void orphan_channel(channel_data *chand) {
  chand->next->prev = chand->prev;
  chand->prev->next = chand->next;
  chand->next = chand->prev = chand;
}

static void finish_destroy_channel(void *cd, int success) {
  channel_data *chand = cd;
  grpc_server *server = chand->server;
  grpc_channel_destroy(chand->channel);
  server_unref(server);
}

static void destroy_channel(channel_data *chand) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != NULL);
  orphan_channel(chand);
  server_ref(chand->server);
  grpc_iomgr_add_callback(finish_destroy_channel, chand);
}

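/* Complete the dispatch of a newly started RPC: if a request is already
   waiting in the supplied array, activate the call and hand it off via
   begin_call; otherwise park the call on the matching pending list until a
   request_call arrives. Always releases server->mu. */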
static void finish_start_new_rpc_and_unlock(grpc_server *server,
                                            grpc_call_element *elem,
                                            call_data **pending_root,
                                            requested_call_array *array) {
  requested_call rc;
  call_data *calld = elem->call_data;
  if (array->count == 0) {
    calld->state = PENDING;
    call_list_join(pending_root, calld, PENDING_START);
    gpr_mu_unlock(&server->mu);
  } else {
    rc = array->calls[--array->count];
    calld->state = ACTIVATED;
    gpr_mu_unlock(&server->mu);
    begin_call(server, calld, &rc);
  }
}

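/* Route a call whose initial metadata is complete: first probe the channel's
   registered-method table for an exact :path/:authority match, then for a
   host-wildcard match, and fall back to the server-wide pending list and
   request queue if neither is registered. */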
static void start_new_rpc(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_server *server = chand->server;
  gpr_uint32 i;
  gpr_uint32 hash;
  channel_registered_method *rm;

  gpr_mu_lock(&server->mu);
  if (chand->registered_methods && calld->path && calld->host) {
    /* check for an exact match with host */
    hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->host != calld->host) continue;
      if (rm->method != calld->path) continue;
      finish_start_new_rpc_and_unlock(server, elem,
                                      &rm->server_registered_method->pending,
                                      &rm->server_registered_method->requested);
      return;
    }
    /* check for a wildcard method definition (no host set) */
    hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->host != NULL) continue;
      if (rm->method != calld->path) continue;
      finish_start_new_rpc_and_unlock(server, elem,
                                      &rm->server_registered_method->pending,
                                      &rm->server_registered_method->requested);
      return;
    }
  }
  finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
                                  &server->requested_calls);
}

static void kill_zombie(void *elem, int success) {
  grpc_call_destroy(grpc_call_from_top_element(elem));
}

static void stream_closed(grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->server->mu);
  switch (calld->state) {
    case ACTIVATED:
      break;
    case PENDING:
      call_list_remove(calld, PENDING_START);
      /* fallthrough intended */
    case NOT_STARTED:
      calld->state = ZOMBIED;
      grpc_iomgr_add_callback(kill_zombie, elem);
      break;
    case ZOMBIED:
      break;
  }
  gpr_mu_unlock(&chand->server->mu);
  grpc_call_stream_closed(elem);
}

static void read_closed(grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->server->mu);
  switch (calld->state) {
    case ACTIVATED:
    case PENDING:
      grpc_call_read_closed(elem);
      break;
    case NOT_STARTED:
      calld->state = ZOMBIED;
      grpc_iomgr_add_callback(kill_zombie, elem);
      break;
    case ZOMBIED:
      break;
  }
  gpr_mu_unlock(&chand->server->mu);
}

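/* Call operation handler for the server surface filter: captures the :path
   and :authority metadata elements, starts RPC routing once initial metadata
   is complete, and forwards other read events into the call. */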
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                    grpc_call_op *op) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_mdelem *md;
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
  switch (op->type) {
    case GRPC_RECV_METADATA:
      md = op->data.metadata;
      if (md->key == chand->path_key) {
        calld->path = grpc_mdstr_ref(md->value);
        grpc_mdelem_unref(md);
      } else if (md->key == chand->authority_key) {
        calld->host = grpc_mdstr_ref(md->value);
        grpc_mdelem_unref(md);
      } else {
        grpc_call_recv_metadata(elem, md);
      }
      break;
    case GRPC_RECV_END_OF_INITIAL_METADATA:
      start_new_rpc(elem);
      grpc_call_initial_metadata_complete(elem);
      break;
    case GRPC_RECV_MESSAGE:
      grpc_call_recv_message(elem, op->data.message);
      op->done_cb(op->user_data, GRPC_OP_OK);
      break;
    case GRPC_RECV_HALF_CLOSE:
      read_closed(elem);
      break;
    case GRPC_RECV_FINISH:
      stream_closed(elem);
      break;
    case GRPC_RECV_DEADLINE:
      grpc_call_set_deadline(elem, op->data.deadline);
      ((call_data *)elem->call_data)->deadline = op->data.deadline;
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
      grpc_call_next_op(elem, op);
      break;
  }
}

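/* Channel operation handler: creates a call for each accepted transport
   stream, tears the channel down when its transport closes, and swallows
   incoming GOAWAY messages. */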
static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  grpc_server *server = chand->server;

  switch (op->type) {
    case GRPC_ACCEPT_CALL:
      /* create a call */
      grpc_call_create(chand->channel, NULL,
                       op->data.accept_call.transport_server_data);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* if the transport is closed for a server channel, we destroy the
         channel */
      gpr_mu_lock(&server->mu);
      server_ref(server);
      destroy_channel(chand);
      gpr_mu_unlock(&server->mu);
      server_unref(server);
      break;
    case GRPC_TRANSPORT_GOAWAY:
      gpr_slice_unref(op->data.goaway.message);
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
      grpc_channel_next_op(elem, op);
      break;
  }
}

static void finish_shutdown_channel(void *cd, int success) {
  channel_data *chand = cd;
  grpc_channel_op op;
  op.type = GRPC_CHANNEL_DISCONNECT;
  op.dir = GRPC_CALL_DOWN;
  channel_op(grpc_channel_stack_element(
                 grpc_channel_get_channel_stack(chand->channel), 0),
             NULL, &op);
  grpc_channel_internal_unref(chand->channel);
}

static void shutdown_channel(channel_data *chand) {
  grpc_channel_internal_ref(chand->channel);
  grpc_iomgr_add_callback(finish_shutdown_channel, chand);
}

static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  memset(calld, 0, sizeof(call_data));
  calld->deadline = gpr_inf_future;
  calld->call = grpc_call_from_top_element(elem);

  gpr_mu_lock(&chand->server->mu);
  call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
  gpr_mu_unlock(&chand->server->mu);

  server_ref(chand->server);
}

static void destroy_call_elem(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  int i;

  gpr_mu_lock(&chand->server->mu);
  for (i = 0; i < CALL_LIST_COUNT; i++) {
    call_list_remove(elem->call_data, i);
  }
  if (chand->server->shutdown && chand->server->have_shutdown_tag &&
      chand->server->lists[ALL_CALLS] == NULL) {
    grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
  }
  gpr_mu_unlock(&chand->server->mu);

  if (calld->host) {
    grpc_mdstr_unref(calld->host);
  }
  if (calld->path) {
    grpc_mdstr_unref(calld->path);
  }

  if (calld->legacy) {
    gpr_free(calld->legacy->initial_metadata.metadata);
    gpr_free(calld->legacy);
  }

  server_unref(chand->server);
}

static void init_channel_elem(grpc_channel_element *elem,
                              const grpc_channel_args *args,
                              grpc_mdctx *metadata_context, int is_first,
                              int is_last) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(is_first);
  GPR_ASSERT(!is_last);
  chand->server = NULL;
  chand->channel = NULL;
  chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
  chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
  chand->next = chand->prev = chand;
  chand->registered_methods = NULL;
}

static void destroy_channel_elem(grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->server) {
    gpr_mu_lock(&chand->server->mu);
    chand->next->prev = chand->prev;
    chand->prev->next = chand->next;
    chand->next = chand->prev = chand;
    gpr_mu_unlock(&chand->server->mu);
    grpc_mdstr_unref(chand->path_key);
    grpc_mdstr_unref(chand->authority_key);
    server_unref(chand->server);
  }
}

static const grpc_channel_filter server_surface_filter = {
    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};

grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
                                             grpc_channel_filter **filters,
                                             size_t filter_count,
                                             const grpc_channel_args *args) {
  size_t i;
  int census_enabled = grpc_channel_args_is_census_enabled(args);

  grpc_server *server = gpr_malloc(sizeof(grpc_server));
  memset(server, 0, sizeof(grpc_server));

  gpr_mu_init(&server->mu);

  server->cq = cq;
  /* decremented by grpc_server_destroy */
  gpr_ref_init(&server->internal_refcount, 1);
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  /* Server filter stack is:

     server_surface_filter - for making surface API calls
     grpc_server_census_filter (optional) - for stats collection and tracing
     {passed in filter stack}
     grpc_connected_channel_filter - for interfacing with transports */
  server->channel_filter_count = filter_count + 1 + census_enabled;
  server->channel_filters =
      gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
  server->channel_filters[0] = &server_surface_filter;
  if (census_enabled) {
    server->channel_filters[1] = &grpc_server_census_filter;
  }
  for (i = 0; i < filter_count; i++) {
    server->channel_filters[i + 1 + census_enabled] = filters[i];
  }

  server->channel_args = grpc_channel_args_copy(args);

  return server;
}

static int streq(const char *a, const char *b) {
  if (a == NULL && b == NULL) return 1;
  if (a == NULL) return 0;
  if (b == NULL) return 0;
  return 0 == strcmp(a, b);
}

void *grpc_server_register_method(grpc_server *server, const char *method,
                                  const char *host) {
  registered_method *m;
  if (!method) {
    gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
    return NULL;
  }
  for (m = server->registered_methods; m; m = m->next) {
    if (streq(m->method, method) && streq(m->host, host)) {
      gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
              host ? host : "*");
      return NULL;
    }
  }
  m = gpr_malloc(sizeof(registered_method));
  memset(m, 0, sizeof(*m));
  m->method = gpr_strdup(method);
  m->host = gpr_strdup(host);
  m->next = server->registered_methods;
  server->registered_methods = m;
  return m;
}

void grpc_server_start(grpc_server *server) {
  listener *l;

  for (l = server->listeners; l; l = l->next) {
    l->start(server, l->arg, grpc_cq_pollset(server->cq));
  }
}

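/* Wire a freshly accepted transport into the server: build the channel
   filter stack, then construct a per-channel open-addressed (linear probing)
   lookup table of registered methods, keyed by interned :path/:authority
   strings and sized at twice the number of registered methods. */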
grpc_transport_setup_result grpc_server_setup_transport(
    grpc_server *s, grpc_transport *transport,
    grpc_channel_filter const **extra_filters, size_t num_extra_filters,
    grpc_mdctx *mdctx) {
  size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
  grpc_channel_filter const **filters =
      gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
  size_t i;
  size_t num_registered_methods;
  size_t alloc;
  registered_method *rm;
  channel_registered_method *crm;
  grpc_channel *channel;
  channel_data *chand;
  grpc_mdstr *host;
  grpc_mdstr *method;
  gpr_uint32 hash;
  gpr_uint32 slots;
  gpr_uint32 probes;
  gpr_uint32 max_probes = 0;

  for (i = 0; i < s->channel_filter_count; i++) {
    filters[i] = s->channel_filters[i];
  }
  for (; i < s->channel_filter_count + num_extra_filters; i++) {
    filters[i] = extra_filters[i - s->channel_filter_count];
  }
  filters[i] = &grpc_connected_channel_filter;

  grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));

  channel = grpc_channel_create_from_filters(filters, num_filters,
                                             s->channel_args, mdctx, 0);
  chand = (channel_data *)grpc_channel_stack_element(
              grpc_channel_get_channel_stack(channel), 0)->channel_data;
  chand->server = s;
  server_ref(s);
  chand->channel = channel;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channel's context
     to quickly find registered methods */
  if (num_registered_methods > 0) {
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods = gpr_malloc(alloc);
    memset(chand->registered_methods, 0, alloc);
    for (rm = s->registered_methods; rm; rm = rm->next) {
      host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
      method = grpc_mdstr_from_string(mdctx, rm->method);
      hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != NULL;
           probes++)
        ;
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->host = host;
      crm->method = method;
    }
    chand->registered_method_slots = slots;
    chand->registered_method_max_probes = max_probes;
  }

  gpr_mu_lock(&s->mu);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu);

  gpr_free(filters);

  return grpc_connected_channel_bind_transport(
      grpc_channel_get_channel_stack(channel), transport);
}

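/* Common shutdown path: mark the server shut down, send GOAWAY on every live
   channel, fail every requested call that has not yet been matched (both
   server-wide and per registered method), and destroy the listeners. If a
   notification tag was supplied, it completes once all calls have drained. */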
static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
                              void *shutdown_tag) {
  listener *l;
  requested_call_array requested_calls;
  channel_data **channels;
  channel_data *c;
  size_t nchannels;
  size_t i;
  grpc_channel_op op;
  grpc_channel_element *elem;
  registered_method *rm;

  /* lock, and gather up some stuff to do */
  gpr_mu_lock(&server->mu);
  if (server->shutdown) {
    gpr_mu_unlock(&server->mu);
    return;
  }

  nchannels = 0;
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    nchannels++;
  }
  channels = gpr_malloc(sizeof(channel_data *) * nchannels);
  i = 0;
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    grpc_channel_internal_ref(c->channel);
    channels[i] = c;
    i++;
  }

  /* collect all unregistered then registered calls */
  requested_calls = server->requested_calls;
  memset(&server->requested_calls, 0, sizeof(server->requested_calls));
  for (rm = server->registered_methods; rm; rm = rm->next) {
    if (requested_calls.count + rm->requested.count >
        requested_calls.capacity) {
      requested_calls.capacity =
          GPR_MAX(requested_calls.count + rm->requested.count,
                  2 * requested_calls.capacity);
      requested_calls.calls = gpr_realloc(
          requested_calls.calls,
          sizeof(*requested_calls.calls) * requested_calls.capacity);
    }
    memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
           sizeof(*requested_calls.calls) * rm->requested.count);
    requested_calls.count += rm->requested.count;
    memset(&rm->requested, 0, sizeof(rm->requested));
  }

  server->shutdown = 1;
  server->have_shutdown_tag = have_shutdown_tag;
  server->shutdown_tag = shutdown_tag;
  if (have_shutdown_tag) {
    grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
    if (server->lists[ALL_CALLS] == NULL) {
      grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
    }
  }
  gpr_mu_unlock(&server->mu);

  for (i = 0; i < nchannels; i++) {
    c = channels[i];
    elem = grpc_channel_stack_element(
        grpc_channel_get_channel_stack(c->channel), 0);

    op.type = GRPC_CHANNEL_GOAWAY;
    op.dir = GRPC_CALL_DOWN;
    op.data.goaway.status = GRPC_STATUS_OK;
    op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
    elem->filter->channel_op(elem, NULL, &op);

    grpc_channel_internal_unref(c->channel);
  }
  gpr_free(channels);

  /* terminate all the requested calls */
  for (i = 0; i < requested_calls.count; i++) {
    fail_call(server, &requested_calls.calls[i]);
  }
  gpr_free(requested_calls.calls);

  /* Shutdown listeners */
  for (l = server->listeners; l; l = l->next) {
    l->destroy(server, l->arg);
  }
  while (server->listeners) {
    l = server->listeners;
    server->listeners = l->next;
    gpr_free(l);
  }
}

void grpc_server_shutdown(grpc_server *server) {
  shutdown_internal(server, 0, NULL);
}

void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
  shutdown_internal(server, 1, tag);
}

void grpc_server_destroy(grpc_server *server) {
  channel_data *c;
  gpr_mu_lock(&server->mu);
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    shutdown_channel(c);
  }
  gpr_mu_unlock(&server->mu);

  server_unref(server);
}

void grpc_server_add_listener(grpc_server *server, void *arg,
                              void (*start)(grpc_server *server, void *arg,
                                            grpc_pollset *pollset),
                              void (*destroy)(grpc_server *server, void *arg)) {
  listener *l = gpr_malloc(sizeof(listener));
  l->arg = arg;
  l->start = start;
  l->destroy = destroy;
  l->next = server->listeners;
  server->listeners = l;
}

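/* Core of the request_call family: under the server lock, either match the
   incoming request against a call already parked on the relevant pending
   list (activating it immediately), or append the request to the relevant
   requested-call array for a future RPC. */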
static grpc_call_error queue_call_request(grpc_server *server,
                                          requested_call *rc) {
  call_data *calld = NULL;
  requested_call_array *requested_calls = NULL;
  gpr_mu_lock(&server->mu);
  if (server->shutdown) {
    gpr_mu_unlock(&server->mu);
    fail_call(server, rc);
    return GRPC_CALL_OK;
  }
  switch (rc->type) {
    case LEGACY_CALL:
    case BATCH_CALL:
      calld =
          call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
      requested_calls = &server->requested_calls;
      break;
    case REGISTERED_CALL:
      calld = call_list_remove_head(
          &rc->data.registered.registered_method->pending, PENDING_START);
      requested_calls = &rc->data.registered.registered_method->requested;
      break;
  }
  if (calld) {
    GPR_ASSERT(calld->state == PENDING);
    calld->state = ACTIVATED;
    gpr_mu_unlock(&server->mu);
    begin_call(server, calld, rc);
    return GRPC_CALL_OK;
  } else {
    *requested_call_array_add(requested_calls) = *rc;
    gpr_mu_unlock(&server->mu);
    return GRPC_CALL_OK;
  }
}

grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
                                         grpc_call_details *details,
                                         grpc_metadata_array *initial_metadata,
                                         grpc_completion_queue *cq_bind,
                                         void *tag) {
  requested_call rc;
  grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
  rc.type = BATCH_CALL;
  rc.tag = tag;
  rc.data.batch.cq_bind = cq_bind;
  rc.data.batch.call = call;
  rc.data.batch.details = details;
  rc.data.batch.initial_metadata = initial_metadata;
  return queue_call_request(server, &rc);
}

grpc_call_error grpc_server_request_registered_call(
    grpc_server *server, void *registered_method, grpc_call **call,
    gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
    grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
    void *tag) {
  requested_call rc;
  grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
  rc.type = REGISTERED_CALL;
  rc.tag = tag;
  rc.data.registered.cq_bind = cq_bind;
  rc.data.registered.call = call;
  rc.data.registered.registered_method = registered_method;
  rc.data.registered.deadline = deadline;
  rc.data.registered.initial_metadata = initial_metadata;
  rc.data.registered.optional_payload = optional_payload;
  return queue_call_request(server, &rc);
}

grpc_call_error grpc_server_request_call_old(grpc_server *server,
                                             void *tag_new) {
  requested_call rc;
  grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
  rc.type = LEGACY_CALL;
  rc.tag = tag_new;
  return queue_call_request(server, &rc);
}

static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
                                        void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status,
                                void *tag) {
  abort();
}

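/* Copy an interned metadata string (including its trailing NUL) into a
   caller-owned char buffer, growing the buffer and its recorded capacity as
   needed. */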
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
  gpr_slice slice = value->slice;
  size_t len = GPR_SLICE_LENGTH(slice);

  if (len + 1 > *capacity) {
    *capacity = GPR_MAX(len + 1, *capacity * 2);
    *dest = gpr_realloc(*dest, *capacity);
  }
  memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}

static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc) {
  grpc_ioreq_completion_func publish = publish_was_not_set;
  grpc_ioreq req[2];
  grpc_ioreq *r = req;

  /* called once initial metadata has been read by the call, but BEFORE
     the ioreq to fetch it out of the call has been executed.
     This means metadata related fields can be relied on in calld, but to
     fill in the metadata array passed by the client, we need to perform
     an ioreq op, that should complete immediately. */

  switch (rc->type) {
    case LEGACY_CALL:
      calld->legacy = gpr_malloc(sizeof(legacy_data));
      memset(calld->legacy, 0, sizeof(legacy_data));
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = &calld->legacy->initial_metadata;
      r++;
      publish = publish_legacy;
      break;
    case BATCH_CALL:
      cpstr(&rc->data.batch.details->host,
            &rc->data.batch.details->host_capacity, calld->host);
      cpstr(&rc->data.batch.details->method,
            &rc->data.batch.details->method_capacity, calld->path);
      grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
      *rc->data.batch.call = calld->call;
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = rc->data.batch.initial_metadata;
      r++;
      publish = publish_registered_or_batch;
      break;
    case REGISTERED_CALL:
      *rc->data.registered.deadline = calld->deadline;
      grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
      *rc->data.registered.call = calld->call;
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = rc->data.registered.initial_metadata;
      r++;
      if (rc->data.registered.optional_payload) {
        r->op = GRPC_IOREQ_RECV_MESSAGE;
        r->data.recv_message = rc->data.registered.optional_payload;
        r++;
      }
      publish = publish_registered_or_batch;
      break;
  }

  grpc_call_internal_ref(calld->call);
  grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
                                      rc->tag);
}

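/* Report an unmatched request back to the completion queue (e.g. at
   shutdown): clear the caller's out-parameters and complete the tag with an
   error, or with a NULL call for the legacy API. */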
static void fail_call(grpc_server *server, requested_call *rc) {
  switch (rc->type) {
    case LEGACY_CALL:
      grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL,
                          NULL, gpr_inf_past, 0, NULL);
      break;
    case BATCH_CALL:
      *rc->data.batch.call = NULL;
      rc->data.batch.initial_metadata->count = 0;
      grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
                              GRPC_OP_ERROR);
      break;
    case REGISTERED_CALL:
      *rc->data.registered.call = NULL;
      rc->data.registered.initial_metadata->count = 0;
      grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
                              GRPC_OP_ERROR);
      break;
  }
}

static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
  grpc_call_element *elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_server *server = chand->server;

  if (status == GRPC_OP_OK) {
    grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
                        grpc_mdstr_as_c_string(calld->path),
                        grpc_mdstr_as_c_string(calld->host), calld->deadline,
                        calld->legacy->initial_metadata.count,
                        calld->legacy->initial_metadata.metadata);
  } else {
    gpr_log(GPR_ERROR, "should never reach here");
    abort();
  }
}

static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
                                        void *tag) {
  grpc_call_element *elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  channel_data *chand = elem->channel_data;
  grpc_server *server = chand->server;
  grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status);
}

const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
  return server->channel_args;
}