/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/surface/server.h"

#include <stdlib.h>
#include <string.h>

#include "src/core/channel/census_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>

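/* Each call participates in two intrusive lists: PENDING_START holds calls
   that have read their initial metadata but have not yet been matched with an
   application request, and ALL_CALLS tracks every live call so that shutdown
   can tell when the last one has been destroyed. */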
typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;

typedef struct listener {
  void *arg;
  void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
                size_t pollset_count);
  void (*destroy)(grpc_server *server, void *arg);
  struct listener *next;
} listener;

typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;

typedef struct {
  call_data *next;
  call_data *prev;
} call_link;

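/* A requested_call records one outstanding request made by the application
   through grpc_server_request_call, grpc_server_request_registered_call or
   grpc_server_request_call_old; the union holds the output pointers that get
   filled in once an incoming call is matched to the request. */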
typedef enum { LEGACY_CALL, BATCH_CALL, REGISTERED_CALL } requested_call_type;

typedef struct {
  requested_call_type type;
  void *tag;
  union {
    struct {
      grpc_completion_queue *cq_bind;
      grpc_call **call;
      grpc_call_details *details;
      grpc_metadata_array *initial_metadata;
    } batch;
    struct {
      grpc_completion_queue *cq_bind;
      grpc_call **call;
      registered_method *registered_method;
      gpr_timespec *deadline;
      grpc_metadata_array *initial_metadata;
      grpc_byte_buffer **optional_payload;
    } registered;
  } data;
} requested_call;

typedef struct {
  requested_call *calls;
  size_t count;
  size_t capacity;
} requested_call_array;

struct registered_method {
  char *method;
  char *host;
  call_data *pending;
  requested_call_array requested;
  grpc_completion_queue *cq;
  registered_method *next;
};

typedef struct channel_registered_method {
  registered_method *server_registered_method;
  grpc_mdstr *method;
  grpc_mdstr *host;
} channel_registered_method;

struct channel_data {
  grpc_server *server;
  grpc_channel *channel;
  grpc_mdstr *path_key;
  grpc_mdstr *authority_key;
  /* linked list of all channels on a server */
  channel_data *next;
  channel_data *prev;
  channel_registered_method *registered_methods;
  gpr_uint32 registered_method_slots;
  gpr_uint32 registered_method_max_probes;
};

struct grpc_server {
  size_t channel_filter_count;
  const grpc_channel_filter **channel_filters;
  grpc_channel_args *channel_args;
  grpc_completion_queue *unregistered_cq;

  grpc_completion_queue **cqs;
  grpc_pollset **pollsets;
  size_t cq_count;

  gpr_mu mu;

  registered_method *registered_methods;
  requested_call_array requested_calls;

  gpr_uint8 shutdown;
  gpr_uint8 have_shutdown_tag;
  void *shutdown_tag;

  call_data *lists[CALL_LIST_COUNT];
  channel_data root_channel_data;

  listener *listeners;
  gpr_refcount internal_refcount;
};

typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

typedef struct legacy_data {
  grpc_metadata_array initial_metadata;
} legacy_data;

struct call_data {
  grpc_call *call;

  call_state state;
  gpr_timespec deadline;
  grpc_mdstr *path;
  grpc_mdstr *host;

  legacy_data *legacy;
  grpc_completion_queue *cq_new;

  call_data **root[CALL_LIST_COUNT];
  call_link links[CALL_LIST_COUNT];
};

#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)

static void do_nothing(void *unused, grpc_op_error ignored) {}

static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);

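/* Intrusive, circular doubly-linked list helpers. Each call carries its own
   link storage (call_data.links) and a back-pointer to the list root it is
   currently on (call_data.root), so joins and removals are O(1) and require
   no allocation. */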
static int call_list_join(call_data **root, call_data *call, call_list list) {
  GPR_ASSERT(!call->root[list]);
  call->root[list] = root;
  if (!*root) {
    *root = call;
    call->links[list].next = call->links[list].prev = call;
  } else {
    call->links[list].next = *root;
    call->links[list].prev = (*root)->links[list].prev;
    call->links[list].next->links[list].prev =
        call->links[list].prev->links[list].next = call;
  }
  return 1;
}

static call_data *call_list_remove_head(call_data **root, call_list list) {
  call_data *out = *root;
  if (out) {
    out->root[list] = NULL;
    if (out->links[list].next == out) {
      *root = NULL;
    } else {
      *root = out->links[list].next;
      out->links[list].next->links[list].prev = out->links[list].prev;
      out->links[list].prev->links[list].next = out->links[list].next;
    }
  }
  return out;
}

static int call_list_remove(call_data *call, call_list list) {
  call_data **root = call->root[list];
  if (root == NULL) return 0;
  call->root[list] = NULL;
  if (*root == call) {
    *root = call->links[list].next;
    if (*root == call) {
      *root = NULL;
      return 1;
    }
  }
  GPR_ASSERT(*root != call);
  call->links[list].next->links[list].prev = call->links[list].prev;
  call->links[list].prev->links[list].next = call->links[list].next;
  return 1;
}

static void requested_call_array_destroy(requested_call_array *array) {
  gpr_free(array->calls);
}

static requested_call *requested_call_array_add(requested_call_array *array) {
  requested_call *rc;
  if (array->count == array->capacity) {
    array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
    array->calls =
        gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
  }
  rc = &array->calls[array->count++];
  memset(rc, 0, sizeof(*rc));
  return rc;
}

static void server_ref(grpc_server *server) {
  gpr_ref(&server->internal_refcount);
}

static void server_unref(grpc_server *server) {
  if (gpr_unref(&server->internal_refcount)) {
    grpc_channel_args_destroy(server->channel_args);
    gpr_mu_destroy(&server->mu);
    gpr_free(server->channel_filters);
    requested_call_array_destroy(&server->requested_calls);
    gpr_free(server);
  }
}

static int is_channel_orphaned(channel_data *chand) {
  return chand->next == chand;
}

static void orphan_channel(channel_data *chand) {
  chand->next->prev = chand->prev;
  chand->prev->next = chand->next;
  chand->next = chand->prev = chand;
}

static void finish_destroy_channel(void *cd, int success) {
  channel_data *chand = cd;
  grpc_server *server = chand->server;
  grpc_channel_destroy(chand->channel);
  server_unref(server);
}

static void destroy_channel(channel_data *chand) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != NULL);
  orphan_channel(chand);
  server_ref(chand->server);
  grpc_iomgr_add_callback(finish_destroy_channel, chand);
}

static void finish_start_new_rpc_and_unlock(grpc_server *server,
                                            grpc_call_element *elem,
                                            call_data **pending_root,
                                            requested_call_array *array) {
  requested_call rc;
  call_data *calld = elem->call_data;
  if (array->count == 0) {
    calld->state = PENDING;
    call_list_join(pending_root, calld, PENDING_START);
    gpr_mu_unlock(&server->mu);
  } else {
    rc = array->calls[--array->count];
    calld->state = ACTIVATED;
    gpr_mu_unlock(&server->mu);
    begin_call(server, calld, &rc);
  }
}

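/* Called when a new call has read all of its initial metadata: look up the
   (:path, :authority) pair in the channel's registered-method hash table
   (linear probing), fall back to a host-wildcard entry, and finally to the
   server-wide unregistered queue if nothing matches. */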
static void start_new_rpc(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_server *server = chand->server;
  gpr_uint32 i;
  gpr_uint32 hash;
  channel_registered_method *rm;

  gpr_mu_lock(&server->mu);
  if (chand->registered_methods && calld->path && calld->host) {
    /* check for an exact match with host */
    hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->host != calld->host) continue;
      if (rm->method != calld->path) continue;
      finish_start_new_rpc_and_unlock(server, elem,
                                      &rm->server_registered_method->pending,
                                      &rm->server_registered_method->requested);
      return;
    }
    /* check for a wildcard method definition (no host set) */
    hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->host != NULL) continue;
      if (rm->method != calld->path) continue;
      finish_start_new_rpc_and_unlock(server, elem,
                                      &rm->server_registered_method->pending,
                                      &rm->server_registered_method->requested);
      return;
    }
  }
  finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
                                  &server->requested_calls);
}

static void kill_zombie(void *elem, int success) {
  grpc_call_destroy(grpc_call_from_top_element(elem));
}

static void stream_closed(grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->server->mu);
  switch (calld->state) {
    case ACTIVATED:
      break;
    case PENDING:
      call_list_remove(calld, PENDING_START);
    /* fallthrough intended */
    case NOT_STARTED:
      calld->state = ZOMBIED;
      grpc_iomgr_add_callback(kill_zombie, elem);
      break;
    case ZOMBIED:
      break;
  }
  gpr_mu_unlock(&chand->server->mu);
  grpc_call_stream_closed(elem);
}

static void read_closed(grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->server->mu);
  switch (calld->state) {
    case ACTIVATED:
    case PENDING:
      grpc_call_read_closed(elem);
      break;
    case NOT_STARTED:
      calld->state = ZOMBIED;
      grpc_iomgr_add_callback(kill_zombie, elem);
      break;
    case ZOMBIED:
      break;
  }
  gpr_mu_unlock(&chand->server->mu);
}

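/* Per-call filter entry point: captures :path and :authority from incoming
   metadata, kicks off RPC matching once initial metadata is complete, and
   forwards all other ops to the rest of the stack. */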
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                    grpc_call_op *op) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  grpc_mdelem *md;
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
  switch (op->type) {
    case GRPC_RECV_METADATA:
      md = op->data.metadata;
      if (md->key == chand->path_key) {
        calld->path = grpc_mdstr_ref(md->value);
        grpc_mdelem_unref(md);
      } else if (md->key == chand->authority_key) {
        calld->host = grpc_mdstr_ref(md->value);
        grpc_mdelem_unref(md);
      } else {
        grpc_call_recv_metadata(elem, md);
      }
      break;
    case GRPC_RECV_END_OF_INITIAL_METADATA:
      start_new_rpc(elem);
      grpc_call_initial_metadata_complete(elem);
      break;
    case GRPC_RECV_MESSAGE:
      grpc_call_recv_message(elem, op->data.message);
      op->done_cb(op->user_data, GRPC_OP_OK);
      break;
    case GRPC_RECV_HALF_CLOSE:
      read_closed(elem);
      break;
    case GRPC_RECV_FINISH:
      stream_closed(elem);
      break;
    case GRPC_RECV_DEADLINE:
      grpc_call_set_deadline(elem, op->data.deadline);
      ((call_data *)elem->call_data)->deadline = op->data.deadline;
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
      grpc_call_next_op(elem, op);
      break;
  }
}

static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  grpc_server *server = chand->server;

  switch (op->type) {
    case GRPC_ACCEPT_CALL:
      /* create a call */
      grpc_call_create(chand->channel, NULL,
                       op->data.accept_call.transport_server_data);
      break;
    case GRPC_TRANSPORT_CLOSED:
      /* if the transport is closed for a server channel, we destroy the
         channel */
      gpr_mu_lock(&server->mu);
      server_ref(server);
      destroy_channel(chand);
      gpr_mu_unlock(&server->mu);
      server_unref(server);
      break;
    case GRPC_TRANSPORT_GOAWAY:
      gpr_slice_unref(op->data.goaway.message);
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
      grpc_channel_next_op(elem, op);
      break;
  }
}

static void finish_shutdown_channel(void *cd, int success) {
  channel_data *chand = cd;
  grpc_channel_op op;
  op.type = GRPC_CHANNEL_DISCONNECT;
  op.dir = GRPC_CALL_DOWN;
  channel_op(grpc_channel_stack_element(
                 grpc_channel_get_channel_stack(chand->channel), 0),
             NULL, &op);
  grpc_channel_internal_unref(chand->channel);
}

static void shutdown_channel(channel_data *chand) {
  grpc_channel_internal_ref(chand->channel);
  grpc_iomgr_add_callback(finish_shutdown_channel, chand);
}

static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  memset(calld, 0, sizeof(call_data));
  calld->deadline = gpr_inf_future;
  calld->call = grpc_call_from_top_element(elem);

  gpr_mu_lock(&chand->server->mu);
  call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
  gpr_mu_unlock(&chand->server->mu);

  server_ref(chand->server);
}

static void destroy_call_elem(grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  call_data *calld = elem->call_data;
  size_t i;

  gpr_mu_lock(&chand->server->mu);
  for (i = 0; i < CALL_LIST_COUNT; i++) {
    call_list_remove(elem->call_data, i);
  }
  if (chand->server->shutdown && chand->server->have_shutdown_tag &&
      chand->server->lists[ALL_CALLS] == NULL) {
    for (i = 0; i < chand->server->cq_count; i++) {
      grpc_cq_end_server_shutdown(chand->server->cqs[i],
                                  chand->server->shutdown_tag);
    }
  }
  gpr_mu_unlock(&chand->server->mu);

  if (calld->host) {
    grpc_mdstr_unref(calld->host);
  }
  if (calld->path) {
    grpc_mdstr_unref(calld->path);
  }

  if (calld->legacy) {
    gpr_free(calld->legacy->initial_metadata.metadata);
    gpr_free(calld->legacy);
  }

  server_unref(chand->server);
}

static void init_channel_elem(grpc_channel_element *elem,
                              const grpc_channel_args *args,
                              grpc_mdctx *metadata_context, int is_first,
                              int is_last) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(is_first);
  GPR_ASSERT(!is_last);
  chand->server = NULL;
  chand->channel = NULL;
  chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
  chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
  chand->next = chand->prev = chand;
  chand->registered_methods = NULL;
}

static void destroy_channel_elem(grpc_channel_element *elem) {
  channel_data *chand = elem->channel_data;
  if (chand->server) {
    gpr_mu_lock(&chand->server->mu);
    chand->next->prev = chand->prev;
    chand->prev->next = chand->next;
    chand->next = chand->prev = chand;
    gpr_mu_unlock(&chand->server->mu);
    grpc_mdstr_unref(chand->path_key);
    grpc_mdstr_unref(chand->authority_key);
    server_unref(chand->server);
  }
}

static const grpc_channel_filter server_surface_filter = {
    call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
    sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};

static void addcq(grpc_server *server, grpc_completion_queue *cq) {
  size_t i, n;
  for (i = 0; i < server->cq_count; i++) {
    if (server->cqs[i] == cq) return;
  }
  n = server->cq_count++;
  server->cqs = gpr_realloc(
      server->cqs, server->cq_count * sizeof(grpc_completion_queue *));
  server->cqs[n] = cq;
}

grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
                                             grpc_channel_filter **filters,
                                             size_t filter_count,
                                             const grpc_channel_args *args) {
  size_t i;
  int census_enabled = grpc_channel_args_is_census_enabled(args);

  grpc_server *server = gpr_malloc(sizeof(grpc_server));
  memset(server, 0, sizeof(grpc_server));
  if (cq) addcq(server, cq);

  gpr_mu_init(&server->mu);

  server->unregistered_cq = cq;
  /* decremented by grpc_server_destroy */
  gpr_ref_init(&server->internal_refcount, 1);
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  /* Server filter stack is:

     server_surface_filter - for making surface API calls
     grpc_server_census_filter (optional) - for stats collection and tracing
     {passed in filter stack}
     grpc_connected_channel_filter - for interfacing with transports */
  server->channel_filter_count = filter_count + 1 + census_enabled;
  server->channel_filters =
      gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
  server->channel_filters[0] = &server_surface_filter;
  if (census_enabled) {
    server->channel_filters[1] = &grpc_server_census_filter;
  }
  for (i = 0; i < filter_count; i++) {
    server->channel_filters[i + 1 + census_enabled] = filters[i];
  }

  server->channel_args = grpc_channel_args_copy(args);

  return server;
}

static int streq(const char *a, const char *b) {
  if (a == NULL && b == NULL) return 1;
  if (a == NULL) return 0;
  if (b == NULL) return 0;
  return 0 == strcmp(a, b);
}

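/* Registers a (method, host) pair ahead of time; the returned pointer is the
   opaque handle later passed as 'rm' to grpc_server_request_registered_call.
   Notifications for new calls on this method are delivered on cq_new_rpc. */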
void *grpc_server_register_method(grpc_server *server, const char *method,
                                  const char *host,
                                  grpc_completion_queue *cq_new_rpc) {
  registered_method *m;
  if (!method) {
    gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
    return NULL;
  }
  for (m = server->registered_methods; m; m = m->next) {
    if (streq(m->method, method) && streq(m->host, host)) {
      gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
              host ? host : "*");
      return NULL;
    }
  }
  addcq(server, cq_new_rpc);
  m = gpr_malloc(sizeof(registered_method));
  memset(m, 0, sizeof(*m));
  m->method = gpr_strdup(method);
  m->host = gpr_strdup(host);
  m->next = server->registered_methods;
  m->cq = cq_new_rpc;
  server->registered_methods = m;
  return m;
}

void grpc_server_start(grpc_server *server) {
  listener *l;
  size_t i;

  server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
  for (i = 0; i < server->cq_count; i++) {
    server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
  }

  for (l = server->listeners; l; l = l->next) {
    l->start(server, l->arg, server->pollsets, server->cq_count);
  }
}

grpc_transport_setup_result grpc_server_setup_transport(
    grpc_server *s, grpc_transport *transport,
    grpc_channel_filter const **extra_filters, size_t num_extra_filters,
    grpc_mdctx *mdctx) {
  size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
  grpc_channel_filter const **filters =
      gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
  size_t i;
  size_t num_registered_methods;
  size_t alloc;
  registered_method *rm;
  channel_registered_method *crm;
  grpc_channel *channel;
  channel_data *chand;
  grpc_mdstr *host;
  grpc_mdstr *method;
  gpr_uint32 hash;
  gpr_uint32 slots;
  gpr_uint32 probes;
  gpr_uint32 max_probes = 0;

  for (i = 0; i < s->channel_filter_count; i++) {
    filters[i] = s->channel_filters[i];
  }
  for (; i < s->channel_filter_count + num_extra_filters; i++) {
    filters[i] = extra_filters[i - s->channel_filter_count];
  }
  filters[i] = &grpc_connected_channel_filter;

  for (i = 0; i < s->cq_count; i++) {
    grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
  }

  channel = grpc_channel_create_from_filters(filters, num_filters,
                                             s->channel_args, mdctx, 0);
  chand = (channel_data *)grpc_channel_stack_element(
              grpc_channel_get_channel_stack(channel), 0)->channel_data;
  chand->server = s;
  server_ref(s);
  chand->channel = channel;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channel's context
     to quickly find registered methods */
  if (num_registered_methods > 0) {
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods = gpr_malloc(alloc);
    memset(chand->registered_methods, 0, alloc);
    for (rm = s->registered_methods; rm; rm = rm->next) {
      host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
      method = grpc_mdstr_from_string(mdctx, rm->method);
      hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != NULL;
           probes++)
        ;
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->host = host;
      crm->method = method;
    }
    chand->registered_method_slots = slots;
    chand->registered_method_max_probes = max_probes;
  }

  gpr_mu_lock(&s->mu);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu);

  gpr_free(filters);

  return grpc_connected_channel_bind_transport(
      grpc_channel_get_channel_stack(channel), transport);
}

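/* Common shutdown path: mark the server as shut down, send a GOAWAY on every
   live channel, fail every request that is still waiting for a call, and tear
   down the listeners. Safe to call more than once. */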
static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
                              void *shutdown_tag) {
  listener *l;
  requested_call_array requested_calls;
  channel_data **channels;
  channel_data *c;
  size_t nchannels;
  size_t i;
  grpc_channel_op op;
  grpc_channel_element *elem;
  registered_method *rm;

  /* lock, and gather up some stuff to do */
  gpr_mu_lock(&server->mu);
  if (server->shutdown) {
    gpr_mu_unlock(&server->mu);
    return;
  }

  nchannels = 0;
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    nchannels++;
  }
  channels = gpr_malloc(sizeof(channel_data *) * nchannels);
  i = 0;
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    grpc_channel_internal_ref(c->channel);
    channels[i] = c;
    i++;
  }

  /* collect all unregistered then registered calls */
  requested_calls = server->requested_calls;
  memset(&server->requested_calls, 0, sizeof(server->requested_calls));
  for (rm = server->registered_methods; rm; rm = rm->next) {
    if (requested_calls.count + rm->requested.count >
        requested_calls.capacity) {
      requested_calls.capacity =
          GPR_MAX(requested_calls.count + rm->requested.count,
                  2 * requested_calls.capacity);
      requested_calls.calls = gpr_realloc(
          requested_calls.calls,
          sizeof(*requested_calls.calls) * requested_calls.capacity);
    }
    memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
           sizeof(*requested_calls.calls) * rm->requested.count);
    requested_calls.count += rm->requested.count;
    memset(&rm->requested, 0, sizeof(rm->requested));
  }

  server->shutdown = 1;
  server->have_shutdown_tag = have_shutdown_tag;
  server->shutdown_tag = shutdown_tag;
  if (have_shutdown_tag) {
    for (i = 0; i < server->cq_count; i++) {
      grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
      if (server->lists[ALL_CALLS] == NULL) {
        grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
      }
    }
  }
  gpr_mu_unlock(&server->mu);

  for (i = 0; i < nchannels; i++) {
    c = channels[i];
    elem = grpc_channel_stack_element(
        grpc_channel_get_channel_stack(c->channel), 0);

    op.type = GRPC_CHANNEL_GOAWAY;
    op.dir = GRPC_CALL_DOWN;
    op.data.goaway.status = GRPC_STATUS_OK;
    op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
    elem->filter->channel_op(elem, NULL, &op);

    grpc_channel_internal_unref(c->channel);
  }
  gpr_free(channels);

  /* terminate all the requested calls */
  for (i = 0; i < requested_calls.count; i++) {
    fail_call(server, &requested_calls.calls[i]);
  }
  gpr_free(requested_calls.calls);

  /* Shutdown listeners */
  for (l = server->listeners; l; l = l->next) {
    l->destroy(server, l->arg);
  }
  while (server->listeners) {
    l = server->listeners;
    server->listeners = l->next;
    gpr_free(l);
  }
}

void grpc_server_shutdown(grpc_server *server) {
  shutdown_internal(server, 0, NULL);
}

void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
  shutdown_internal(server, 1, tag);
}

void grpc_server_destroy(grpc_server *server) {
  channel_data *c;
  gpr_mu_lock(&server->mu);
  for (c = server->root_channel_data.next; c != &server->root_channel_data;
       c = c->next) {
    shutdown_channel(c);
  }
  gpr_mu_unlock(&server->mu);

  server_unref(server);
}

void grpc_server_add_listener(grpc_server *server, void *arg,
                              void (*start)(grpc_server *server, void *arg,
                                            grpc_pollset **pollsets,
                                            size_t pollset_count),
                              void (*destroy)(grpc_server *server, void *arg)) {
  listener *l = gpr_malloc(sizeof(listener));
  l->arg = arg;
  l->start = start;
  l->destroy = destroy;
  l->next = server->listeners;
  server->listeners = l;
}

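/* Core matchmaking: if a call is already PENDING on the relevant list, hand
   it straight to this request; otherwise park the request until a matching
   call arrives (or fail it immediately if the server is shutting down). */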
static grpc_call_error queue_call_request(grpc_server *server,
                                          requested_call *rc) {
  call_data *calld = NULL;
  requested_call_array *requested_calls = NULL;
  gpr_mu_lock(&server->mu);
  if (server->shutdown) {
    gpr_mu_unlock(&server->mu);
    fail_call(server, rc);
    return GRPC_CALL_OK;
  }
  switch (rc->type) {
    case LEGACY_CALL:
    case BATCH_CALL:
      calld =
          call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
      requested_calls = &server->requested_calls;
      break;
    case REGISTERED_CALL:
      calld = call_list_remove_head(
          &rc->data.registered.registered_method->pending, PENDING_START);
      requested_calls = &rc->data.registered.registered_method->requested;
      break;
  }
  if (calld) {
    GPR_ASSERT(calld->state == PENDING);
    calld->state = ACTIVATED;
    gpr_mu_unlock(&server->mu);
    begin_call(server, calld, rc);
    return GRPC_CALL_OK;
  } else {
    *requested_call_array_add(requested_calls) = *rc;
    gpr_mu_unlock(&server->mu);
    return GRPC_CALL_OK;
  }
}

grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
                                         grpc_call_details *details,
                                         grpc_metadata_array *initial_metadata,
                                         grpc_completion_queue *cq_bind,
                                         void *tag) {
  requested_call rc;
  grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
  rc.type = BATCH_CALL;
  rc.tag = tag;
  rc.data.batch.cq_bind = cq_bind;
  rc.data.batch.call = call;
  rc.data.batch.details = details;
  rc.data.batch.initial_metadata = initial_metadata;
  return queue_call_request(server, &rc);
}

grpc_call_error grpc_server_request_registered_call(
    grpc_server *server, void *rm, grpc_call **call,
    gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
    grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
    void *tag) {
  requested_call rc;
  registered_method *registered_method = rm;
  grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
  rc.type = REGISTERED_CALL;
  rc.tag = tag;
  rc.data.registered.cq_bind = cq_bind;
  rc.data.registered.call = call;
  rc.data.registered.registered_method = registered_method;
  rc.data.registered.deadline = deadline;
  rc.data.registered.initial_metadata = initial_metadata;
  rc.data.registered.optional_payload = optional_payload;
  return queue_call_request(server, &rc);
}

grpc_call_error grpc_server_request_call_old(grpc_server *server,
                                             void *tag_new) {
  requested_call rc;
  grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
  rc.type = LEGACY_CALL;
  rc.tag = tag_new;
  return queue_call_request(server, &rc);
}

static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag);
static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
                                        void *tag);
static void publish_was_not_set(grpc_call *call, grpc_op_error status,
                                void *tag) {
  abort();
}

static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
  gpr_slice slice = value->slice;
  size_t len = GPR_SLICE_LENGTH(slice);

  if (len + 1 > *capacity) {
    *capacity = GPR_MAX(len + 1, *capacity * 2);
    *dest = gpr_realloc(*dest, *capacity);
  }
  memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
}

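/* A request and a call have been matched: fill in the application's output
   arguments and start the ioreq that copies the initial metadata (and, for
   registered calls, the optional payload) out of the call, completing on the
   requester's tag. */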
static void begin_call(grpc_server *server, call_data *calld,
                       requested_call *rc) {
  grpc_ioreq_completion_func publish = publish_was_not_set;
  grpc_ioreq req[2];
  grpc_ioreq *r = req;

  /* called once initial metadata has been read by the call, but BEFORE
     the ioreq to fetch it out of the call has been executed.
     This means metadata related fields can be relied on in calld, but to
     fill in the metadata array passed by the client, we need to perform
     an ioreq op, that should complete immediately. */

  switch (rc->type) {
    case LEGACY_CALL:
      calld->legacy = gpr_malloc(sizeof(legacy_data));
      memset(calld->legacy, 0, sizeof(legacy_data));
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = &calld->legacy->initial_metadata;
      r++;
      publish = publish_legacy;
      break;
    case BATCH_CALL:
      cpstr(&rc->data.batch.details->host,
            &rc->data.batch.details->host_capacity, calld->host);
      cpstr(&rc->data.batch.details->method,
            &rc->data.batch.details->method_capacity, calld->path);
      grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
      *rc->data.batch.call = calld->call;
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = rc->data.batch.initial_metadata;
      r++;
      calld->cq_new = server->unregistered_cq;
      publish = publish_registered_or_batch;
      break;
    case REGISTERED_CALL:
      *rc->data.registered.deadline = calld->deadline;
      grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind);
      *rc->data.registered.call = calld->call;
      r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
      r->data.recv_metadata = rc->data.registered.initial_metadata;
      r++;
      if (rc->data.registered.optional_payload) {
        r->op = GRPC_IOREQ_RECV_MESSAGE;
        r->data.recv_message = rc->data.registered.optional_payload;
        r++;
      }
      calld->cq_new = rc->data.registered.registered_method->cq;
      publish = publish_registered_or_batch;
      break;
  }

  grpc_call_internal_ref(calld->call);
  grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
                                      rc->tag);
}

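/* Completes a still-pending request with an error/NULL result; used when the
   server shuts down before a matching call arrives. */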
static void fail_call(grpc_server *server, requested_call *rc) {
  switch (rc->type) {
    case LEGACY_CALL:
      grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
                          NULL, NULL, NULL, gpr_inf_past, 0, NULL);
      break;
    case BATCH_CALL:
      *rc->data.batch.call = NULL;
      rc->data.batch.initial_metadata->count = 0;
      grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
                              do_nothing, NULL, GRPC_OP_ERROR);
      break;
    case REGISTERED_CALL:
      *rc->data.registered.call = NULL;
      rc->data.registered.initial_metadata->count = 0;
      grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
                              rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
      break;
  }
}

static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
  grpc_call_element *elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_server *server = chand->server;

  if (status == GRPC_OP_OK) {
    grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
                        grpc_mdstr_as_c_string(calld->path),
                        grpc_mdstr_as_c_string(calld->host), calld->deadline,
                        calld->legacy->initial_metadata.count,
                        calld->legacy->initial_metadata.metadata);
  } else {
    gpr_log(GPR_ERROR, "should never reach here");
    abort();
  }
}

static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
                                        void *tag) {
  grpc_call_element *elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  call_data *calld = elem->call_data;
  grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
}

const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
  return server->channel_args;
}