blob: a76a6c78120cc0e8076ba43c8b28751c10db3a44 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
36#include <stdlib.h>
37#include <string.h>
38
39#include "src/core/channel/census_filter.h"
40#include "src/core/channel/channel_args.h"
41#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080042#include "src/core/iomgr/iomgr.h"
Craig Tiller485d7762015-01-23 12:54:05 -080043#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/core/surface/call.h"
45#include "src/core/surface/channel.h"
46#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080047#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080048#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
Masood Malekghassemi701af602015-06-03 15:01:17 -070051#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052#include <grpc/support/useful.h>
53
54typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
55
56typedef struct listener {
57 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080058 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
59 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080060 void (*destroy)(grpc_server *server, void *arg);
61 struct listener *next;
62} listener;
63
64typedef struct call_data call_data;
65typedef struct channel_data channel_data;
Craig Tiller24be0f72015-02-10 14:04:22 -080066typedef struct registered_method registered_method;
67
68typedef struct {
69 call_data *next;
70 call_data *prev;
71} call_link;
72
Craig Tiller0e919562015-04-28 14:03:47 -070073typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080074
75typedef struct {
76 requested_call_type type;
77 void *tag;
Craig Tillerf9e6adf2015-05-06 11:45:59 -070078 grpc_completion_queue *cq_bound_to_call;
79 grpc_completion_queue *cq_for_notification;
80 grpc_call **call;
Craig Tiller24be0f72015-02-10 14:04:22 -080081 union {
82 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080083 grpc_call_details *details;
84 grpc_metadata_array *initial_metadata;
85 } batch;
86 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080087 registered_method *registered_method;
88 gpr_timespec *deadline;
89 grpc_metadata_array *initial_metadata;
90 grpc_byte_buffer **optional_payload;
91 } registered;
92 } data;
93} requested_call;
94
95typedef struct {
96 requested_call *calls;
97 size_t count;
98 size_t capacity;
99} requested_call_array;
100
101struct registered_method {
102 char *method;
103 char *host;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800104 call_data *pending;
Craig Tiller24be0f72015-02-10 14:04:22 -0800105 requested_call_array requested;
106 registered_method *next;
107};
108
109typedef struct channel_registered_method {
110 registered_method *server_registered_method;
111 grpc_mdstr *method;
112 grpc_mdstr *host;
113} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800114
115struct channel_data {
116 grpc_server *server;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700117 size_t num_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800118 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800119 grpc_mdstr *path_key;
120 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800121 /* linked list of all channels on a server */
122 channel_data *next;
123 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800124 channel_registered_method *registered_methods;
125 gpr_uint32 registered_method_slots;
126 gpr_uint32 registered_method_max_probes;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700127 grpc_iomgr_closure finish_destroy_channel_closure;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800128};
129
Craig Tillerbce999f2015-05-27 09:55:51 -0700130typedef struct shutdown_tag {
131 void *tag;
132 grpc_completion_queue *cq;
133} shutdown_tag;
134
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800135struct grpc_server {
136 size_t channel_filter_count;
137 const grpc_channel_filter **channel_filters;
138 grpc_channel_args *channel_args;
Craig Tillerec3257c2015-02-12 15:59:43 -0800139
Craig Tiller20bc56d2015-02-12 09:02:56 -0800140 grpc_completion_queue **cqs;
141 grpc_pollset **pollsets;
142 size_t cq_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800143
144 gpr_mu mu;
145
Craig Tiller24be0f72015-02-10 14:04:22 -0800146 registered_method *registered_methods;
147 requested_call_array requested_calls;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800148
149 gpr_uint8 shutdown;
Craig Tiller29f79dc2015-05-27 15:59:23 -0700150 gpr_uint8 shutdown_published;
Craig Tilleraea2fc02015-02-17 16:54:53 -0800151 size_t num_shutdown_tags;
Craig Tillerbce999f2015-05-27 09:55:51 -0700152 shutdown_tag *shutdown_tags;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800153
154 call_data *lists[CALL_LIST_COUNT];
155 channel_data root_channel_data;
156
157 listener *listeners;
Craig Tilleraec96aa2015-04-07 14:32:15 -0700158 int listeners_destroyed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800159 gpr_refcount internal_refcount;
160};
161
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800162typedef enum {
163 /* waiting for metadata */
164 NOT_STARTED,
165 /* inital metadata read, not flow controlled in yet */
166 PENDING,
167 /* flow controlled in, on completion queue */
168 ACTIVATED,
169 /* cancelled before being queued */
170 ZOMBIED
171} call_state;
172
173struct call_data {
174 grpc_call *call;
175
176 call_state state;
Craig Tillercce17ac2015-01-20 09:29:28 -0800177 grpc_mdstr *path;
178 grpc_mdstr *host;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700179 gpr_timespec deadline;
180 int got_initial_metadata;
Craig Tillercce17ac2015-01-20 09:29:28 -0800181
Craig Tiller20bc56d2015-02-12 09:02:56 -0800182 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800183
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700184 grpc_stream_op_buffer *recv_ops;
185 grpc_stream_state *recv_state;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700186 grpc_iomgr_closure *on_done_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700187
Craig Tiller1e6facb2015-06-11 22:47:11 -0700188 grpc_iomgr_closure server_on_recv;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700189 grpc_iomgr_closure kill_zombie_closure;
190
Craig Tiller04cc8be2015-02-10 16:11:22 -0800191 call_data **root[CALL_LIST_COUNT];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800192 call_link links[CALL_LIST_COUNT];
193};
194
195#define SERVER_FROM_CALL_ELEM(elem) \
196 (((channel_data *)(elem)->channel_data)->server)
197
Craig Tiller24be0f72015-02-10 14:04:22 -0800198static void begin_call(grpc_server *server, call_data *calld,
199 requested_call *rc);
200static void fail_call(grpc_server *server, requested_call *rc);
Craig Tillerc4a1f522015-05-29 22:32:04 -0700201static void shutdown_channel(channel_data *chand, int send_goaway,
202 int send_disconnect);
Craig Tiller52760dd2015-05-29 23:09:26 -0700203static void maybe_finish_shutdown(grpc_server *server);
Craig Tiller24be0f72015-02-10 14:04:22 -0800204
Craig Tiller3b29b562015-02-11 12:58:46 -0800205static int call_list_join(call_data **root, call_data *call, call_list list) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800206 GPR_ASSERT(!call->root[list]);
207 call->root[list] = root;
208 if (!*root) {
209 *root = call;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800210 call->links[list].next = call->links[list].prev = call;
211 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800212 call->links[list].next = *root;
213 call->links[list].prev = (*root)->links[list].prev;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800214 call->links[list].next->links[list].prev =
215 call->links[list].prev->links[list].next = call;
216 }
217 return 1;
218}
219
Craig Tiller04cc8be2015-02-10 16:11:22 -0800220static call_data *call_list_remove_head(call_data **root, call_list list) {
221 call_data *out = *root;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800222 if (out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800223 out->root[list] = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800224 if (out->links[list].next == out) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800225 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800226 } else {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800227 *root = out->links[list].next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800228 out->links[list].next->links[list].prev = out->links[list].prev;
229 out->links[list].prev->links[list].next = out->links[list].next;
230 }
231 }
232 return out;
233}
234
Craig Tiller04cc8be2015-02-10 16:11:22 -0800235static int call_list_remove(call_data *call, call_list list) {
236 call_data **root = call->root[list];
237 if (root == NULL) return 0;
238 call->root[list] = NULL;
239 if (*root == call) {
240 *root = call->links[list].next;
241 if (*root == call) {
242 *root = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800243 return 1;
244 }
245 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800246 GPR_ASSERT(*root != call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800247 call->links[list].next->links[list].prev = call->links[list].prev;
248 call->links[list].prev->links[list].next = call->links[list].next;
249 return 1;
250}
251
Craig Tiller24be0f72015-02-10 14:04:22 -0800252static void requested_call_array_destroy(requested_call_array *array) {
253 gpr_free(array->calls);
254}
255
256static requested_call *requested_call_array_add(requested_call_array *array) {
257 requested_call *rc;
258 if (array->count == array->capacity) {
259 array->capacity = GPR_MAX(array->capacity + 8, array->capacity * 2);
260 array->calls =
261 gpr_realloc(array->calls, sizeof(requested_call) * array->capacity);
262 }
263 rc = &array->calls[array->count++];
264 memset(rc, 0, sizeof(*rc));
265 return rc;
266}
267
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800268static void server_ref(grpc_server *server) {
269 gpr_ref(&server->internal_refcount);
270}
271
Craig Tilleree945e82015-05-26 16:15:34 -0700272static void server_delete(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800273 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700274 size_t i;
Craig Tilleree945e82015-05-26 16:15:34 -0700275 grpc_channel_args_destroy(server->channel_args);
276 gpr_mu_destroy(&server->mu);
277 gpr_free(server->channel_filters);
278 requested_call_array_destroy(&server->requested_calls);
279 while ((rm = server->registered_methods) != NULL) {
280 server->registered_methods = rm->next;
281 gpr_free(rm->method);
282 gpr_free(rm->host);
283 requested_call_array_destroy(&rm->requested);
284 gpr_free(rm);
285 }
286 for (i = 0; i < server->cq_count; i++) {
Craig Tiller463f2372015-05-28 16:16:15 -0700287 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
Craig Tilleree945e82015-05-26 16:15:34 -0700288 }
289 gpr_free(server->cqs);
290 gpr_free(server->pollsets);
291 gpr_free(server->shutdown_tags);
292 gpr_free(server);
293}
294
295static void server_unref(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800296 if (gpr_unref(&server->internal_refcount)) {
Craig Tilleree945e82015-05-26 16:15:34 -0700297 server_delete(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800298 }
299}
300
301static int is_channel_orphaned(channel_data *chand) {
302 return chand->next == chand;
303}
304
305static void orphan_channel(channel_data *chand) {
306 chand->next->prev = chand->prev;
307 chand->prev->next = chand->next;
308 chand->next = chand->prev = chand;
309}
310
ctiller58393c22015-01-07 14:03:30 -0800311static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800312 channel_data *chand = cd;
313 grpc_server *server = chand->server;
Craig Tiller9ec2a522015-05-29 22:46:54 -0700314 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800315 server_unref(server);
316}
317
318static void destroy_channel(channel_data *chand) {
319 if (is_channel_orphaned(chand)) return;
320 GPR_ASSERT(chand->server != NULL);
321 orphan_channel(chand);
322 server_ref(chand->server);
Craig Tiller52760dd2015-05-29 23:09:26 -0700323 maybe_finish_shutdown(chand->server);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700324 chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
325 chand->finish_destroy_channel_closure.cb_arg = chand;
326 grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800327}
328
Craig Tiller3b29b562015-02-11 12:58:46 -0800329static void finish_start_new_rpc_and_unlock(grpc_server *server,
330 grpc_call_element *elem,
331 call_data **pending_root,
332 requested_call_array *array) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800333 requested_call rc;
334 call_data *calld = elem->call_data;
335 if (array->count == 0) {
336 calld->state = PENDING;
337 call_list_join(pending_root, calld, PENDING_START);
338 gpr_mu_unlock(&server->mu);
339 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -0800340 rc = array->calls[--array->count];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800341 calld->state = ACTIVATED;
342 gpr_mu_unlock(&server->mu);
343 begin_call(server, calld, &rc);
344 }
345}
346
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800347static void start_new_rpc(grpc_call_element *elem) {
348 channel_data *chand = elem->channel_data;
349 call_data *calld = elem->call_data;
350 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800351 gpr_uint32 i;
352 gpr_uint32 hash;
353 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800354
355 gpr_mu_lock(&server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800356 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800357 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800358 /* check for an exact match with host */
359 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
360 for (i = 0; i < chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800361 rm = &chand->registered_methods[(hash + i) %
362 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800363 if (!rm) break;
364 if (rm->host != calld->host) continue;
365 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800366 finish_start_new_rpc_and_unlock(server, elem,
367 &rm->server_registered_method->pending,
368 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800369 return;
370 }
371 /* check for a wildcard method definition (no host set) */
372 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800373 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800374 rm = &chand->registered_methods[(hash + i) %
375 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800376 if (!rm) break;
377 if (rm->host != NULL) continue;
378 if (rm->method != calld->path) continue;
Craig Tiller3b29b562015-02-11 12:58:46 -0800379 finish_start_new_rpc_and_unlock(server, elem,
380 &rm->server_registered_method->pending,
381 &rm->server_registered_method->requested);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800382 return;
383 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800384 }
Craig Tiller3b29b562015-02-11 12:58:46 -0800385 finish_start_new_rpc_and_unlock(server, elem, &server->lists[PENDING_START],
386 &server->requested_calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800387}
388
ctiller58393c22015-01-07 14:03:30 -0800389static void kill_zombie(void *elem, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800390 grpc_call_destroy(grpc_call_from_top_element(elem));
391}
392
Craig Tilleree945e82015-05-26 16:15:34 -0700393static int num_listeners(grpc_server *server) {
394 listener *l;
395 int n = 0;
396 for (l = server->listeners; l; l = l->next) {
397 n++;
398 }
399 return n;
400}
401
Craig Tillerdc627722015-05-26 15:27:02 -0700402static void maybe_finish_shutdown(grpc_server *server) {
Craig Tillerbce999f2015-05-27 09:55:51 -0700403 size_t i;
Craig Tillerc4a1f522015-05-29 22:32:04 -0700404 if (!server->shutdown || server->shutdown_published) {
405 return;
406 }
407 if (server->lists[ALL_CALLS] != NULL) {
408 gpr_log(GPR_DEBUG,
409 "Waiting for all calls to finish before destroying server");
410 return;
411 }
412 if (server->root_channel_data.next != &server->root_channel_data) {
413 gpr_log(GPR_DEBUG,
Craig Tiller3e9ff5c2015-05-29 23:03:13 -0700414 "Waiting for all channels to close before destroying server");
Craig Tillerc4a1f522015-05-29 22:32:04 -0700415 return;
416 }
417 if (server->listeners_destroyed < num_listeners(server)) {
418 gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)",
419 server->listeners_destroyed, num_listeners(server));
420 return;
421 }
422 server->shutdown_published = 1;
423 for (i = 0; i < server->num_shutdown_tags; i++) {
424 grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
425 NULL, 1);
Craig Tillerdc627722015-05-26 15:27:02 -0700426 }
427}
428
Craig Tiller6902ad22015-04-16 08:01:49 -0700429static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
430 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800431 channel_data *chand = elem->channel_data;
432 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700433 if (md->key == chand->path_key) {
434 calld->path = grpc_mdstr_ref(md->value);
435 return NULL;
436 } else if (md->key == chand->authority_key) {
437 calld->host = grpc_mdstr_ref(md->value);
438 return NULL;
439 }
440 return md;
441}
442
Craig Tillerc4a1f522015-05-29 22:32:04 -0700443static void decrement_call_count(channel_data *chand) {
444 chand->num_calls--;
445 if (0 == chand->num_calls && chand->server->shutdown) {
446 shutdown_channel(chand, 0, 1);
447 }
448 maybe_finish_shutdown(chand->server);
449}
450
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700451static void server_on_recv(void *ptr, int success) {
452 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700453 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700454 channel_data *chand = elem->channel_data;
455
456 if (success && !calld->got_initial_metadata) {
457 size_t i;
458 size_t nops = calld->recv_ops->nops;
459 grpc_stream_op *ops = calld->recv_ops->ops;
460 for (i = 0; i < nops; i++) {
461 grpc_stream_op *op = &ops[i];
462 if (op->type != GRPC_OP_METADATA) continue;
Craig Tiller205aee12015-04-16 14:46:41 -0700463 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700464 if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700465 calld->deadline = op->data.metadata.deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800466 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700467 calld->got_initial_metadata = 1;
468 start_new_rpc(elem);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800469 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700470 }
471 }
472
473 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700474 case GRPC_STREAM_OPEN:
475 break;
476 case GRPC_STREAM_SEND_CLOSED:
477 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700478 case GRPC_STREAM_RECV_CLOSED:
479 gpr_mu_lock(&chand->server->mu);
480 if (calld->state == NOT_STARTED) {
481 calld->state = ZOMBIED;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700482 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
483 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700484 }
485 gpr_mu_unlock(&chand->server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800486 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700487 case GRPC_STREAM_CLOSED:
488 gpr_mu_lock(&chand->server->mu);
489 if (calld->state == NOT_STARTED) {
490 calld->state = ZOMBIED;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700491 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
492 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700493 } else if (calld->state == PENDING) {
494 call_list_remove(calld, PENDING_START);
Craig Tillerc9d03822015-05-20 16:08:45 -0700495 calld->state = ZOMBIED;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700496 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
497 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700498 }
Craig Tiller29f79dc2015-05-27 15:59:23 -0700499 if (call_list_remove(calld, ALL_CALLS)) {
Craig Tillerc4a1f522015-05-29 22:32:04 -0700500 decrement_call_count(chand);
Craig Tiller29f79dc2015-05-27 15:59:23 -0700501 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700502 gpr_mu_unlock(&chand->server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800503 break;
504 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700505
Craig Tiller1e6facb2015-06-11 22:47:11 -0700506 calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700507}
508
Craig Tiller50d9db52015-04-23 10:52:14 -0700509static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700510 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700511
512 if (op->recv_ops) {
513 /* substitute our callback for the higher callback */
514 calld->recv_ops = op->recv_ops;
515 calld->recv_state = op->recv_state;
516 calld->on_done_recv = op->on_done_recv;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700517 op->on_done_recv = &calld->server_on_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700518 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700519}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700520
Craig Tiller06aeea72015-04-23 10:54:45 -0700521static void server_start_transport_op(grpc_call_element *elem,
522 grpc_transport_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -0700523 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
524 server_mutate_op(elem, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700525 grpc_call_next_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800526}
527
ctillerf962f522014-12-10 15:28:27 -0800528static void channel_op(grpc_channel_element *elem,
529 grpc_channel_element *from_elem, grpc_channel_op *op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800530 channel_data *chand = elem->channel_data;
Craig Tiller8b976d02015-02-05 21:41:23 -0800531 grpc_server *server = chand->server;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800532
533 switch (op->type) {
534 case GRPC_ACCEPT_CALL:
535 /* create a call */
Craig Tillerfb189f82015-02-03 12:07:07 -0800536 grpc_call_create(chand->channel, NULL,
Craig Tiller87d5b192015-04-16 14:37:57 -0700537 op->data.accept_call.transport_server_data, NULL, 0,
538 gpr_inf_future);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800539 break;
540 case GRPC_TRANSPORT_CLOSED:
541 /* if the transport is closed for a server channel, we destroy the
542 channel */
Craig Tiller8b976d02015-02-05 21:41:23 -0800543 gpr_mu_lock(&server->mu);
544 server_ref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800545 destroy_channel(chand);
Craig Tiller8b976d02015-02-05 21:41:23 -0800546 gpr_mu_unlock(&server->mu);
547 server_unref(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800548 break;
nnoble0c475f02014-12-05 15:37:39 -0800549 case GRPC_TRANSPORT_GOAWAY:
550 gpr_slice_unref(op->data.goaway.message);
551 break;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800552 default:
553 GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
554 grpc_channel_next_op(elem, op);
555 break;
556 }
557}
558
Craig Tillerb76d05b2015-05-29 17:21:56 -0700559typedef struct {
560 channel_data *chand;
561 int send_goaway;
562 int send_disconnect;
Craig Tiller668e3582015-06-01 21:21:42 -0700563 grpc_iomgr_closure finish_shutdown_channel_closure;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700564} shutdown_channel_args;
565
566static void finish_shutdown_channel(void *p, int success) {
567 shutdown_channel_args *sca = p;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800568 grpc_channel_op op;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700569
570 if (sca->send_goaway) {
571 op.type = GRPC_CHANNEL_GOAWAY;
572 op.dir = GRPC_CALL_DOWN;
573 op.data.goaway.status = GRPC_STATUS_OK;
574 op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
575 channel_op(grpc_channel_stack_element(
576 grpc_channel_get_channel_stack(sca->chand->channel), 0),
577 NULL, &op);
578 }
579 if (sca->send_disconnect) {
580 op.type = GRPC_CHANNEL_DISCONNECT;
581 op.dir = GRPC_CALL_DOWN;
582 channel_op(grpc_channel_stack_element(
583 grpc_channel_get_channel_stack(sca->chand->channel), 0),
584 NULL, &op);
585 }
Craig Tiller9ec2a522015-05-29 22:46:54 -0700586 GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
Craig Tillerfe020aa2015-06-01 09:18:50 -0700587
588 gpr_free(sca);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800589}
590
Craig Tiller8674cb12015-06-05 07:09:25 -0700591static void shutdown_channel(channel_data *chand, int send_goaway,
592 int send_disconnect) {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700593 shutdown_channel_args *sca;
Craig Tiller9ec2a522015-05-29 22:46:54 -0700594 GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
Craig Tillerb76d05b2015-05-29 17:21:56 -0700595 sca = gpr_malloc(sizeof(shutdown_channel_args));
596 sca->chand = chand;
597 sca->send_goaway = send_goaway;
598 sca->send_disconnect = send_disconnect;
Craig Tiller668e3582015-06-01 21:21:42 -0700599 sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
600 sca->finish_shutdown_channel_closure.cb_arg = sca;
601 grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800602}
603
604static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700605 const void *server_transport_data,
606 grpc_transport_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800607 call_data *calld = elem->call_data;
608 channel_data *chand = elem->channel_data;
609 memset(calld, 0, sizeof(call_data));
610 calld->deadline = gpr_inf_future;
611 calld->call = grpc_call_from_top_element(elem);
612
Craig Tiller1e6facb2015-06-11 22:47:11 -0700613 grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
614
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800615 gpr_mu_lock(&chand->server->mu);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800616 call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
Craig Tillerb76d05b2015-05-29 17:21:56 -0700617 chand->num_calls++;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800618 gpr_mu_unlock(&chand->server->mu);
619
620 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700621
Craig Tiller482ef8b2015-04-23 11:38:20 -0700622 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800623}
624
625static void destroy_call_elem(grpc_call_element *elem) {
626 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800627 call_data *calld = elem->call_data;
Craig Tilleree945e82015-05-26 16:15:34 -0700628 int removed[CALL_LIST_COUNT];
Craig Tillerdc627722015-05-26 15:27:02 -0700629 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800630
631 gpr_mu_lock(&chand->server->mu);
632 for (i = 0; i < CALL_LIST_COUNT; i++) {
Craig Tilleree945e82015-05-26 16:15:34 -0700633 removed[i] = call_list_remove(elem->call_data, i);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800634 }
Craig Tilleree945e82015-05-26 16:15:34 -0700635 if (removed[ALL_CALLS]) {
Craig Tillerc4a1f522015-05-29 22:32:04 -0700636 decrement_call_count(chand);
Craig Tilleree945e82015-05-26 16:15:34 -0700637 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800638 gpr_mu_unlock(&chand->server->mu);
639
Craig Tiller4df31a62015-01-30 09:44:31 -0800640 if (calld->host) {
641 grpc_mdstr_unref(calld->host);
642 }
643 if (calld->path) {
644 grpc_mdstr_unref(calld->path);
645 }
646
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800647 server_unref(chand->server);
648}
649
650static void init_channel_elem(grpc_channel_element *elem,
651 const grpc_channel_args *args,
652 grpc_mdctx *metadata_context, int is_first,
653 int is_last) {
654 channel_data *chand = elem->channel_data;
655 GPR_ASSERT(is_first);
656 GPR_ASSERT(!is_last);
657 chand->server = NULL;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700658 chand->num_calls = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800659 chand->channel = NULL;
Craig Tillercce17ac2015-01-20 09:29:28 -0800660 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
661 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800662 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800663 chand->registered_methods = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800664}
665
666static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800667 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800668 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800669 if (chand->registered_methods) {
670 for (i = 0; i < chand->registered_method_slots; i++) {
671 if (chand->registered_methods[i].method) {
672 grpc_mdstr_unref(chand->registered_methods[i].method);
673 }
674 if (chand->registered_methods[i].host) {
675 grpc_mdstr_unref(chand->registered_methods[i].host);
676 }
677 }
678 gpr_free(chand->registered_methods);
679 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800680 if (chand->server) {
681 gpr_mu_lock(&chand->server->mu);
682 chand->next->prev = chand->prev;
683 chand->prev->next = chand->next;
684 chand->next = chand->prev = chand;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700685 maybe_finish_shutdown(chand->server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800686 gpr_mu_unlock(&chand->server->mu);
Craig Tillercce17ac2015-01-20 09:29:28 -0800687 grpc_mdstr_unref(chand->path_key);
688 grpc_mdstr_unref(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800689 server_unref(chand->server);
690 }
691}
692
693static const grpc_channel_filter server_surface_filter = {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700694 server_start_transport_op,
695 channel_op,
696 sizeof(call_data),
697 init_call_elem,
698 destroy_call_elem,
699 sizeof(channel_data),
700 init_channel_elem,
701 destroy_channel_elem,
702 "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800703};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800704
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700705void grpc_server_register_completion_queue(grpc_server *server,
706 grpc_completion_queue *cq) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800707 size_t i, n;
708 for (i = 0; i < server->cq_count; i++) {
709 if (server->cqs[i] == cq) return;
710 }
Craig Tiller463f2372015-05-28 16:16:15 -0700711 GRPC_CQ_INTERNAL_REF(cq, "server");
Craig Tiller20bc56d2015-02-12 09:02:56 -0800712 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800713 server->cqs = gpr_realloc(server->cqs,
714 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800715 server->cqs[n] = cq;
716}
717
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700718grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800719 size_t filter_count,
720 const grpc_channel_args *args) {
721 size_t i;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700722 /* TODO(census): restore this once we finalize census filter etc.
723 int census_enabled = grpc_channel_args_is_census_enabled(args); */
724 int census_enabled = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800725
726 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800727
728 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
729
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800730 memset(server, 0, sizeof(grpc_server));
731
732 gpr_mu_init(&server->mu);
733
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800734 /* decremented by grpc_server_destroy */
735 gpr_ref_init(&server->internal_refcount, 1);
736 server->root_channel_data.next = server->root_channel_data.prev =
737 &server->root_channel_data;
738
739 /* Server filter stack is:
740
741 server_surface_filter - for making surface API calls
742 grpc_server_census_filter (optional) - for stats collection and tracing
743 {passed in filter stack}
744 grpc_connected_channel_filter - for interfacing with transports */
745 server->channel_filter_count = filter_count + 1 + census_enabled;
746 server->channel_filters =
747 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
748 server->channel_filters[0] = &server_surface_filter;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700749 /* TODO(census): restore this once we rework census filter
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800750 if (census_enabled) {
751 server->channel_filters[1] = &grpc_server_census_filter;
Alistair Veitch9686dab2015-05-26 14:26:47 -0700752 } */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800753 for (i = 0; i < filter_count; i++) {
754 server->channel_filters[i + 1 + census_enabled] = filters[i];
755 }
756
757 server->channel_args = grpc_channel_args_copy(args);
758
759 return server;
760}
761
Craig Tiller24be0f72015-02-10 14:04:22 -0800762static int streq(const char *a, const char *b) {
763 if (a == NULL && b == NULL) return 1;
764 if (a == NULL) return 0;
765 if (b == NULL) return 0;
766 return 0 == strcmp(a, b);
767}
768
769void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700770 const char *host) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800771 registered_method *m;
772 if (!method) {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700773 gpr_log(GPR_ERROR,
774 "grpc_server_register_method method string cannot be NULL");
Craig Tiller24be0f72015-02-10 14:04:22 -0800775 return NULL;
776 }
777 for (m = server->registered_methods; m; m = m->next) {
778 if (streq(m->method, method) && streq(m->host, host)) {
779 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
780 host ? host : "*");
781 return NULL;
782 }
783 }
784 m = gpr_malloc(sizeof(registered_method));
785 memset(m, 0, sizeof(*m));
786 m->method = gpr_strdup(method);
787 m->host = gpr_strdup(host);
788 m->next = server->registered_methods;
789 server->registered_methods = m;
790 return m;
791}
792
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800793void grpc_server_start(grpc_server *server) {
794 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800795 size_t i;
796
Craig Tillerec3257c2015-02-12 15:59:43 -0800797 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800798 for (i = 0; i < server->cq_count; i++) {
799 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
800 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800801
802 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800803 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800804 }
805}
806
807grpc_transport_setup_result grpc_server_setup_transport(
808 grpc_server *s, grpc_transport *transport,
809 grpc_channel_filter const **extra_filters, size_t num_extra_filters,
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700810 grpc_mdctx *mdctx, const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800811 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
812 grpc_channel_filter const **filters =
813 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
814 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800815 size_t num_registered_methods;
816 size_t alloc;
817 registered_method *rm;
818 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800819 grpc_channel *channel;
820 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800821 grpc_mdstr *host;
822 grpc_mdstr *method;
823 gpr_uint32 hash;
824 gpr_uint32 slots;
825 gpr_uint32 probes;
826 gpr_uint32 max_probes = 0;
Craig Tiller5d6bd442015-02-12 22:50:38 -0800827 grpc_transport_setup_result result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800828
829 for (i = 0; i < s->channel_filter_count; i++) {
830 filters[i] = s->channel_filters[i];
831 }
832 for (; i < s->channel_filter_count + num_extra_filters; i++) {
833 filters[i] = extra_filters[i - s->channel_filter_count];
834 }
835 filters[i] = &grpc_connected_channel_filter;
836
Craig Tiller20bc56d2015-02-12 09:02:56 -0800837 for (i = 0; i < s->cq_count; i++) {
838 grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
839 }
ctillerd79b4862014-12-17 16:36:59 -0800840
Julien Boeufc6f8d0a2015-05-11 22:40:02 -0700841 channel =
842 grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800843 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700844 grpc_channel_get_channel_stack(channel), 0)
845 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800846 chand->server = s;
847 server_ref(s);
848 chand->channel = channel;
849
Craig Tiller04cc8be2015-02-10 16:11:22 -0800850 num_registered_methods = 0;
851 for (rm = s->registered_methods; rm; rm = rm->next) {
852 num_registered_methods++;
853 }
854 /* build a lookup table phrased in terms of mdstr's in this channels context
855 to quickly find registered methods */
856 if (num_registered_methods > 0) {
857 slots = 2 * num_registered_methods;
858 alloc = sizeof(channel_registered_method) * slots;
859 chand->registered_methods = gpr_malloc(alloc);
860 memset(chand->registered_methods, 0, alloc);
861 for (rm = s->registered_methods; rm; rm = rm->next) {
862 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
Craig Tillere76c9662015-02-11 13:18:31 -0800863 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800864 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800865 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700866 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800867 probes++)
868 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800869 if (probes > max_probes) max_probes = probes;
870 crm = &chand->registered_methods[(hash + probes) % slots];
871 crm->server_registered_method = rm;
872 crm->host = host;
873 crm->method = method;
874 }
875 chand->registered_method_slots = slots;
876 chand->registered_method_max_probes = max_probes;
877 }
878
Craig Tiller5d6bd442015-02-12 22:50:38 -0800879 result = grpc_connected_channel_bind_transport(
880 grpc_channel_get_channel_stack(channel), transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800881
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800882 gpr_mu_lock(&s->mu);
883 chand->next = &s->root_channel_data;
884 chand->prev = chand->next->prev;
885 chand->next->prev = chand->prev->next = chand;
886 gpr_mu_unlock(&s->mu);
887
888 gpr_free(filters);
889
Craig Tiller5d6bd442015-02-12 22:50:38 -0800890 return result;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800891}
892
Craig Tillerbce999f2015-05-27 09:55:51 -0700893void grpc_server_shutdown_and_notify(grpc_server *server,
894 grpc_completion_queue *cq, void *tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800895 listener *l;
Craig Tiller24be0f72015-02-10 14:04:22 -0800896 requested_call_array requested_calls;
nnoble0c475f02014-12-05 15:37:39 -0800897 channel_data *c;
Craig Tillerdc627722015-05-26 15:27:02 -0700898 size_t i;
Craig Tillerbd217572015-02-11 18:10:56 -0800899 registered_method *rm;
Craig Tillerbce999f2015-05-27 09:55:51 -0700900 shutdown_tag *sdt;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800901
902 /* lock, and gather up some stuff to do */
903 gpr_mu_lock(&server->mu);
Craig Tiller29f79dc2015-05-27 15:59:23 -0700904 grpc_cq_begin_op(cq, NULL);
Craig Tilleree945e82015-05-26 16:15:34 -0700905 server->shutdown_tags =
906 gpr_realloc(server->shutdown_tags,
Craig Tiller208d2122015-05-29 08:50:08 -0700907 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
Craig Tillerbce999f2015-05-27 09:55:51 -0700908 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
909 sdt->tag = tag;
910 sdt->cq = cq;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800911 if (server->shutdown) {
912 gpr_mu_unlock(&server->mu);
913 return;
914 }
915
nnoble0c475f02014-12-05 15:37:39 -0800916 for (c = server->root_channel_data.next; c != &server->root_channel_data;
917 c = c->next) {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700918 shutdown_channel(c, 1, c->num_calls == 0);
nnoble0c475f02014-12-05 15:37:39 -0800919 }
920
Craig Tillerbd217572015-02-11 18:10:56 -0800921 /* collect all unregistered then registered calls */
Craig Tillercce17ac2015-01-20 09:29:28 -0800922 requested_calls = server->requested_calls;
Craig Tiller24be0f72015-02-10 14:04:22 -0800923 memset(&server->requested_calls, 0, sizeof(server->requested_calls));
Craig Tillerbd217572015-02-11 18:10:56 -0800924 for (rm = server->registered_methods; rm; rm = rm->next) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800925 if (requested_calls.count + rm->requested.count >
926 requested_calls.capacity) {
927 requested_calls.capacity =
928 GPR_MAX(requested_calls.count + rm->requested.count,
929 2 * requested_calls.capacity);
930 requested_calls.calls =
931 gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
932 requested_calls.capacity);
Craig Tillerbd217572015-02-11 18:10:56 -0800933 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800934 memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
935 sizeof(*requested_calls.calls) * rm->requested.count);
Craig Tillerbd217572015-02-11 18:10:56 -0800936 requested_calls.count += rm->requested.count;
Craig Tillerec3257c2015-02-12 15:59:43 -0800937 gpr_free(rm->requested.calls);
Craig Tillerbd217572015-02-11 18:10:56 -0800938 memset(&rm->requested, 0, sizeof(rm->requested));
939 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800940
941 server->shutdown = 1;
Craig Tillerdc627722015-05-26 15:27:02 -0700942 maybe_finish_shutdown(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800943 gpr_mu_unlock(&server->mu);
944
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800945 /* terminate all the requested calls */
Craig Tiller24be0f72015-02-10 14:04:22 -0800946 for (i = 0; i < requested_calls.count; i++) {
947 fail_call(server, &requested_calls.calls[i]);
Craig Tillercce17ac2015-01-20 09:29:28 -0800948 }
Craig Tiller24be0f72015-02-10 14:04:22 -0800949 gpr_free(requested_calls.calls);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800950
951 /* Shutdown listeners */
952 for (l = server->listeners; l; l = l->next) {
953 l->destroy(server, l->arg);
954 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800955}
956
Craig Tilleraec96aa2015-04-07 14:32:15 -0700957void grpc_server_listener_destroy_done(void *s) {
958 grpc_server *server = s;
959 gpr_mu_lock(&server->mu);
960 server->listeners_destroyed++;
Craig Tilleree945e82015-05-26 16:15:34 -0700961 maybe_finish_shutdown(server);
Craig Tilleraec96aa2015-04-07 14:32:15 -0700962 gpr_mu_unlock(&server->mu);
963}
964
Craig Tillerafa2d632015-05-26 16:39:13 -0700965void grpc_server_cancel_all_calls(grpc_server *server) {
966 call_data *calld;
967 grpc_call **calls;
968 size_t call_count;
969 size_t call_capacity;
970 int is_first = 1;
971 size_t i;
972
973 gpr_mu_lock(&server->mu);
974
975 GPR_ASSERT(server->shutdown);
976
977 if (!server->lists[ALL_CALLS]) {
978 gpr_mu_unlock(&server->mu);
979 return;
980 }
981
982 call_capacity = 8;
983 call_count = 0;
984 calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
985
Craig Tillerb76d05b2015-05-29 17:21:56 -0700986 for (calld = server->lists[ALL_CALLS];
987 calld != server->lists[ALL_CALLS] || is_first;
988 calld = calld->links[ALL_CALLS].next) {
Craig Tillerafa2d632015-05-26 16:39:13 -0700989 if (call_count == call_capacity) {
990 call_capacity *= 2;
991 calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
992 }
993 calls[call_count++] = calld->call;
994 GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
995 is_first = 0;
996 }
997
998 gpr_mu_unlock(&server->mu);
999
1000 for (i = 0; i < call_count; i++) {
Craig Tillerb76d05b2015-05-29 17:21:56 -07001001 grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
1002 "Unavailable");
Craig Tillerafa2d632015-05-26 16:39:13 -07001003 GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
1004 }
1005
1006 gpr_free(calls);
1007}
1008
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001009void grpc_server_destroy(grpc_server *server) {
Craig Tilleraec96aa2015-04-07 14:32:15 -07001010 listener *l;
Craig Tiller872af022015-04-24 15:57:52 -07001011
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001012 gpr_mu_lock(&server->mu);
Masood Malekghassemi38bb18f2015-06-10 17:32:02 -07001013 GPR_ASSERT(server->shutdown || !server->listeners);
Craig Tilleree945e82015-05-26 16:15:34 -07001014 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -07001015
1016 while (server->listeners) {
1017 l = server->listeners;
1018 server->listeners = l->next;
1019 gpr_free(l);
1020 }
1021
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001022 gpr_mu_unlock(&server->mu);
1023
1024 server_unref(server);
1025}
1026
1027void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -08001028 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -08001029 grpc_pollset **pollsets,
1030 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001031 void (*destroy)(grpc_server *server, void *arg)) {
1032 listener *l = gpr_malloc(sizeof(listener));
1033 l->arg = arg;
1034 l->start = start;
1035 l->destroy = destroy;
1036 l->next = server->listeners;
1037 server->listeners = l;
1038}
1039
Craig Tiller9f28ac22015-01-27 17:01:29 -08001040static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -08001041 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001042 call_data *calld = NULL;
Craig Tiller0ef1a922015-02-11 16:23:01 -08001043 requested_call_array *requested_calls = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001044 gpr_mu_lock(&server->mu);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001045 if (server->shutdown) {
1046 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -08001047 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001048 return GRPC_CALL_OK;
1049 }
Craig Tiller04cc8be2015-02-10 16:11:22 -08001050 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -08001051 case BATCH_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -08001052 calld =
1053 call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -08001054 requested_calls = &server->requested_calls;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001055 break;
1056 case REGISTERED_CALL:
Craig Tiller3b29b562015-02-11 12:58:46 -08001057 calld = call_list_remove_head(
1058 &rc->data.registered.registered_method->pending, PENDING_START);
Craig Tiller0ef1a922015-02-11 16:23:01 -08001059 requested_calls = &rc->data.registered.registered_method->requested;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001060 break;
1061 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001062 if (calld) {
Craig Tillercce17ac2015-01-20 09:29:28 -08001063 GPR_ASSERT(calld->state == PENDING);
Craig Tillerbb3f22f2015-01-29 16:40:56 -08001064 calld->state = ACTIVATED;
Craig Tillercce17ac2015-01-20 09:29:28 -08001065 gpr_mu_unlock(&server->mu);
Craig Tiller24be0f72015-02-10 14:04:22 -08001066 begin_call(server, calld, rc);
Craig Tillercce17ac2015-01-20 09:29:28 -08001067 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001068 } else {
Craig Tiller0ef1a922015-02-11 16:23:01 -08001069 *requested_call_array_add(requested_calls) = *rc;
Craig Tillercce17ac2015-01-20 09:29:28 -08001070 gpr_mu_unlock(&server->mu);
1071 return GRPC_CALL_OK;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001072 }
Craig Tillercce17ac2015-01-20 09:29:28 -08001073}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001074
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001075grpc_call_error grpc_server_request_call(
1076 grpc_server *server, grpc_call **call, grpc_call_details *details,
1077 grpc_metadata_array *initial_metadata,
1078 grpc_completion_queue *cq_bound_to_call,
1079 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001080 requested_call rc;
murgatroid99ad7c20c2015-05-22 14:42:29 -07001081 GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
1082 initial_metadata, cq_bound_to_call,
1083 cq_for_notification, tag);
Craig Tiller54478f82015-05-12 14:08:56 -07001084 grpc_cq_begin_op(cq_for_notification, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001085 rc.type = BATCH_CALL;
1086 rc.tag = tag;
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001087 rc.cq_bound_to_call = cq_bound_to_call;
1088 rc.cq_for_notification = cq_for_notification;
1089 rc.call = call;
Craig Tiller24be0f72015-02-10 14:04:22 -08001090 rc.data.batch.details = details;
1091 rc.data.batch.initial_metadata = initial_metadata;
1092 return queue_call_request(server, &rc);
1093}
1094
1095grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001096 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1097 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001098 grpc_completion_queue *cq_bound_to_call,
1099 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001100 requested_call rc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001101 registered_method *registered_method = rm;
Craig Tiller54478f82015-05-12 14:08:56 -07001102 grpc_cq_begin_op(cq_for_notification, NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001103 rc.type = REGISTERED_CALL;
1104 rc.tag = tag;
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001105 rc.cq_bound_to_call = cq_bound_to_call;
1106 rc.cq_for_notification = cq_for_notification;
1107 rc.call = call;
Craig Tiller24be0f72015-02-10 14:04:22 -08001108 rc.data.registered.registered_method = registered_method;
1109 rc.data.registered.deadline = deadline;
1110 rc.data.registered.initial_metadata = initial_metadata;
1111 rc.data.registered.optional_payload = optional_payload;
1112 return queue_call_request(server, &rc);
1113}
1114
Craig Tiller64be9f72015-05-04 14:53:51 -07001115static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001116 void *tag);
Craig Tiller64be9f72015-05-04 14:53:51 -07001117static void publish_was_not_set(grpc_call *call, int success, void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001118 abort();
1119}
Craig Tiller24be0f72015-02-10 14:04:22 -08001120
Craig Tiller166e2502015-02-03 20:14:41 -08001121static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1122 gpr_slice slice = value->slice;
1123 size_t len = GPR_SLICE_LENGTH(slice);
1124
1125 if (len + 1 > *capacity) {
1126 *capacity = GPR_MAX(len + 1, *capacity * 2);
1127 *dest = gpr_realloc(*dest, *capacity);
1128 }
1129 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1130}
1131
Craig Tiller24be0f72015-02-10 14:04:22 -08001132static void begin_call(grpc_server *server, call_data *calld,
1133 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001134 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001135 grpc_ioreq req[2];
1136 grpc_ioreq *r = req;
1137
1138 /* called once initial metadata has been read by the call, but BEFORE
1139 the ioreq to fetch it out of the call has been executed.
1140 This means metadata related fields can be relied on in calld, but to
1141 fill in the metadata array passed by the client, we need to perform
1142 an ioreq op, that should complete immediately. */
1143
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001144 grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
1145 *rc->call = calld->call;
1146 calld->cq_new = rc->cq_for_notification;
Craig Tiller24be0f72015-02-10 14:04:22 -08001147 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001148 case BATCH_CALL:
1149 cpstr(&rc->data.batch.details->host,
1150 &rc->data.batch.details->host_capacity, calld->host);
1151 cpstr(&rc->data.batch.details->method,
1152 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001153 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001154 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1155 r->data.recv_metadata = rc->data.batch.initial_metadata;
1156 r++;
1157 publish = publish_registered_or_batch;
1158 break;
1159 case REGISTERED_CALL:
1160 *rc->data.registered.deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001161 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1162 r->data.recv_metadata = rc->data.registered.initial_metadata;
1163 r++;
1164 if (rc->data.registered.optional_payload) {
1165 r->op = GRPC_IOREQ_RECV_MESSAGE;
1166 r->data.recv_message = rc->data.registered.optional_payload;
1167 r++;
1168 }
1169 publish = publish_registered_or_batch;
1170 break;
1171 }
1172
Craig Tiller4df412b2015-04-28 07:57:54 -07001173 GRPC_CALL_INTERNAL_REF(calld->call, "server");
Craig Tiller24be0f72015-02-10 14:04:22 -08001174 grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
1175 rc->tag);
1176}
1177
1178static void fail_call(grpc_server *server, requested_call *rc) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001179 *rc->call = NULL;
Craig Tiller24be0f72015-02-10 14:04:22 -08001180 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001181 case BATCH_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001182 rc->data.batch.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001183 break;
1184 case REGISTERED_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001185 rc->data.registered.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001186 break;
1187 }
Craig Tiller54478f82015-05-12 14:08:56 -07001188 grpc_cq_end_op(rc->cq_for_notification, rc->tag, NULL, 0);
Craig Tiller24be0f72015-02-10 14:04:22 -08001189}
1190
Craig Tiller64be9f72015-05-04 14:53:51 -07001191static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001192 void *tag) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001193 grpc_call_element *elem =
1194 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller20bc56d2015-02-12 09:02:56 -08001195 call_data *calld = elem->call_data;
Craig Tiller64be9f72015-05-04 14:53:51 -07001196 grpc_cq_end_op(calld->cq_new, tag, call, success);
Craig Tiller24be0f72015-02-10 14:04:22 -08001197}
1198
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001199const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1200 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001201}
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001202
1203int grpc_server_has_open_connections(grpc_server *server) {
1204 int r;
1205 gpr_mu_lock(&server->mu);
1206 r = server->root_channel_data.next != &server->root_channel_data;
1207 gpr_mu_unlock(&server->mu);
1208 return r;
1209}