blob: 169fb1a78132f16bfe4a6051e4526f637d417245 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080047#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080048#include <grpc/support/alloc.h>
49#include <grpc/support/log.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050#include <grpc/support/useful.h>
51
/* Selects one of the intrusive call lists a call_data can be linked into.
   CALL_LIST_COUNT is not a list: it is the number of lists, and sizes the
   per-call `root`/`links` arrays and the server's `lists` array. */
52typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
53
/* One node in the server's singly-linked list of listeners. */
54typedef struct listener {
  /* opaque pointer handed back to the callbacks below */
55 void *arg;
  /* invoked by grpc_server_start with the server's pollset array and its
     element count */
Craig Tillerec3257c2015-02-12 15:59:43 -080056 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
57 size_t pollset_count);
  /* NOTE(review): presumably tears the listener down; its call site is not
     in the visible part of this file - confirm */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080058 void (*destroy)(grpc_server *server, void *arg);
  /* next listener, or NULL at the end of the list */
59 struct listener *next;
60} listener;
61
/* Forward declarations so the structures below can point at each other
   before their full definitions appear. */
62typedef struct call_data call_data;
63typedef struct channel_data channel_data;
Craig Tiller24be0f72015-02-10 14:04:22 -080064typedef struct registered_method registered_method;
65
/* Neighbour pointers for one circular doubly-linked call list; each
   call_data embeds one call_link per call_list value. */
66typedef struct {
67 call_data *next;
68 call_data *prev;
69} call_link;
70
/* Tag stored in requested_call.type.
   NOTE(review): presumably selects how begin_call/fail_call interpret the
   request; those functions are outside the visible chunk - confirm. */
71typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;
72
/* One queued call request. Entries live in a requested_call_array and are
   popped by finish_start_new_rpc_and_unlock, which copies the entry and
   passes the copy to begin_call.
   NOTE(review): `type` presumably selects which member of `data` is valid;
   there is no union member for LEGACY_CALL here - confirm against
   begin_call/fail_call. */
73typedef struct {
74 requested_call_type type;
75 void *tag;
76 union {
77 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080078 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080079 grpc_call **call;
80 grpc_call_details *details;
81 grpc_metadata_array *initial_metadata;
82 } batch;
83 struct {
Craig Tiller8e8fd892015-02-10 17:02:08 -080084 grpc_completion_queue *cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -080085 grpc_call **call;
86 registered_method *registered_method;
87 gpr_timespec *deadline;
88 grpc_metadata_array *initial_metadata;
89 grpc_byte_buffer **optional_payload;
90 } registered;
91 } data;
92} requested_call;
93
/* Growable array of requested_call entries, used as a stack:
   requested_call_array_add appends at index `count`, and
   finish_start_new_rpc_and_unlock pops from the end. */
94typedef struct {
  /* heap storage, grown with gpr_realloc; released by
     requested_call_array_destroy */
95 requested_call *calls;
  /* number of entries currently in use */
96 size_t count;
  /* number of entries `calls` has room for */
97 size_t capacity;
98} requested_call_array;
99
/* Server-wide record for one method registered through
   grpc_server_register_method; nodes form a singly-linked list rooted at
   grpc_server.registered_methods. */
100struct registered_method {
  /* heap copy (gpr_strdup) of the method name; never NULL */
101 char *method;
  /* heap copy of the host, or NULL when no host was given (looked up as a
     wildcard entry by start_new_rpc) */
102 char *host;
  /* root of the PENDING_START list of calls matched to this method while no
     request was queued for it */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800103 call_data *pending;
  /* requests queued for this method */
Craig Tiller24be0f72015-02-10 14:04:22 -0800104 requested_call_array requested;
  /* completion queue supplied at registration time */
Craig Tiller20bc56d2015-02-12 09:02:56 -0800105 grpc_completion_queue *cq;
Craig Tiller24be0f72015-02-10 14:04:22 -0800106 registered_method *next;
107};
108
/* One slot of the per-channel open-addressed lookup table built in
   grpc_server_setup_transport. A slot whose server_registered_method is NULL
   is empty (the table is zero-filled before insertion). */
109typedef struct channel_registered_method {
110 registered_method *server_registered_method;
  /* method name interned in the channel's metadata context */
111 grpc_mdstr *method;
  /* host interned in the channel's metadata context, or NULL when the
     registration has no host */
112 grpc_mdstr *host;
113} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800114
/* Per-channel state of the server surface filter. */
115struct channel_data {
  /* owning server; NULL until grpc_server_setup_transport assigns it */
116 grpc_server *server;
117 grpc_channel *channel;
  /* interned ":path" key, compared by pointer against incoming metadata */
Craig Tillercce17ac2015-01-20 09:29:28 -0800118 grpc_mdstr *path_key;
  /* interned ":authority" key, compared by pointer against incoming
     metadata */
119 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800120 /* linked list of all channels on a server */
121 channel_data *next;
122 channel_data *prev;
  /* lookup table of registered methods, or NULL when the server had none
     registered at setup time */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800123 channel_registered_method *registered_methods;
  /* number of slots in registered_methods */
124 gpr_uint32 registered_method_slots;
  /* largest probe offset at which any entry was stored during table build */
125 gpr_uint32 registered_method_max_probes;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800126};
127
/* Server object. Released by server_unref when internal_refcount drops to
   zero. */
128struct grpc_server {
  /* filters placed at the top of every channel stack created for this
     server (see grpc_server_create_from_filters) */
129 size_t channel_filter_count;
130 const grpc_channel_filter **channel_filters;
  /* private copy of the creation arguments */
131 grpc_channel_args *channel_args;
  /* completion queue passed at creation; may be NULL */
Craig Tiller20bc56d2015-02-12 09:02:56 -0800132 grpc_completion_queue *unregistered_cq;
Craig Tillerec3257c2015-02-12 15:59:43 -0800133
  /* distinct completion queues known to the server (see addcq); pollsets is
     allocated in grpc_server_start with one entry per cq */
Craig Tiller20bc56d2015-02-12 09:02:56 -0800134 grpc_completion_queue **cqs;
135 grpc_pollset **pollsets;
136 size_t cq_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800137
  /* taken around the call lists, the channel list and the request arrays in
     the functions of this file */
138 gpr_mu mu;
139
Craig Tiller24be0f72015-02-10 14:04:22 -0800140 registered_method *registered_methods;
  /* requests not tied to a registered method */
141 requested_call_array requested_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800142
143 gpr_uint8 shutdown;
  /* when set (together with shutdown), destroy_call_elem signals
     shutdown_tag on every cq once the ALL_CALLS list becomes empty */
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800144 gpr_uint8 have_shutdown_tag;
145 void *shutdown_tag;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800146
  /* list roots, indexed by call_list */
147 call_data *lists[CALL_LIST_COUNT];
  /* sentinel node of the circular list of channels */
148 channel_data root_channel_data;
149
150 listener *listeners;
151 gpr_refcount internal_refcount;
152};
153
/* Lifecycle of a server-side call. NOT_STARTED is the first enumerator, so
   it is the state a call has after init_call_elem zero-fills call_data. */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154typedef enum {
155 /* waiting for metadata */
156 NOT_STARTED,
157 /* initial metadata read, not flow controlled in yet */
158 PENDING,
159 /* flow controlled in, on completion queue */
160 ACTIVATED,
161 /* cancelled before being queued */
162 ZOMBIED
163} call_state;
164
/* Extra per-call storage reached through call_data.legacy; when present,
   destroy_call_elem frees initial_metadata.metadata and then the struct.
   NOTE(review): presumably only allocated for LEGACY_CALL requests - the
   allocation site is outside the visible chunk. */
Craig Tillerfb189f82015-02-03 12:07:07 -0800165typedef struct legacy_data {
Craig Tiller24be0f72015-02-10 14:04:22 -0800166 grpc_metadata_array initial_metadata;
Craig Tillerfb189f82015-02-03 12:07:07 -0800167} legacy_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800168
/* Per-call state of the server surface filter. */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800169struct call_data {
170 grpc_call *call;
171
172 call_state state;
  /* starts at gpr_inf_future; overwritten by GRPC_RECV_DEADLINE */
173 gpr_timespec deadline;
  /* value of the ":path" metadata element, or NULL if none was received;
     holds a reference released in destroy_call_elem */
Craig Tillercce17ac2015-01-20 09:29:28 -0800174 grpc_mdstr *path;
  /* value of the ":authority" metadata element, same ownership as path */
175 grpc_mdstr *host;
176
  /* optional; freed in destroy_call_elem when non-NULL */
177 legacy_data *legacy;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800178 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800179
  /* per list: address of the head pointer of the list this call is
     currently linked into, or NULL when it is in no such list */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800180 call_data **root[CALL_LIST_COUNT];
  /* per list: neighbours within the circular list */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800181 call_link links[CALL_LIST_COUNT];
182};
183
/* Yields the grpc_server owning the channel that call element `elem`
   belongs to, by reading the `server` field of the element's channel_data. */
184#define SERVER_FROM_CALL_ELEM(elem) \
185 (((channel_data *)(elem)->channel_data)->server)
186
187static void do_nothing(void *unused, grpc_op_error ignored) {}
188
/* Forward declarations; both functions are defined later in the file.
   begin_call is invoked by finish_start_new_rpc_and_unlock after the server
   mutex has been released, with a stack copy of the popped request.
   NOTE(review): fail_call presumably completes a request that can no longer
   be matched to a call - confirm against its definition. */
Craig Tiller24be0f72015-02-10 14:04:22 -0800189static void begin_call(grpc_server *server, call_data *calld,
190 requested_call *rc);
191static void fail_call(grpc_server *server, requested_call *rc);
192
Craig Tiller3b29b562015-02-11 12:58:46 -0800193static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800194 GPR_ASSERT(!call->root[list]);
195 call->root[list] = root;
196 if (!*root) {
197 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800198 call->links[list].next = call->links[list].prev = call;
199 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800200 call->links[list].next = *root;
201 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202 call->links[list].next->links[list].prev =
203 call->links[list].prev->links[list].next = call;
204 }
205 return 1;
206}
207
Craig Tiller04cc8be2015-02-10 16:11:22 -0800208static call_data *call_list_remove_head(call_data **root, call_list list) {
209 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800210 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800211 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800212 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800213 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800214 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800215 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800216 out->links[list].next->links[list].prev = out->links[list].prev;
217 out->links[list].prev->links[list].next = out->links[list].next;
218 }
219 }
220 return out;
221}
222
Craig Tiller04cc8be2015-02-10 16:11:22 -0800223static int call_list_remove(call_data *call, call_list list) {
224 call_data **root = call->root[list];
225 if (root == NULL) return 0;
226 call->root[list] = NULL;
227 if (*root == call) {
228 *root = call->links[list].next;
229 if (*root == call) {
230 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800231 return 1;
232 }
233 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800234 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800235 call->links[list].next->links[list].prev = call->links[list].prev;
236 call->links[list].prev->links[list].next = call->links[list].next;
237 return 1;
238}
239
Craig Tiller24be0f72015-02-10 14:04:22 -0800240static void requested_call_array_destroy(requested_call_array *array) {
241 gpr_free(array->calls);
242}
243
244static requested_call *requested_call_array_add(requested_call_array *array) {
245 requested_call *rc;
246 if (array->count == array->capacity) {
247 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
248 array->calls =
249 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
250 }
251 rc = &array->calls[array->count++];
252 memset(rc, 0, sizeof(*rc));
253 return rc;
254}
255
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800256static void server_ref(grpc_server *server) {
257 gpr_ref(&server->internal_refcount);
258}
259
260static void server_unref(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800261 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800262 if (gpr_unref(&server->internal_refcount)) {
263 grpc_channel_args_destroy(server->channel_args);
264 gpr_mu_destroy(&server->mu);
265 gpr_free(server->channel_filters);
Craig Tiller24be0f72015-02-10 14:04:22 -0800266 requested_call_array_destroy(&server->requested_calls);
Craig Tillerec3257c2015-02-12 15:59:43 -0800267 while ((rm = server->registered_methods) != NULL) {
268 server->registered_methods = rm->next;
269 gpr_free(rm->method);
270 gpr_free(rm->host);
271 requested_call_array_destroy(&rm->requested);
272 gpr_free(rm);
273 }
274 gpr_free(server->cqs);
275 gpr_free(server->pollsets);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800276 gpr_free(server);
277 }
278}
279
280static int is_channel_orphaned(channel_data *chand) {
281 return chand->next == chand;
282}
283
284static void orphan_channel(channel_data *chand) {
285 chand->next->prev = chand->prev;
286 chand->prev->next = chand->next;
287 chand->next = chand->prev = chand;
288}
289
ctiller58393c22015-01-07 14:03:30 -0800290static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800291 channel_data *chand = cd;
292 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800293 grpc_channel_destroy(chand->channel);
294 server_unref(server);
295}
296
297static void destroy_channel(channel_data *chand) {
298 if (is_channel_orphaned(chand)) return;
299 GPR_ASSERT(chand->server != NULL);
300 orphan_channel(chand);
301 server_ref(chand->server);
ctiller18b49ab2014-12-09 14:39:16 -0800302 grpc_iomgr_add_callback(finish_destroy_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800303}
304
Craig Tiller3b29b562015-02-11 12:58:46 -0800305static void finish_start_new_rpc_and_unlock(grpc_server *server,
306 grpc_call_element *elem,
307 call_data **pending_root,
308 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800309 requested_call rc;
310 call_data *calld = elem->call_data;
311 if (array->count == 0) {
312 calld->state = PENDING;
313 call_list_join(pending_root, calld, PENDING_START);
314 gpr_mu_unlock(&server->mu);
315 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800316 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800317 calld->state = ACTIVATED;
318 gpr_mu_unlock(&server->mu);
319 begin_call(server, calld, &rc);
320 }
321}
322
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800323static void start_new_rpc(grpc_call_element *elem) {
324 channel_data *chand = elem->channel_data;
325 call_data *calld = elem->call_data;
326 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800327 gpr_uint32 i;
328 gpr_uint32 hash;
329 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800330
331 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800332 if (chand->registered_methods && calld->path && calld->host) {
333 /* check for an exact match with host */
334 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
335 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800336 rm = &chand->registered_methods[(hash + i) %
337 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800338 if (!rm) break;
339 if (rm->host != calld->host) continue;
340 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800341 finish_start_new_rpc_and_unlock(server, elem,
342 &rm->server_registered_method->pending,
343 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800344 return;
345 }
346 /* check for a wildcard method definition (no host set) */
347 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800348 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800349 rm = &chand->registered_methods[(hash + i) %
350 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800351 if (!rm) break;
352 if (rm->host != NULL) continue;
353 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800354 finish_start_new_rpc_and_unlock(server, elem,
355 &rm->server_registered_method->pending,
356 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800357 return;
358 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800359 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800360 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
361 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800362}
363
ctiller58393c22015-01-07 14:03:30 -0800364static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800365 grpc_call_destroy(grpc_call_from_top_element(elem));
366}
367
Craig Tillercce17ac2015-01-20 09:29:28 -0800368static void stream_closed(grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800369 call_data *calld = elem->call_data;
370 channel_data *chand = elem->channel_data;
371 gpr_mu_lock(&chand->server->mu);
372 switch (calld->state) {
373 case ACTIVATED:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800374 break;
375 case PENDING:
Craig Tiller04cc8be2015-02-10 16:11:22 -0800376 call_list_remove(calld, PENDING_START);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800377 /* fallthrough intended */
378 case NOT_STARTED:
379 calld->state = ZOMBIED;
ctiller18b49ab2014-12-09 14:39:16 -0800380 grpc_iomgr_add_callback(kill_zombie, elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800381 break;
382 case ZOMBIED:
383 break;
384 }
385 gpr_mu_unlock(&chand->server->mu);
Craig Tiller0a927bf2015-02-05 10:52:53 -0800386 grpc_call_stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800387}
388
Craig Tillercce17ac2015-01-20 09:29:28 -0800389static void read_closed(grpc_call_element *elem) {
390 call_data *calld = elem->call_data;
391 channel_data *chand = elem->channel_data;
392 gpr_mu_lock(&chand->server->mu);
393 switch (calld->state) {
394 case ACTIVATED:
395 case PENDING:
396 grpc_call_read_closed(elem);
397 break;
398 case NOT_STARTED:
399 calld->state = ZOMBIED;
400 grpc_iomgr_add_callback(kill_zombie, elem);
401 break;
402 case ZOMBIED:
403 break;
404 }
405 gpr_mu_unlock(&chand->server->mu);
406}
407
ctillerf962f522014-12-10 15:28:27 -0800408static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
409 grpc_call_op *op) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800410 channel_data *chand = elem->channel_data;
411 call_data *calld = elem->call_data;
412 grpc_mdelem *md;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
414 switch (op->type) {
415 case GRPC_RECV_METADATA:
Craig Tillercce17ac2015-01-20 09:29:28 -0800416 md = op->data.metadata;
417 if (md->key == chand->path_key) {
418 calld->path = grpc_mdstr_ref(md->value);
419 grpc_mdelem_unref(md);
420 } else if (md->key == chand->authority_key) {
421 calld->host = grpc_mdstr_ref(md->value);
422 grpc_mdelem_unref(md);
423 } else {
424 grpc_call_recv_metadata(elem, md);
425 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800426 break;
427 case GRPC_RECV_END_OF_INITIAL_METADATA:
428 start_new_rpc(elem);
Craig Tiller4069b682015-01-29 14:01:19 -0800429 grpc_call_initial_metadata_complete(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800430 break;
431 case GRPC_RECV_MESSAGE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800432 grpc_call_recv_message(elem, op->data.message);
433 op->done_cb(op->user_data, GRPC_OP_OK);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800434 break;
435 case GRPC_RECV_HALF_CLOSE:
Craig Tillercce17ac2015-01-20 09:29:28 -0800436 read_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800437 break;
438 case GRPC_RECV_FINISH:
Craig Tillercce17ac2015-01-20 09:29:28 -0800439 stream_closed(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800440 break;
441 case GRPC_RECV_DEADLINE:
442 grpc_call_set_deadline(elem, op->data.deadline);
443 ((call_data *)elem->call_data)->deadline = op->data.deadline;
444 break;
445 default:
446 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
447 grpc_call_next_op(elem, op);
448 break;
449 }
450}
451
ctillerf962f522014-12-10 15:28:27 -0800452static void channel_op(grpc_channel_element *elem,
453 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800454 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800455 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800456
457 switch (op->type) {
458 case GRPC_ACCEPT_CALL:
459 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800460 grpc_call_create(chand->channel, NULL,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800461 op->data.accept_call.transport_server_data);
462 break;
463 case GRPC_TRANSPORT_CLOSED:
464 /* if the transport is closed for a server channel, we destroy the
465 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800466 gpr_mu_lock(&server->mu);
467 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800468 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800469 gpr_mu_unlock(&server->mu);
470 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800471 break;
nnoble0c475f02014-12-05 15:37:39 -0800472 case GRPC_TRANSPORT_GOAWAY:
473 gpr_slice_unref(op->data.goaway.message);
474 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800475 default:
476 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
477 grpc_channel_next_op(elem, op);
478 break;
479 }
480}
481
ctiller58393c22015-01-07 14:03:30 -0800482static void finish_shutdown_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800483 channel_data *chand = cd;
484 grpc_channel_op op;
nnoble0c475f02014-12-05 15:37:39 -0800485 op.type = GRPC_CHANNEL_DISCONNECT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800486 op.dir = GRPC_CALL_DOWN;
487 channel_op(grpc_channel_stack_element(
488 grpc_channel_get_channel_stack(chand->channel), 0),
ctillerf962f522014-12-10 15:28:27 -0800489 NULL, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800490 grpc_channel_internal_unref(chand->channel);
491}
492
493static void shutdown_channel(channel_data *chand) {
494 grpc_channel_internal_ref(chand->channel);
ctiller18b49ab2014-12-09 14:39:16 -0800495 grpc_iomgr_add_callback(finish_shutdown_channel, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496}
497
498static void init_call_elem(grpc_call_element *elem,
499 const void *server_transport_data) {
500 call_data *calld = elem->call_data;
501 channel_data *chand = elem->channel_data;
502 memset(calld, 0, sizeof(call_data));
503 calld->deadline = gpr_inf_future;
504 calld->call = grpc_call_from_top_element(elem);
505
506 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800507 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800508 gpr_mu_unlock(&chand->server->mu);
509
510 server_ref(chand->server);
511}
512
513static void destroy_call_elem(grpc_call_element *elem) {
514 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800515 call_data *calld = elem->call_data;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800516 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800517
518 gpr_mu_lock(&chand->server->mu);
519 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800520 call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800521 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800522 if (chand->server->shutdown && chand->server->have_shutdown_tag &&
523 chand->server->lists[ALL_CALLS] == NULL) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800524 for (i = 0; i < chand->server->cq_count; i++) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800525 grpc_cq_end_server_shutdown(chand->server->cqs[i],
526 chand->server->shutdown_tag);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800527 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800528 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800529 gpr_mu_unlock(&chand->server->mu);
530
Craig Tiller4df31a62015-01-30 09:44:31 -0800531 if (calld->host) {
532 grpc_mdstr_unref(calld->host);
533 }
534 if (calld->path) {
535 grpc_mdstr_unref(calld->path);
536 }
537
Craig Tillerdb7db992015-01-29 11:19:01 -0800538 if (calld->legacy) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800539 gpr_free(calld->legacy->initial_metadata.metadata);
Craig Tillerdb7db992015-01-29 11:19:01 -0800540 gpr_free(calld->legacy);
541 }
542
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800543 server_unref(chand->server);
544}
545
546static void init_channel_elem(grpc_channel_element *elem,
547 const grpc_channel_args *args,
548 grpc_mdctx *metadata_context, int is_first,
549 int is_last) {
550 channel_data *chand = elem->channel_data;
551 GPR_ASSERT(is_first);
552 GPR_ASSERT(!is_last);
553 chand->server = NULL;
554 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800555 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
556 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800557 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800558 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800559}
560
561static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800562 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800563 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800564 if (chand->registered_methods) {
565 for (i = 0; i < chand->registered_method_slots; i++) {
566 if (chand->registered_methods[i].method) {
567 grpc_mdstr_unref(chand->registered_methods[i].method);
568 }
569 if (chand->registered_methods[i].host) {
570 grpc_mdstr_unref(chand->registered_methods[i].host);
571 }
572 }
573 gpr_free(chand->registered_methods);
574 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800575 if (chand->server) {
576 gpr_mu_lock(&chand->server->mu);
577 chand->next->prev = chand->prev;
578 chand->prev->next = chand->next;
579 chand->next = chand->prev = chand;
580 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800581 grpc_mdstr_unref(chand->path_key);
582 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800583 server_unref(chand->server);
584 }
585}
586
/* Filter descriptor for the server surface filter, named "server". It bundles
   the call/channel op handlers, the sizes of the per-call and per-channel
   data, and their constructors/destructors defined above.
   grpc_server_create_from_filters installs it as filter 0 of every server. */
587static const grpc_channel_filter server_surface_filter = {
Craig Tiller24be0f72015-02-10 14:04:22 -0800588 call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
589 sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800590};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800591
Craig Tiller20bc56d2015-02-12 09:02:56 -0800592static void addcq(grpc_server *server, grpc_completion_queue *cq) {
593 size_t i, n;
594 for (i = 0; i < server->cq_count; i++) {
595 if (server->cqs[i] == cq) return;
596 }
597 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800598 server->cqs = gpr_realloc(server->cqs,
599 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800600 server->cqs[n] = cq;
601}
602
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800603grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
604 grpc_channel_filter **filters,
605 size_t filter_count,
606 const grpc_channel_args *args) {
607 size_t i;
608 int census_enabled = grpc_channel_args_is_census_enabled(args);
609
610 grpc_server *server = gpr_malloc(sizeof(grpc_server));
611 memset(server, 0, sizeof(grpc_server));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800612 if (cq) addcq(server, cq);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800613
614 gpr_mu_init(&server->mu);
615
Craig Tiller20bc56d2015-02-12 09:02:56 -0800616 server->unregistered_cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800617 /* decremented by grpc_server_destroy */
618 gpr_ref_init(&server->internal_refcount, 1);
619 server->root_channel_data.next = server->root_channel_data.prev =
620 &server->root_channel_data;
621
622 /* Server filter stack is:
623
624 server_surface_filter - for making surface API calls
625 grpc_server_census_filter (optional) - for stats collection and tracing
626 {passed in filter stack}
627 grpc_connected_channel_filter - for interfacing with transports */
628 server->channel_filter_count = filter_count + 1 + census_enabled;
629 server->channel_filters =
630 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
631 server->channel_filters[0] = &server_surface_filter;
632 if (census_enabled) {
633 server->channel_filters[1] = &grpc_server_census_filter;
634 }
635 for (i = 0; i < filter_count; i++) {
636 server->channel_filters[i + 1 + census_enabled] = filters[i];
637 }
638
639 server->channel_args = grpc_channel_args_copy(args);
640
641 return server;
642}
643
/* NULL-tolerant string equality. Returns 1 when both arguments are NULL or
   when both are non-NULL and compare equal with strcmp; returns 0 otherwise
   (including when exactly one of them is NULL). */
static int streq(const char *a, const char *b) {
  if (a == NULL || b == NULL) {
    /* equal only if both are missing */
    return a == b;
  }
  return strcmp(a, b) == 0;
}
650
651void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerec3257c2015-02-12 15:59:43 -0800652 const char *host,
653 grpc_completion_queue *cq_new_rpc) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800654 registered_method *m;
655 if (!method) {
656 gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
657 return NULL;
658 }
659 for (m = server->registered_methods; m; m = m->next) {
660 if (streq(m->method, method) && streq(m->host, host)) {
661 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
662 host ? host : "*");
663 return NULL;
664 }
665 }
Craig Tiller20bc56d2015-02-12 09:02:56 -0800666 addcq(server, cq_new_rpc);
Craig Tiller24be0f72015-02-10 14:04:22 -0800667 m = gpr_malloc(sizeof(registered_method));
668 memset(m, 0, sizeof(*m));
669 m->method = gpr_strdup(method);
670 m->host = gpr_strdup(host);
671 m->next = server->registered_methods;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800672 m->cq = cq_new_rpc;
Craig Tiller24be0f72015-02-10 14:04:22 -0800673 server->registered_methods = m;
674 return m;
675}
676
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800677void grpc_server_start(grpc_server *server) {
678 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800679 size_t i;
680
Craig Tillerec3257c2015-02-12 15:59:43 -0800681 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800682 for (i = 0; i < server->cq_count; i++) {
683 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
684 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800685
686 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800687 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800688 }
689}
690
691grpc_transport_setup_result grpc_server_setup_transport(
692 grpc_server *s, grpc_transport *transport,
693 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
694 grpc_mdctx *mdctx) {
695 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
696 grpc_channel_filter const **filters =
697 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
698 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800699 size_t num_registered_methods;
700 size_t alloc;
701 registered_method *rm;
702 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800703 grpc_channel *channel;
704 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800705 grpc_mdstr *host;
706 grpc_mdstr *method;
707 gpr_uint32 hash;
708 gpr_uint32 slots;
709 gpr_uint32 probes;
710 gpr_uint32 max_probes = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800711
712 for (i = 0; i < s->channel_filter_count; i++) {
713 filters[i] = s->channel_filters[i];
714 }
715 for (; i < s->channel_filter_count + num_extra_filters; i++) {
716 filters[i] = extra_filters[i - s->channel_filter_count];
717 }
718 filters[i] = &grpc_connected_channel_filter;
719
Craig Tiller20bc56d2015-02-12 09:02:56 -0800720 for (i = 0; i < s->cq_count; i++) {
721 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
722 }
ctillerd79b4862014-12-17 16:36:59 -0800723
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800724 channel = grpc_channel_create_from_filters(filters, num_filters,
725 s->channel_args, mdctx, 0);
726 chand = (channel_data *)grpc_channel_stack_element(
727 grpc_channel_get_channel_stack(channel), 0)->channel_data;
728 chand->server = s;
729 server_ref(s);
730 chand->channel = channel;
731
Craig Tiller04cc8be2015-02-10 16:11:22 -0800732 num_registered_methods = 0;
733 for (rm = s->registered_methods; rm; rm = rm->next) {
734 num_registered_methods++;
735 }
736 /* build a lookup table phrased in terms of mdstr's in this channels context
737 to quickly find registered methods */
738 if (num_registered_methods > 0) {
739 slots = 2 * num_registered_methods;
740 alloc = sizeof(channel_registered_method) * slots;
741 chand->registered_methods = gpr_malloc(alloc);
742 memset(chand->registered_methods, 0, alloc);
743 for (rm = s->registered_methods; rm; rm = rm->next) {
744 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800745 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800746 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800747 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
748 .server_registered_method != NULL;
749 probes++)
750 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800751 if (probes > max_probes) max_probes = probes;
752 crm = &chand->registered_methods[(hash + probes) % slots];
753 crm->server_registered_method = rm;
754 crm->host = host;
755 crm->method = method;
756 }
757 chand->registered_method_slots = slots;
758 chand->registered_method_max_probes = max_probes;
759 }
760
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800761 gpr_mu_lock(&s->mu);
762 chand->next = &s->root_channel_data;
763 chand->prev = chand->next->prev;
764 chand->next->prev = chand->prev->next = chand;
765 gpr_mu_unlock(&s->mu);
766
767 gpr_free(filters);
768
769 return grpc_connected_channel_bind_transport(
770 grpc_channel_get_channel_stack(channel), transport);
771}
772
Craig Tillerbd217572015-02-11 18:10:56 -0800773static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
Craig Tillerec3257c2015-02-12 15:59:43 -0800774 void *shutdown_tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800775 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800776 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800777 channel_data **channels;
778 channel_data *c;
779 size_t nchannels;
780 size_t i;
781 grpc_channel_op op;
782 grpc_channel_element *elem;
Craig Tillerbd217572015-02-11 18:10:56 -0800783 registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800784
785 /* lock, and gather up some stuff to do */
786 gpr_mu_lock(&server->mu);
787 if (server->shutdown) {
788 gpr_mu_unlock(&server->mu);
789 return;
790 }
791
nnoble0c475f02014-12-05 15:37:39 -0800792 nchannels = 0;
793 for (c = server->root_channel_data.next; c != &server->root_channel_data;
794 c = c->next) {
795 nchannels++;
796 }
797 channels = gpr_malloc(sizeof(channel_data *) * nchannels);
798 i = 0;
799 for (c = server->root_channel_data.next; c != &server->root_channel_data;
800 c = c->next) {
801 grpc_channel_internal_ref(c->channel);
802 channels[i] = c;
803 i++;
804 }
805
Craig Tillerbd217572015-02-11 18:10:56 -0800806 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800807 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800808 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800809 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800810 if (requested_calls.count + rm->requested.count >
811 requested_calls.capacity) {
812 requested_calls.capacity =
813 GPR_MAX(requested_calls.count + rm->requested.count,
814 2 * requested_calls.capacity);
815 requested_calls.calls =
816 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
817 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800818 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800819 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
820 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800821 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800822 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800823 memset(&rm->requested, 0, sizeof(rm->requested));
824 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800825
826 server->shutdown = 1;
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800827 server->have_shutdown_tag = have_shutdown_tag;
828 server->shutdown_tag = shutdown_tag;
829 if (have_shutdown_tag) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800830 for (i = 0; i < server->cq_count; i++) {
831 grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
832 if (server->lists[ALL_CALLS] == NULL) {
833 grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
834 }
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800835 }
836 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800837 gpr_mu_unlock(&server->mu);
838
nnoble0c475f02014-12-05 15:37:39 -0800839 for (i = 0; i < nchannels; i++) {
840 c = channels[i];
841 elem = grpc_channel_stack_element(
842 grpc_channel_get_channel_stack(c->channel), 0);
843
844 op.type = GRPC_CHANNEL_GOAWAY;
845 op.dir = GRPC_CALL_DOWN;
846 op.data.goaway.status = GRPC_STATUS_OK;
847 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
ctillerf962f522014-12-10 15:28:27 -0800848 elem->filter->channel_op(elem, NULL, &op);
nnoble0c475f02014-12-05 15:37:39 -0800849
850 grpc_channel_internal_unref(c->channel);
851 }
852 gpr_free(channels);
853
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800854 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800855 for (i = 0; i < requested_calls.count; i++) {
856 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800857 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800858 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800859
860 /* Shutdown listeners */
861 for (l = server->listeners; l; l = l->next) {
862 l->destroy(server, l->arg);
863 }
864 while (server->listeners) {
865 l = server->listeners;
866 server->listeners = l->next;
867 gpr_free(l);
868 }
869}
870
Craig Tiller4ffdcd52015-01-16 11:34:55 -0800871void grpc_server_shutdown(grpc_server *server) {
872 shutdown_internal(server, 0, NULL);
873}
874
875void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
876 shutdown_internal(server, 1, tag);
877}
878
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800879void grpc_server_destroy(grpc_server *server) {
880 channel_data *c;
881 gpr_mu_lock(&server->mu);
882 for (c = server->root_channel_data.next; c != &server->root_channel_data;
883 c = c->next) {
884 shutdown_channel(c);
885 }
886 gpr_mu_unlock(&server->mu);
887
888 server_unref(server);
889}
890
891void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -0800892 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -0800893 grpc_pollset **pollsets,
894 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800895 void (*destroy)(grpc_server *server, void *arg)) {
896 listener *l = gpr_malloc(sizeof(listener));
897 l->arg = arg;
898 l->start = start;
899 l->destroy = destroy;
900 l->next = server->listeners;
901 server->listeners = l;
902}
903
Craig Tiller9f28ac22015-01-27 17:01:29 -0800904static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -0800905 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -0800906 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -0800907 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800908 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800909 if (server->shutdown) {
910 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800911 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800912 return GRPC_CALL_OK;
913 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800914 switch (rc->type) {
915 case LEGACY_CALL:
916 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800917 calld =
918 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800919 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800920 break;
921 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -0800922 calld = call_list_remove_head(
923 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800924 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800925 break;
926 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800927 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -0800928 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -0800929 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -0800930 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -0800931 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -0800932 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800933 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800934 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -0800935 gpr_mu_unlock(&server->mu);
936 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800937 }
Craig Tillercce17ac2015-01-20 09:29:28 -0800938}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800939
Craig Tiller24be0f72015-02-10 14:04:22 -0800940grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
941 grpc_call_details *details,
942 grpc_metadata_array *initial_metadata,
Craig Tiller3b29b562015-02-11 12:58:46 -0800943 grpc_completion_queue *cq_bind,
944 void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800945 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800946 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -0800947 rc.type = BATCH_CALL;
948 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -0800949 rc.data.batch.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -0800950 rc.data.batch.call = call;
951 rc.data.batch.details = details;
952 rc.data.batch.initial_metadata = initial_metadata;
953 return queue_call_request(server, &rc);
954}
955
956grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -0800957 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
958 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
959 grpc_completion_queue *cq_bind, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800960 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800961 registered_method *registered_method = rm;
962 grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
Craig Tiller24be0f72015-02-10 14:04:22 -0800963 rc.type = REGISTERED_CALL;
964 rc.tag = tag;
Craig Tiller8e8fd892015-02-10 17:02:08 -0800965 rc.data.registered.cq_bind = cq_bind;
Craig Tiller24be0f72015-02-10 14:04:22 -0800966 rc.data.registered.call = call;
967 rc.data.registered.registered_method = registered_method;
968 rc.data.registered.deadline = deadline;
969 rc.data.registered.initial_metadata = initial_metadata;
970 rc.data.registered.optional_payload = optional_payload;
971 return queue_call_request(server, &rc);
972}
973
974grpc_call_error grpc_server_request_call_old(grpc_server *server,
975 void *tag_new) {
976 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800977 grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
Craig Tiller24be0f72015-02-10 14:04:22 -0800978 rc.type = LEGACY_CALL;
979 rc.tag = tag_new;
980 return queue_call_request(server, &rc);
981}
982
983static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
984static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
985 void *tag);
Craig Tiller3b29b562015-02-11 12:58:46 -0800986static void publish_was_not_set(grpc_call *call, grpc_op_error status,
987 void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -0800988 abort();
989}
Craig Tiller24be0f72015-02-10 14:04:22 -0800990
Craig Tiller166e2502015-02-03 20:14:41 -0800991static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
992 gpr_slice slice = value->slice;
993 size_t len = GPR_SLICE_LENGTH(slice);
994
995 if (len + 1 > *capacity) {
996 *capacity = GPR_MAX(len + 1, *capacity * 2);
997 *dest = gpr_realloc(*dest, *capacity);
998 }
999 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1000}
1001
Craig Tiller24be0f72015-02-10 14:04:22 -08001002static void begin_call(grpc_server *server, call_data *calld,
1003 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001004 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001005 grpc_ioreq req[2];
1006 grpc_ioreq *r = req;
1007
1008 /* called once initial metadata has been read by the call, but BEFORE
1009 the ioreq to fetch it out of the call has been executed.
1010 This means metadata related fields can be relied on in calld, but to
1011 fill in the metadata array passed by the client, we need to perform
1012 an ioreq op, that should complete immediately. */
1013
1014 switch (rc->type) {
1015 case LEGACY_CALL:
1016 calld->legacy = gpr_malloc(sizeof(legacy_data));
1017 memset(calld->legacy, 0, sizeof(legacy_data));
1018 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1019 r->data.recv_metadata = &calld->legacy->initial_metadata;
1020 r++;
1021 publish = publish_legacy;
1022 break;
1023 case BATCH_CALL:
1024 cpstr(&rc->data.batch.details->host,
1025 &rc->data.batch.details->host_capacity, calld->host);
1026 cpstr(&rc->data.batch.details->method,
1027 &rc->data.batch.details->method_capacity, calld->path);
Craig Tiller8e8fd892015-02-10 17:02:08 -08001028 grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001029 *rc->data.batch.call = calld->call;
1030 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1031 r->data.recv_metadata = rc->data.batch.initial_metadata;
1032 r++;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001033 calld->cq_new = server->unregistered_cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001034 publish = publish_registered_or_batch;
1035 break;
1036 case REGISTERED_CALL:
1037 *rc->data.registered.deadline = calld->deadline;
Craig Tiller8e8fd892015-02-10 17:02:08 -08001038 grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
Craig Tiller24be0f72015-02-10 14:04:22 -08001039 *rc->data.registered.call = calld->call;
1040 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1041 r->data.recv_metadata = rc->data.registered.initial_metadata;
1042 r++;
1043 if (rc->data.registered.optional_payload) {
1044 r->op = GRPC_IOREQ_RECV_MESSAGE;
1045 r->data.recv_message = rc->data.registered.optional_payload;
1046 r++;
1047 }
Craig Tiller20bc56d2015-02-12 09:02:56 -08001048 calld->cq_new = rc->data.registered.registered_method->cq;
Craig Tiller24be0f72015-02-10 14:04:22 -08001049 publish = publish_registered_or_batch;
1050 break;
1051 }
1052
1053 grpc_call_internal_ref(calld->call);
1054 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1055 rc->tag);
1056}
1057
1058static void fail_call(grpc_server *server, requested_call *rc) {
1059 switch (rc->type) {
1060 case LEGACY_CALL:
Craig Tillerec3257c2015-02-12 15:59:43 -08001061 grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
1062 NULL, NULL, NULL, gpr_inf_past, 0, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001063 break;
1064 case BATCH_CALL:
1065 *rc->data.batch.call = NULL;
1066 rc->data.batch.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001067 grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
1068 do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001069 break;
1070 case REGISTERED_CALL:
1071 *rc->data.registered.call = NULL;
1072 rc->data.registered.initial_metadata->count = 0;
Craig Tillerec3257c2015-02-12 15:59:43 -08001073 grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
1074 rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
Craig Tiller24be0f72015-02-10 14:04:22 -08001075 break;
1076 }
1077}
1078
1079static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
1080 grpc_call_element *elem =
1081 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1082 call_data *calld = elem->call_data;
1083 channel_data *chand = elem->channel_data;
1084 grpc_server *server = chand->server;
1085
1086 if (status == GRPC_OP_OK) {
Craig Tiller20bc56d2015-02-12 09:02:56 -08001087 grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
Craig Tiller24be0f72015-02-10 14:04:22 -08001088 grpc_mdstr_as_c_string(calld->path),
1089 grpc_mdstr_as_c_string(calld->host), calld->deadline,
1090 calld->legacy->initial_metadata.count,
1091 calld->legacy->initial_metadata.metadata);
1092 } else {
1093 gpr_log(GPR_ERROR, "should never reach here");
1094 abort();
1095 }
1096}
1097
1098static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
1099 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001100 grpc_call_element *elem =
1101 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001102 call_data *calld = elem->call_data;
1103 grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
Craig Tiller24be0f72015-02-10 14:04:22 -08001104}
1105
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001106const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1107 return server->channel_args;
1108}