blob: ebd2ef9af35ae26de544653abc6679855d55f204 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
Craig Tillerf96dfc32015-09-10 14:43:18 -070036#include <limits.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdlib.h>
38#include <string.h>
39
Craig Tiller6a006ce2015-07-13 16:25:40 -070040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
42#include <grpc/support/string_util.h>
43#include <grpc/support/useful.h>
44
Hongyu Chene09dc782015-08-21 11:28:33 -070045#include "src/core/census/grpc_filter.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080046#include "src/core/channel/channel_args.h"
47#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080048#include "src/core/iomgr/iomgr.h"
Craig Tiller6a006ce2015-07-13 16:25:40 -070049#include "src/core/support/stack_lockfree.h"
Craig Tiller485d7762015-01-23 12:54:05 -080050#include "src/core/support/string.h"
Masood Malekghassemi76c3d742015-08-19 18:22:53 -070051#include "src/core/surface/api_trace.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052#include "src/core/surface/call.h"
53#include "src/core/surface/channel.h"
54#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080055#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080056#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080057
Craig Tillera82950e2015-09-22 12:33:20 -070058typedef struct listener {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080059 void *arg;
Craig Tillera82950e2015-09-22 12:33:20 -070060 void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
61 grpc_pollset **pollsets, size_t pollset_count);
62 void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
63 grpc_closure *closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080064 struct listener *next;
Craig Tillerdfff1b82015-09-21 14:39:57 -070065 grpc_closure destroy_done;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080066} listener;
67
/* Forward declarations for types that refer to one another below. */
typedef struct registered_method registered_method;
typedef struct channel_data channel_data;
typedef struct call_data call_data;
71
Craig Tillera82950e2015-09-22 12:33:20 -070072typedef struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080073 call_data *next;
74 call_data *prev;
75} call_link;
76
/* Kind tag stored in requested_call.type. */
typedef enum {
  BATCH_CALL,
  REGISTERED_CALL
} requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080078
Craig Tillera82950e2015-09-22 12:33:20 -070079typedef struct requested_call {
Craig Tiller24be0f72015-02-10 14:04:22 -080080 requested_call_type type;
81 void *tag;
Craig Tiller6a006ce2015-07-13 16:25:40 -070082 grpc_server *server;
Craig Tillerf9e6adf2015-05-06 11:45:59 -070083 grpc_completion_queue *cq_bound_to_call;
84 grpc_completion_queue *cq_for_notification;
85 grpc_call **call;
Craig Tiller97fc6a32015-07-08 15:31:35 -070086 grpc_cq_completion completion;
Craig Tillera82950e2015-09-22 12:33:20 -070087 union {
88 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080089 grpc_call_details *details;
90 grpc_metadata_array *initial_metadata;
91 } batch;
Craig Tillera82950e2015-09-22 12:33:20 -070092 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080093 registered_method *registered_method;
94 gpr_timespec *deadline;
95 grpc_metadata_array *initial_metadata;
96 grpc_byte_buffer **optional_payload;
97 } registered;
98 } data;
99} requested_call;
100
Craig Tillera82950e2015-09-22 12:33:20 -0700101typedef struct channel_registered_method {
Craig Tiller24be0f72015-02-10 14:04:22 -0800102 registered_method *server_registered_method;
103 grpc_mdstr *method;
104 grpc_mdstr *host;
105} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800106
Craig Tillera82950e2015-09-22 12:33:20 -0700107struct channel_data {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800108 grpc_server *server;
Craig Tillere039f032015-06-25 12:54:23 -0700109 grpc_connectivity_state connectivity_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800110 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800111 grpc_mdstr *path_key;
112 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800113 /* linked list of all channels on a server */
114 channel_data *next;
115 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800116 channel_registered_method *registered_methods;
117 gpr_uint32 registered_method_slots;
118 gpr_uint32 registered_method_max_probes;
Craig Tiller33825112015-09-18 07:44:19 -0700119 grpc_closure finish_destroy_channel_closure;
120 grpc_closure channel_connectivity_changed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800121};
122
Craig Tillera82950e2015-09-22 12:33:20 -0700123typedef struct shutdown_tag {
Craig Tillerbce999f2015-05-27 09:55:51 -0700124 void *tag;
125 grpc_completion_queue *cq;
Craig Tiller97fc6a32015-07-08 15:31:35 -0700126 grpc_cq_completion completion;
Craig Tillerbce999f2015-05-27 09:55:51 -0700127} shutdown_tag;
128
/* Lifecycle states of a server-side call (stored in call_data.state). */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;
139
/* Forward declaration; the struct body is defined further down. */
typedef struct request_matcher request_matcher;
141
Craig Tillera82950e2015-09-22 12:33:20 -0700142struct call_data {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800143 grpc_call *call;
144
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700145 /** protects state */
146 gpr_mu mu_state;
147 /** the current state of a call - see call_state */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800148 call_state state;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700149
Craig Tillercce17ac2015-01-20 09:29:28 -0800150 grpc_mdstr *path;
151 grpc_mdstr *host;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700152 gpr_timespec deadline;
153 int got_initial_metadata;
Craig Tillercce17ac2015-01-20 09:29:28 -0800154
Craig Tiller20bc56d2015-02-12 09:02:56 -0800155 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800156
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700157 grpc_stream_op_buffer *recv_ops;
158 grpc_stream_state *recv_state;
Craig Tiller33825112015-09-18 07:44:19 -0700159 grpc_closure *on_done_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700160
Craig Tiller33825112015-09-18 07:44:19 -0700161 grpc_closure server_on_recv;
162 grpc_closure kill_zombie_closure;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700163
Craig Tiller729b35a2015-07-13 12:36:47 -0700164 call_data *pending_next;
165};
166
Craig Tillera82950e2015-09-22 12:33:20 -0700167struct request_matcher {
Craig Tiller729b35a2015-07-13 12:36:47 -0700168 call_data *pending_head;
169 call_data *pending_tail;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700170 gpr_stack_lockfree *requests;
Craig Tiller729b35a2015-07-13 12:36:47 -0700171};
172
Craig Tillera82950e2015-09-22 12:33:20 -0700173struct registered_method {
Craig Tiller729b35a2015-07-13 12:36:47 -0700174 char *method;
175 char *host;
176 request_matcher request_matcher;
177 registered_method *next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800178};
179
Craig Tillera82950e2015-09-22 12:33:20 -0700180typedef struct {
Craig Tillerff3ae682015-06-29 17:44:04 -0700181 grpc_channel **channels;
182 size_t num_channels;
183} channel_broadcaster;
184
Craig Tillera82950e2015-09-22 12:33:20 -0700185struct grpc_server {
Craig Tiller729b35a2015-07-13 12:36:47 -0700186 size_t channel_filter_count;
Craig Tiller565b18b2015-09-23 10:09:42 -0700187 grpc_channel_filter const **channel_filters;
Craig Tiller729b35a2015-07-13 12:36:47 -0700188 grpc_channel_args *channel_args;
189
190 grpc_completion_queue **cqs;
191 grpc_pollset **pollsets;
192 size_t cq_count;
193
194 /* The two following mutexes control access to server-state
195 mu_global controls access to non-call-related state (e.g., channel state)
196 mu_call controls access to call-related state (e.g., the call lists)
197
198 If they are ever required to be nested, you must lock mu_global
199 before mu_call. This is currently used in shutdown processing
200 (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
Craig Tillera82950e2015-09-22 12:33:20 -0700201 gpr_mu mu_global; /* mutex for server and channel state */
202 gpr_mu mu_call; /* mutex for call-specific state */
Craig Tiller729b35a2015-07-13 12:36:47 -0700203
204 registered_method *registered_methods;
205 request_matcher unregistered_request_matcher;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700206 /** free list of available requested_calls indices */
207 gpr_stack_lockfree *request_freelist;
208 /** requested call backing data */
209 requested_call *requested_calls;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700210 size_t max_requested_calls;
Craig Tiller729b35a2015-07-13 12:36:47 -0700211
Craig Tiller6a006ce2015-07-13 16:25:40 -0700212 gpr_atm shutdown_flag;
Craig Tiller729b35a2015-07-13 12:36:47 -0700213 gpr_uint8 shutdown_published;
214 size_t num_shutdown_tags;
215 shutdown_tag *shutdown_tags;
216
217 channel_data root_channel_data;
218
219 listener *listeners;
220 int listeners_destroyed;
221 gpr_refcount internal_refcount;
222
223 /** when did we print the last shutdown progress message */
224 gpr_timespec last_shutdown_message_time;
225};
226
/* Yields the grpc_server reachable through a call element's channel_data. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)((elem)->channel_data))->server)
229
Craig Tillera82950e2015-09-22 12:33:20 -0700230static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
231 call_data *calld, requested_call *rc);
232static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
233 requested_call *rc);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700234/* Before calling maybe_finish_shutdown, we must hold mu_global and not
235 hold mu_call */
Craig Tillera82950e2015-09-22 12:33:20 -0700236static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
Craig Tiller24be0f72015-02-10 14:04:22 -0800237
/*
 * channel broadcaster
 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700241
242/* assumes server locked */
Craig Tillera82950e2015-09-22 12:33:20 -0700243static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700244 channel_data *c;
245 size_t count = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700246 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
247 count++;
248 }
Craig Tillerff3ae682015-06-29 17:44:04 -0700249 cb->num_channels = count;
Craig Tillera82950e2015-09-22 12:33:20 -0700250 cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
Craig Tillerff3ae682015-06-29 17:44:04 -0700251 count = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700252 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
253 cb->channels[count++] = c->channel;
254 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
255 }
Craig Tillerff3ae682015-06-29 17:44:04 -0700256}
257
Craig Tillera82950e2015-09-22 12:33:20 -0700258struct shutdown_cleanup_args {
Craig Tiller33825112015-09-18 07:44:19 -0700259 grpc_closure closure;
Craig Tillerff3ae682015-06-29 17:44:04 -0700260 gpr_slice slice;
261};
262
Craig Tillera82950e2015-09-22 12:33:20 -0700263static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
264 int iomgr_status_ignored) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700265 struct shutdown_cleanup_args *a = arg;
Craig Tillera82950e2015-09-22 12:33:20 -0700266 gpr_slice_unref(a->slice);
267 gpr_free(a);
Craig Tillerff3ae682015-06-29 17:44:04 -0700268}
269
Craig Tillera82950e2015-09-22 12:33:20 -0700270static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
271 int send_goaway, int send_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700272 grpc_transport_op op;
273 struct shutdown_cleanup_args *sc;
274 grpc_channel_element *elem;
275
Craig Tillera82950e2015-09-22 12:33:20 -0700276 memset(&op, 0, sizeof(op));
Craig Tillerff3ae682015-06-29 17:44:04 -0700277 op.send_goaway = send_goaway;
Craig Tillera82950e2015-09-22 12:33:20 -0700278 sc = gpr_malloc(sizeof(*sc));
279 sc->slice = gpr_slice_from_copied_string("Server shutdown");
Craig Tillerff3ae682015-06-29 17:44:04 -0700280 op.goaway_message = &sc->slice;
281 op.goaway_status = GRPC_STATUS_OK;
282 op.disconnect = send_disconnect;
Craig Tillera82950e2015-09-22 12:33:20 -0700283 grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
Craig Tillerff3ae682015-06-29 17:44:04 -0700284 op.on_consumed = &sc->closure;
285
Craig Tillera82950e2015-09-22 12:33:20 -0700286 elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
287 elem->filter->start_transport_op(exec_ctx, elem, &op);
Craig Tillerff3ae682015-06-29 17:44:04 -0700288}
289
Craig Tillera82950e2015-09-22 12:33:20 -0700290static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
291 channel_broadcaster *cb,
292 int send_goaway,
293 int force_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700294 size_t i;
295
Craig Tillera82950e2015-09-22 12:33:20 -0700296 for (i = 0; i < cb->num_channels; i++) {
297 send_shutdown(exec_ctx, cb->channels[i], send_goaway, force_disconnect);
298 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast");
299 }
300 gpr_free(cb->channels);
Craig Tillerff3ae682015-06-29 17:44:04 -0700301}
302
/*
 * request_matcher
 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700306
Craig Tiller8dc09712015-09-24 13:58:16 -0700307static void request_matcher_init(request_matcher *rm, size_t entries) {
Craig Tillerb9d35962015-09-11 13:31:16 -0700308 memset(rm, 0, sizeof(*rm));
309 rm->requests = gpr_stack_lockfree_create(entries);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800310}
311
Craig Tillerb9d35962015-09-11 13:31:16 -0700312static void request_matcher_destroy(request_matcher *rm) {
313 GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1);
314 gpr_stack_lockfree_destroy(rm->requests);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800315}
316
Craig Tillera82950e2015-09-22 12:33:20 -0700317static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
318 grpc_call_destroy(grpc_call_from_top_element(elem));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800319}
320
Craig Tiller8dc09712015-09-24 13:58:16 -0700321static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
322 request_matcher *rm) {
Craig Tillerb9d35962015-09-11 13:31:16 -0700323 while (rm->pending_head) {
324 call_data *calld = rm->pending_head;
325 rm->pending_head = calld->pending_next;
Craig Tillera82950e2015-09-22 12:33:20 -0700326 gpr_mu_lock(&calld->mu_state);
327 calld->state = ZOMBIED;
328 gpr_mu_unlock(&calld->mu_state);
329 grpc_closure_init(
330 &calld->kill_zombie_closure, kill_zombie,
331 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
332 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
333 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800334}
335
Craig Tillera82950e2015-09-22 12:33:20 -0700336static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
337 grpc_server *server,
338 request_matcher *rm) {
Craig Tiller1191e212015-07-30 14:49:02 -0700339 int request_id;
Craig Tillera82950e2015-09-22 12:33:20 -0700340 while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
341 fail_call(exec_ctx, server, &server->requested_calls[request_id]);
342 }
Craig Tiller1191e212015-07-30 14:49:02 -0700343}
344
/*
 * server proper
 */
348
Craig Tillera82950e2015-09-22 12:33:20 -0700349static void server_ref(grpc_server *server) {
350 gpr_ref(&server->internal_refcount);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800351}
352
Craig Tillera82950e2015-09-22 12:33:20 -0700353static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800354 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700355 size_t i;
Craig Tillera82950e2015-09-22 12:33:20 -0700356 grpc_channel_args_destroy(server->channel_args);
357 gpr_mu_destroy(&server->mu_global);
358 gpr_mu_destroy(&server->mu_call);
Craig Tiller565b18b2015-09-23 10:09:42 -0700359 gpr_free((void *)server->channel_filters);
Craig Tillera82950e2015-09-22 12:33:20 -0700360 while ((rm = server->registered_methods) != NULL) {
361 server->registered_methods = rm->next;
362 request_matcher_destroy(&rm->request_matcher);
363 gpr_free(rm->method);
364 gpr_free(rm->host);
365 gpr_free(rm);
366 }
367 for (i = 0; i < server->cq_count; i++) {
368 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
369 }
370 request_matcher_destroy(&server->unregistered_request_matcher);
371 gpr_stack_lockfree_destroy(server->request_freelist);
372 gpr_free(server->cqs);
373 gpr_free(server->pollsets);
374 gpr_free(server->shutdown_tags);
375 gpr_free(server->requested_calls);
376 gpr_free(server);
Craig Tilleree945e82015-05-26 16:15:34 -0700377}
378
Craig Tillera82950e2015-09-22 12:33:20 -0700379static void server_unref(grpc_exec_ctx *exec_ctx, grpc_server *server) {
380 if (gpr_unref(&server->internal_refcount)) {
381 server_delete(exec_ctx, server);
382 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800383}
384
Craig Tillera82950e2015-09-22 12:33:20 -0700385static int is_channel_orphaned(channel_data *chand) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800386 return chand->next == chand;
387}
388
Craig Tillera82950e2015-09-22 12:33:20 -0700389static void orphan_channel(channel_data *chand) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800390 chand->next->prev = chand->prev;
391 chand->prev->next = chand->next;
392 chand->next = chand->prev = chand;
393}
394
Craig Tillera82950e2015-09-22 12:33:20 -0700395static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
396 int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800397 channel_data *chand = cd;
398 grpc_server *server = chand->server;
Craig Tillera82950e2015-09-22 12:33:20 -0700399 gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel);
400 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
401 server_unref(exec_ctx, server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800402}
403
Craig Tillera82950e2015-09-22 12:33:20 -0700404static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
405 if (is_channel_orphaned(chand)) return;
406 GPR_ASSERT(chand->server != NULL);
407 orphan_channel(chand);
408 server_ref(chand->server);
409 maybe_finish_shutdown(exec_ctx, chand->server);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700410 chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
411 chand->finish_destroy_channel_closure.cb_arg = chand;
Craig Tillera82950e2015-09-22 12:33:20 -0700412 grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413}
414
Craig Tillera82950e2015-09-22 12:33:20 -0700415static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
Craig Tiller8dc09712015-09-24 13:58:16 -0700416 grpc_call_element *elem, request_matcher *rm) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800417 call_data *calld = elem->call_data;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700418 int request_id;
419
Craig Tillera82950e2015-09-22 12:33:20 -0700420 if (gpr_atm_acq_load(&server->shutdown_flag)) {
421 gpr_mu_lock(&calld->mu_state);
422 calld->state = ZOMBIED;
423 gpr_mu_unlock(&calld->mu_state);
424 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
425 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
426 return;
427 }
Craig Tiller45724b32015-09-22 10:42:19 -0700428
Craig Tillerb9d35962015-09-11 13:31:16 -0700429 request_id = gpr_stack_lockfree_pop(rm->requests);
Craig Tillera82950e2015-09-22 12:33:20 -0700430 if (request_id == -1) {
431 gpr_mu_lock(&server->mu_call);
432 gpr_mu_lock(&calld->mu_state);
433 calld->state = PENDING;
434 gpr_mu_unlock(&calld->mu_state);
Craig Tillerb9d35962015-09-11 13:31:16 -0700435 if (rm->pending_head == NULL) {
436 rm->pending_tail = rm->pending_head = calld;
Craig Tillera82950e2015-09-22 12:33:20 -0700437 } else {
Craig Tillerb9d35962015-09-11 13:31:16 -0700438 rm->pending_tail->pending_next = calld;
439 rm->pending_tail = calld;
Craig Tiller45724b32015-09-22 10:42:19 -0700440 }
Craig Tillera82950e2015-09-22 12:33:20 -0700441 calld->pending_next = NULL;
442 gpr_mu_unlock(&server->mu_call);
443 } else {
444 gpr_mu_lock(&calld->mu_state);
445 calld->state = ACTIVATED;
446 gpr_mu_unlock(&calld->mu_state);
447 begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
448 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800449}
450
Craig Tillera82950e2015-09-22 12:33:20 -0700451static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800452 channel_data *chand = elem->channel_data;
453 call_data *calld = elem->call_data;
454 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800455 gpr_uint32 i;
456 gpr_uint32 hash;
457 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800458
Craig Tillera82950e2015-09-22 12:33:20 -0700459 if (chand->registered_methods && calld->path && calld->host) {
460 /* TODO(ctiller): unify these two searches */
461 /* check for an exact match with host */
462 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
463 for (i = 0; i <= chand->registered_method_max_probes; i++) {
464 rm = &chand->registered_methods[(hash + i) %
465 chand->registered_method_slots];
466 if (!rm) break;
467 if (rm->host != calld->host) continue;
468 if (rm->method != calld->path) continue;
469 finish_start_new_rpc(exec_ctx, server, elem,
470 &rm->server_registered_method->request_matcher);
471 return;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800472 }
Craig Tillera82950e2015-09-22 12:33:20 -0700473 /* check for a wildcard method definition (no host set) */
474 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
475 for (i = 0; i <= chand->registered_method_max_probes; i++) {
476 rm = &chand->registered_methods[(hash + i) %
477 chand->registered_method_slots];
478 if (!rm) break;
479 if (rm->host != NULL) continue;
480 if (rm->method != calld->path) continue;
481 finish_start_new_rpc(exec_ctx, server, elem,
482 &rm->server_registered_method->request_matcher);
483 return;
484 }
485 }
486 finish_start_new_rpc(exec_ctx, server, elem,
487 &server->unregistered_request_matcher);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800488}
489
Craig Tillera82950e2015-09-22 12:33:20 -0700490static int num_listeners(grpc_server *server) {
Craig Tilleree945e82015-05-26 16:15:34 -0700491 listener *l;
492 int n = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700493 for (l = server->listeners; l; l = l->next) {
494 n++;
495 }
Craig Tilleree945e82015-05-26 16:15:34 -0700496 return n;
497}
498
Craig Tillera82950e2015-09-22 12:33:20 -0700499static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
500 grpc_cq_completion *completion) {
501 server_unref(exec_ctx, server);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700502}
503
Craig Tillera82950e2015-09-22 12:33:20 -0700504static int num_channels(grpc_server *server) {
Craig Tillerab54f792015-07-08 08:34:20 -0700505 channel_data *chand;
506 int n = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700507 for (chand = server->root_channel_data.next;
508 chand != &server->root_channel_data; chand = chand->next) {
509 n++;
510 }
Craig Tillerab54f792015-07-08 08:34:20 -0700511 return n;
512}
513
Craig Tillera82950e2015-09-22 12:33:20 -0700514static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
515 grpc_server *server) {
Craig Tiller1191e212015-07-30 14:49:02 -0700516 registered_method *rm;
Craig Tillera82950e2015-09-22 12:33:20 -0700517 request_matcher_kill_requests(exec_ctx, server,
518 &server->unregistered_request_matcher);
519 request_matcher_zombify_all_pending_calls(
520 exec_ctx, &server->unregistered_request_matcher);
521 for (rm = server->registered_methods; rm; rm = rm->next) {
522 request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
523 request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
524 }
Craig Tillerdc627722015-05-26 15:27:02 -0700525}
526
Craig Tillera82950e2015-09-22 12:33:20 -0700527static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
528 grpc_server *server) {
Craig Tiller45724b32015-09-22 10:42:19 -0700529 size_t i;
Craig Tillera82950e2015-09-22 12:33:20 -0700530 if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
531 return;
532 }
Craig Tiller45724b32015-09-22 10:42:19 -0700533
Craig Tillera82950e2015-09-22 12:33:20 -0700534 kill_pending_work_locked(exec_ctx, server);
Craig Tiller45724b32015-09-22 10:42:19 -0700535
Craig Tillera82950e2015-09-22 12:33:20 -0700536 if (server->root_channel_data.next != &server->root_channel_data ||
537 server->listeners_destroyed < num_listeners(server)) {
538 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
539 server->last_shutdown_message_time),
540 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
541 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
542 gpr_log(GPR_DEBUG,
543 "Waiting for %d channels and %d/%d listeners to be destroyed"
544 " before shutting down server",
545 num_channels(server),
546 num_listeners(server) - server->listeners_destroyed,
547 num_listeners(server));
Craig Tiller45724b32015-09-22 10:42:19 -0700548 }
Craig Tillera82950e2015-09-22 12:33:20 -0700549 return;
550 }
Craig Tiller45724b32015-09-22 10:42:19 -0700551 server->shutdown_published = 1;
Craig Tillera82950e2015-09-22 12:33:20 -0700552 for (i = 0; i < server->num_shutdown_tags; i++) {
553 server_ref(server);
554 grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq,
555 server->shutdown_tags[i].tag, 1, done_shutdown_event, server,
556 &server->shutdown_tags[i].completion);
557 }
Craig Tiller45724b32015-09-22 10:42:19 -0700558}
559
Craig Tillera82950e2015-09-22 12:33:20 -0700560static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700561 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800562 channel_data *chand = elem->channel_data;
563 call_data *calld = elem->call_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700564 if (md->key == chand->path_key) {
565 calld->path = GRPC_MDSTR_REF(md->value);
566 return NULL;
567 } else if (md->key == chand->authority_key) {
568 calld->host = GRPC_MDSTR_REF(md->value);
569 return NULL;
570 }
Craig Tiller6902ad22015-04-16 08:01:49 -0700571 return md;
572}
573
Craig Tillera82950e2015-09-22 12:33:20 -0700574static void server_on_recv(grpc_exec_ctx *exec_ctx, void *ptr, int success) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700575 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700576 call_data *calld = elem->call_data;
Craig Tiller94329d02015-07-23 09:52:11 -0700577 gpr_timespec op_deadline;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700578
Craig Tillera82950e2015-09-22 12:33:20 -0700579 if (success && !calld->got_initial_metadata) {
580 size_t i;
581 size_t nops = calld->recv_ops->nops;
582 grpc_stream_op *ops = calld->recv_ops->ops;
583 for (i = 0; i < nops; i++) {
584 grpc_stream_op *op = &ops[i];
585 if (op->type != GRPC_OP_METADATA) continue;
586 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
587 op_deadline = op->data.metadata.deadline;
588 if (0 !=
589 gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
590 calld->deadline = op->data.metadata.deadline;
591 }
592 if (calld->host && calld->path) {
593 calld->got_initial_metadata = 1;
594 start_new_rpc(exec_ctx, elem);
595 }
596 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700597 }
Craig Tillera82950e2015-09-22 12:33:20 -0700598 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700599
Craig Tillera82950e2015-09-22 12:33:20 -0700600 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700601 case GRPC_STREAM_OPEN:
602 break;
603 case GRPC_STREAM_SEND_CLOSED:
604 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700605 case GRPC_STREAM_RECV_CLOSED:
Craig Tillera82950e2015-09-22 12:33:20 -0700606 gpr_mu_lock(&calld->mu_state);
607 if (calld->state == NOT_STARTED) {
608 calld->state = ZOMBIED;
609 gpr_mu_unlock(&calld->mu_state);
610 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
611 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
612 } else {
613 gpr_mu_unlock(&calld->mu_state);
614 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800615 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700616 case GRPC_STREAM_CLOSED:
Craig Tillera82950e2015-09-22 12:33:20 -0700617 gpr_mu_lock(&calld->mu_state);
618 if (calld->state == NOT_STARTED) {
619 calld->state = ZOMBIED;
620 gpr_mu_unlock(&calld->mu_state);
621 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
622 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
623 } else if (calld->state == PENDING) {
624 calld->state = ZOMBIED;
625 gpr_mu_unlock(&calld->mu_state);
626 /* zombied call will be destroyed when it's removed from the pending
627 queue... later */
628 } else {
629 gpr_mu_unlock(&calld->mu_state);
630 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800631 break;
Craig Tillera82950e2015-09-22 12:33:20 -0700632 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700633
Craig Tillera82950e2015-09-22 12:33:20 -0700634 calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700635}
636
Craig Tillera82950e2015-09-22 12:33:20 -0700637static void server_mutate_op(grpc_call_element *elem,
638 grpc_transport_stream_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700639 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700640
Craig Tillera82950e2015-09-22 12:33:20 -0700641 if (op->recv_ops) {
642 /* substitute our callback for the higher callback */
643 calld->recv_ops = op->recv_ops;
644 calld->recv_state = op->recv_state;
645 calld->on_done_recv = op->on_done_recv;
646 op->on_done_recv = &calld->server_on_recv;
647 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700648}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700649
Craig Tillera82950e2015-09-22 12:33:20 -0700650static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
651 grpc_call_element *elem,
652 grpc_transport_stream_op *op) {
653 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
654 server_mutate_op(elem, op);
655 grpc_call_next_op(exec_ctx, elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800656}
657
Craig Tillera82950e2015-09-22 12:33:20 -0700658static void accept_stream(void *cd, grpc_transport *transport,
659 const void *transport_server_data) {
Craig Tillere039f032015-06-25 12:54:23 -0700660 channel_data *chand = cd;
661 /* create a call */
Craig Tillera82950e2015-09-22 12:33:20 -0700662 grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
663 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
Craig Tillere039f032015-06-25 12:54:23 -0700664}
665
Craig Tillera82950e2015-09-22 12:33:20 -0700666static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
667 int iomgr_status_ignored) {
Craig Tillere039f032015-06-25 12:54:23 -0700668 channel_data *chand = cd;
669 grpc_server *server = chand->server;
Craig Tillera82950e2015-09-22 12:33:20 -0700670 if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
671 grpc_transport_op op;
672 memset(&op, 0, sizeof(op));
673 op.on_connectivity_state_change = &chand->channel_connectivity_changed,
674 op.connectivity_state = &chand->connectivity_state;
675 grpc_channel_next_op(exec_ctx,
676 grpc_channel_stack_element(
677 grpc_channel_get_channel_stack(chand->channel), 0),
678 &op);
679 } else {
680 gpr_mu_lock(&server->mu_global);
681 destroy_channel(exec_ctx, chand);
682 gpr_mu_unlock(&server->mu_global);
683 GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
684 }
Craig Tillere039f032015-06-25 12:54:23 -0700685}
686
Craig Tillera82950e2015-09-22 12:33:20 -0700687static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
688 const void *server_transport_data,
689 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800690 call_data *calld = elem->call_data;
691 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700692 memset(calld, 0, sizeof(call_data));
693 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
694 calld->call = grpc_call_from_top_element(elem);
695 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800696
Craig Tillera82950e2015-09-22 12:33:20 -0700697 grpc_closure_init(&calld->server_on_recv, server_on_recv, elem);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700698
Craig Tillera82950e2015-09-22 12:33:20 -0700699 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700700
Craig Tillera82950e2015-09-22 12:33:20 -0700701 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800702}
703
Craig Tillera82950e2015-09-22 12:33:20 -0700704static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
705 grpc_call_element *elem) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800706 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800707 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800708
Craig Tillera82950e2015-09-22 12:33:20 -0700709 GPR_ASSERT(calld->state != PENDING);
Craig Tiller092d8d12015-07-04 22:35:00 -0700710
Craig Tillera82950e2015-09-22 12:33:20 -0700711 if (calld->host) {
712 GRPC_MDSTR_UNREF(calld->host);
713 }
714 if (calld->path) {
715 GRPC_MDSTR_UNREF(calld->path);
716 }
Craig Tiller4df31a62015-01-30 09:44:31 -0800717
Craig Tillera82950e2015-09-22 12:33:20 -0700718 gpr_mu_destroy(&calld->mu_state);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700719
Craig Tillera82950e2015-09-22 12:33:20 -0700720 server_unref(exec_ctx, chand->server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800721}
722
Craig Tillera82950e2015-09-22 12:33:20 -0700723static void init_channel_elem(grpc_exec_ctx *exec_ctx,
724 grpc_channel_element *elem, grpc_channel *master,
725 const grpc_channel_args *args,
726 grpc_mdctx *metadata_context, int is_first,
727 int is_last) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800728 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700729 GPR_ASSERT(is_first);
730 GPR_ASSERT(!is_last);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800731 chand->server = NULL;
732 chand->channel = NULL;
Craig Tiller4dbdd6a2015-09-25 15:12:16 -0700733 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
734 chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800735 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800736 chand->registered_methods = NULL;
Craig Tillere039f032015-06-25 12:54:23 -0700737 chand->connectivity_state = GRPC_CHANNEL_IDLE;
Craig Tillera82950e2015-09-22 12:33:20 -0700738 grpc_closure_init(&chand->channel_connectivity_changed,
739 channel_connectivity_changed, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800740}
741
Craig Tillera82950e2015-09-22 12:33:20 -0700742static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
743 grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800744 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800745 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -0700746 if (chand->registered_methods) {
747 for (i = 0; i < chand->registered_method_slots; i++) {
748 if (chand->registered_methods[i].method) {
749 GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
750 }
751 if (chand->registered_methods[i].host) {
752 GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
753 }
Craig Tillerec3257c2015-02-12 15:59:43 -0800754 }
Craig Tillera82950e2015-09-22 12:33:20 -0700755 gpr_free(chand->registered_methods);
756 }
757 if (chand->server) {
758 gpr_mu_lock(&chand->server->mu_global);
759 chand->next->prev = chand->prev;
760 chand->prev->next = chand->next;
761 chand->next = chand->prev = chand;
762 maybe_finish_shutdown(exec_ctx, chand->server);
763 gpr_mu_unlock(&chand->server->mu_global);
764 GRPC_MDSTR_UNREF(chand->path_key);
765 GRPC_MDSTR_UNREF(chand->authority_key);
766 server_unref(exec_ctx, chand->server);
767 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800768}
769
770static const grpc_channel_filter server_surface_filter = {
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700771 server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
772 init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
773 destroy_channel_elem, grpc_call_next_get_peer, "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800774};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800775
Craig Tillera82950e2015-09-22 12:33:20 -0700776void grpc_server_register_completion_queue(grpc_server *server,
777 grpc_completion_queue *cq,
778 void *reserved) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800779 size_t i, n;
Masood Malekghassemi76c3d742015-08-19 18:22:53 -0700780 GRPC_API_TRACE(
781 "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
782 (server, cq, reserved));
Craig Tillera82950e2015-09-22 12:33:20 -0700783 GPR_ASSERT(!reserved);
784 for (i = 0; i < server->cq_count; i++) {
785 if (server->cqs[i] == cq) return;
786 }
787 GRPC_CQ_INTERNAL_REF(cq, "server");
788 grpc_cq_mark_server_cq(cq);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800789 n = server->cq_count++;
Craig Tillera82950e2015-09-22 12:33:20 -0700790 server->cqs = gpr_realloc(server->cqs,
791 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800792 server->cqs[n] = cq;
793}
794
Craig Tillera82950e2015-09-22 12:33:20 -0700795grpc_server *grpc_server_create_from_filters(
796 const grpc_channel_filter **filters, size_t filter_count,
797 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800798 size_t i;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700799 /* TODO(census): restore this once we finalize census filter etc.
800 int census_enabled = grpc_channel_args_is_census_enabled(args); */
801 int census_enabled = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800802
Craig Tillera82950e2015-09-22 12:33:20 -0700803 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800804
Craig Tillera82950e2015-09-22 12:33:20 -0700805 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
Craig Tiller60fd3612015-03-05 16:24:22 -0800806
Craig Tillera82950e2015-09-22 12:33:20 -0700807 memset(server, 0, sizeof(grpc_server));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800808
Craig Tillera82950e2015-09-22 12:33:20 -0700809 gpr_mu_init(&server->mu_global);
810 gpr_mu_init(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800811
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800812 /* decremented by grpc_server_destroy */
Craig Tillera82950e2015-09-22 12:33:20 -0700813 gpr_ref_init(&server->internal_refcount, 1);
814 server->root_channel_data.next = server->root_channel_data.prev =
815 &server->root_channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800816
Craig Tiller6a006ce2015-07-13 16:25:40 -0700817 /* TODO(ctiller): expose a channel_arg for this */
818 server->max_requested_calls = 32768;
Craig Tillera82950e2015-09-22 12:33:20 -0700819 server->request_freelist =
820 gpr_stack_lockfree_create(server->max_requested_calls);
821 for (i = 0; i < (size_t)server->max_requested_calls; i++) {
822 gpr_stack_lockfree_push(server->request_freelist, (int)i);
823 }
824 request_matcher_init(&server->unregistered_request_matcher,
825 server->max_requested_calls);
826 server->requested_calls = gpr_malloc(server->max_requested_calls *
827 sizeof(*server->requested_calls));
Craig Tiller729b35a2015-07-13 12:36:47 -0700828
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800829 /* Server filter stack is:
830
831 server_surface_filter - for making surface API calls
832 grpc_server_census_filter (optional) - for stats collection and tracing
833 {passed in filter stack}
834 grpc_connected_channel_filter - for interfacing with transports */
Craig Tiller32ca48c2015-09-10 11:47:15 -0700835 server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
Craig Tillera82950e2015-09-22 12:33:20 -0700836 server->channel_filters =
837 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800838 server->channel_filters[0] = &server_surface_filter;
Craig Tillera82950e2015-09-22 12:33:20 -0700839 if (census_enabled) {
840 server->channel_filters[1] = &grpc_server_census_filter;
841 }
842 for (i = 0; i < filter_count; i++) {
843 server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
844 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800845
Craig Tillera82950e2015-09-22 12:33:20 -0700846 server->channel_args = grpc_channel_args_copy(args);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800847
848 return server;
849}
850
/* NULL-tolerant string equality.
   Returns 1 when both pointers are NULL or both strings have identical
   contents; returns 0 otherwise (including when exactly one is NULL). */
static int streq(const char *a, const char *b) {
  if (a == NULL || b == NULL) {
    /* equal only if both are missing */
    return a == b;
  }
  return strcmp(a, b) == 0;
}
857
Craig Tillera82950e2015-09-22 12:33:20 -0700858void *grpc_server_register_method(grpc_server *server, const char *method,
859 const char *host) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800860 registered_method *m;
Masood Malekghassemi76c3d742015-08-19 18:22:53 -0700861 GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)",
862 3, (server, method, host));
Craig Tillera82950e2015-09-22 12:33:20 -0700863 if (!method) {
864 gpr_log(GPR_ERROR,
865 "grpc_server_register_method method string cannot be NULL");
866 return NULL;
867 }
868 for (m = server->registered_methods; m; m = m->next) {
869 if (streq(m->method, method) && streq(m->host, host)) {
870 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
871 host ? host : "*");
Craig Tiller24be0f72015-02-10 14:04:22 -0800872 return NULL;
873 }
Craig Tillera82950e2015-09-22 12:33:20 -0700874 }
875 m = gpr_malloc(sizeof(registered_method));
876 memset(m, 0, sizeof(*m));
877 request_matcher_init(&m->request_matcher, server->max_requested_calls);
878 m->method = gpr_strdup(method);
879 m->host = gpr_strdup(host);
Craig Tiller24be0f72015-02-10 14:04:22 -0800880 m->next = server->registered_methods;
881 server->registered_methods = m;
882 return m;
883}
884
Craig Tillera82950e2015-09-22 12:33:20 -0700885void grpc_server_start(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800886 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800887 size_t i;
Craig Tillerf5768a62015-09-22 10:54:34 -0700888 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800889
Masood Malekghassemi76c3d742015-08-19 18:22:53 -0700890 GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
891
Craig Tillera82950e2015-09-22 12:33:20 -0700892 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
893 for (i = 0; i < server->cq_count; i++) {
894 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
895 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800896
Craig Tillera82950e2015-09-22 12:33:20 -0700897 for (l = server->listeners; l; l = l->next) {
898 l->start(&exec_ctx, server, l->arg, server->pollsets, server->cq_count);
899 }
Craig Tillerdfff1b82015-09-21 14:39:57 -0700900
Craig Tillera82950e2015-09-22 12:33:20 -0700901 grpc_exec_ctx_finish(&exec_ctx);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800902}
903
Craig Tillera82950e2015-09-22 12:33:20 -0700904void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
905 grpc_transport *transport,
906 grpc_channel_filter const **extra_filters,
907 size_t num_extra_filters, grpc_mdctx *mdctx,
908 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800909 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
Craig Tillera82950e2015-09-22 12:33:20 -0700910 grpc_channel_filter const **filters =
911 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800912 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800913 size_t num_registered_methods;
914 size_t alloc;
915 registered_method *rm;
916 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800917 grpc_channel *channel;
918 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800919 grpc_mdstr *host;
920 grpc_mdstr *method;
921 gpr_uint32 hash;
Craig Tillerf96dfc32015-09-10 14:43:18 -0700922 size_t slots;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800923 gpr_uint32 probes;
924 gpr_uint32 max_probes = 0;
Craig Tillere039f032015-06-25 12:54:23 -0700925 grpc_transport_op op;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800926
Craig Tillera82950e2015-09-22 12:33:20 -0700927 for (i = 0; i < s->channel_filter_count; i++) {
928 filters[i] = s->channel_filters[i];
929 }
930 for (; i < s->channel_filter_count + num_extra_filters; i++) {
931 filters[i] = extra_filters[i - s->channel_filter_count];
932 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800933 filters[i] = &grpc_connected_channel_filter;
934
Craig Tillera82950e2015-09-22 12:33:20 -0700935 for (i = 0; i < s->cq_count; i++) {
936 memset(&op, 0, sizeof(op));
937 op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
938 grpc_transport_perform_op(exec_ctx, transport, &op);
939 }
ctillerd79b4862014-12-17 16:36:59 -0800940
Craig Tillera82950e2015-09-22 12:33:20 -0700941 channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
942 num_filters, args, mdctx, 0);
943 chand = (channel_data *)grpc_channel_stack_element(
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700944 grpc_channel_get_channel_stack(channel), 0)->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800945 chand->server = s;
Craig Tillera82950e2015-09-22 12:33:20 -0700946 server_ref(s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800947 chand->channel = channel;
948
Craig Tiller04cc8be2015-02-10 16:11:22 -0800949 num_registered_methods = 0;
Craig Tillera82950e2015-09-22 12:33:20 -0700950 for (rm = s->registered_methods; rm; rm = rm->next) {
951 num_registered_methods++;
952 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800953 /* build a lookup table phrased in terms of mdstr's in this channels context
954 to quickly find registered methods */
Craig Tillera82950e2015-09-22 12:33:20 -0700955 if (num_registered_methods > 0) {
956 slots = 2 * num_registered_methods;
957 alloc = sizeof(channel_registered_method) * slots;
958 chand->registered_methods = gpr_malloc(alloc);
959 memset(chand->registered_methods, 0, alloc);
960 for (rm = s->registered_methods; rm; rm = rm->next) {
Craig Tiller4dbdd6a2015-09-25 15:12:16 -0700961 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL;
962 method = grpc_mdstr_from_string(mdctx, rm->method);
Craig Tillera82950e2015-09-22 12:33:20 -0700963 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
964 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700965 .server_registered_method != NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700966 probes++)
967 ;
968 if (probes > max_probes) max_probes = probes;
969 crm = &chand->registered_methods[(hash + probes) % slots];
970 crm->server_registered_method = rm;
971 crm->host = host;
972 crm->method = method;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800973 }
Craig Tillera82950e2015-09-22 12:33:20 -0700974 GPR_ASSERT(slots <= GPR_UINT32_MAX);
975 chand->registered_method_slots = (gpr_uint32)slots;
976 chand->registered_method_max_probes = max_probes;
977 }
Craig Tiller04cc8be2015-02-10 16:11:22 -0800978
Craig Tillera82950e2015-09-22 12:33:20 -0700979 grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
980 transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800981
Craig Tillera82950e2015-09-22 12:33:20 -0700982 gpr_mu_lock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800983 chand->next = &s->root_channel_data;
984 chand->prev = chand->next->prev;
985 chand->next->prev = chand->prev->next = chand;
Craig Tillera82950e2015-09-22 12:33:20 -0700986 gpr_mu_unlock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800987
Craig Tiller565b18b2015-09-23 10:09:42 -0700988 gpr_free((void *)filters);
Craig Tiller4b804102015-06-26 16:16:12 -0700989
Craig Tillera82950e2015-09-22 12:33:20 -0700990 GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
991 memset(&op, 0, sizeof(op));
Craig Tiller4b804102015-06-26 16:16:12 -0700992 op.set_accept_stream = accept_stream;
993 op.set_accept_stream_user_data = chand;
994 op.on_connectivity_state_change = &chand->channel_connectivity_changed;
995 op.connectivity_state = &chand->connectivity_state;
Craig Tillera82950e2015-09-22 12:33:20 -0700996 op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
997 grpc_transport_perform_op(exec_ctx, transport, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800998}
999
Craig Tillera82950e2015-09-22 12:33:20 -07001000void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
1001 grpc_cq_completion *storage) {
1002 (void)done_arg;
1003 gpr_free(storage);
murgatroid9900a3dab2015-08-19 11:15:38 -07001004}
1005
Craig Tillera82950e2015-09-22 12:33:20 -07001006static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
1007 int success) {
Craig Tillerdfff1b82015-09-21 14:39:57 -07001008 grpc_server *server = s;
Craig Tillera82950e2015-09-22 12:33:20 -07001009 gpr_mu_lock(&server->mu_global);
Craig Tillerdfff1b82015-09-21 14:39:57 -07001010 server->listeners_destroyed++;
Craig Tillera82950e2015-09-22 12:33:20 -07001011 maybe_finish_shutdown(exec_ctx, server);
1012 gpr_mu_unlock(&server->mu_global);
Craig Tillerdfff1b82015-09-21 14:39:57 -07001013}
1014
Craig Tillera82950e2015-09-22 12:33:20 -07001015void grpc_server_shutdown_and_notify(grpc_server *server,
1016 grpc_completion_queue *cq, void *tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001017 listener *l;
Craig Tillerbce999f2015-05-27 09:55:51 -07001018 shutdown_tag *sdt;
Craig Tillerff3ae682015-06-29 17:44:04 -07001019 channel_broadcaster broadcaster;
Craig Tillerf5768a62015-09-22 10:54:34 -07001020 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001021
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001022 GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1023 (server, cq, tag));
1024
Craig Tillera82950e2015-09-22 12:33:20 -07001025 GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
Craig Tiller7156fca2015-07-15 13:54:20 -07001026
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001027 /* lock, and gather up some stuff to do */
Craig Tillera82950e2015-09-22 12:33:20 -07001028 gpr_mu_lock(&server->mu_global);
1029 grpc_cq_begin_op(cq);
1030 if (server->shutdown_published) {
1031 grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL,
1032 gpr_malloc(sizeof(grpc_cq_completion)));
1033 gpr_mu_unlock(&server->mu_global);
1034 goto done;
1035 }
1036 server->shutdown_tags =
1037 gpr_realloc(server->shutdown_tags,
1038 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
Craig Tillerbce999f2015-05-27 09:55:51 -07001039 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1040 sdt->tag = tag;
1041 sdt->cq = cq;
Craig Tillera82950e2015-09-22 12:33:20 -07001042 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1043 gpr_mu_unlock(&server->mu_global);
1044 goto done;
1045 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001046
Craig Tillera82950e2015-09-22 12:33:20 -07001047 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -07001048
Craig Tillera82950e2015-09-22 12:33:20 -07001049 channel_broadcaster_init(server, &broadcaster);
nnoble0c475f02014-12-05 15:37:39 -08001050
Craig Tillerfc193e12015-09-24 15:29:03 -07001051 gpr_atm_rel_store(&server->shutdown_flag, 1);
1052
Craig Tillerbd217572015-02-11 18:10:56 -08001053 /* collect all unregistered then registered calls */
Craig Tillera82950e2015-09-22 12:33:20 -07001054 gpr_mu_lock(&server->mu_call);
1055 kill_pending_work_locked(&exec_ctx, server);
1056 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001057
Craig Tillera82950e2015-09-22 12:33:20 -07001058 maybe_finish_shutdown(&exec_ctx, server);
1059 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001060
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001061 /* Shutdown listeners */
Craig Tillera82950e2015-09-22 12:33:20 -07001062 for (l = server->listeners; l; l = l->next) {
1063 grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
1064 l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
1065 }
Craig Tillerff3ae682015-06-29 17:44:04 -07001066
Craig Tillera82950e2015-09-22 12:33:20 -07001067 channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 1, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001068
Craig Tillerdfff1b82015-09-21 14:39:57 -07001069done:
Craig Tillera82950e2015-09-22 12:33:20 -07001070 grpc_exec_ctx_finish(&exec_ctx);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001071}
1072
Craig Tillera82950e2015-09-22 12:33:20 -07001073void grpc_server_cancel_all_calls(grpc_server *server) {
Craig Tiller092d8d12015-07-04 22:35:00 -07001074 channel_broadcaster broadcaster;
Craig Tillerf5768a62015-09-22 10:54:34 -07001075 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillerafa2d632015-05-26 16:39:13 -07001076
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001077 GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1078
Craig Tillera82950e2015-09-22 12:33:20 -07001079 gpr_mu_lock(&server->mu_global);
1080 channel_broadcaster_init(server, &broadcaster);
1081 gpr_mu_unlock(&server->mu_global);
Craig Tillerafa2d632015-05-26 16:39:13 -07001082
Craig Tillera82950e2015-09-22 12:33:20 -07001083 channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1);
1084 grpc_exec_ctx_finish(&exec_ctx);
Craig Tillerafa2d632015-05-26 16:39:13 -07001085}
1086
Craig Tillera82950e2015-09-22 12:33:20 -07001087void grpc_server_destroy(grpc_server *server) {
Craig Tilleraec96aa2015-04-07 14:32:15 -07001088 listener *l;
Craig Tillerf5768a62015-09-22 10:54:34 -07001089 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tiller872af022015-04-24 15:57:52 -07001090
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001091 GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1092
Craig Tillera82950e2015-09-22 12:33:20 -07001093 gpr_mu_lock(&server->mu_global);
1094 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1095 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -07001096
Craig Tillera82950e2015-09-22 12:33:20 -07001097 while (server->listeners) {
1098 l = server->listeners;
1099 server->listeners = l->next;
1100 gpr_free(l);
1101 }
Craig Tilleraec96aa2015-04-07 14:32:15 -07001102
Craig Tillera82950e2015-09-22 12:33:20 -07001103 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001104
Craig Tillera82950e2015-09-22 12:33:20 -07001105 server_unref(&exec_ctx, server);
1106 grpc_exec_ctx_finish(&exec_ctx);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001107}
1108
Craig Tillera82950e2015-09-22 12:33:20 -07001109void grpc_server_add_listener(
1110 grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1111 void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1112 grpc_pollset **pollsets, size_t pollset_count),
1113 void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1114 grpc_closure *on_done)) {
1115 listener *l = gpr_malloc(sizeof(listener));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001116 l->arg = arg;
1117 l->start = start;
1118 l->destroy = destroy;
1119 l->next = server->listeners;
1120 server->listeners = l;
1121}
1122
Craig Tillera82950e2015-09-22 12:33:20 -07001123static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
1124 grpc_server *server,
1125 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001126 call_data *calld = NULL;
Craig Tillerb9d35962015-09-11 13:31:16 -07001127 request_matcher *rm = NULL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001128 int request_id;
Craig Tillera82950e2015-09-22 12:33:20 -07001129 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1130 fail_call(exec_ctx, server, rc);
1131 return GRPC_CALL_OK;
1132 }
1133 request_id = gpr_stack_lockfree_pop(server->request_freelist);
1134 if (request_id == -1) {
1135 /* out of request ids: just fail this one */
1136 fail_call(exec_ctx, server, rc);
1137 return GRPC_CALL_OK;
1138 }
1139 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -08001140 case BATCH_CALL:
Craig Tillerb9d35962015-09-11 13:31:16 -07001141 rm = &server->unregistered_request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001142 break;
1143 case REGISTERED_CALL:
Craig Tillerb9d35962015-09-11 13:31:16 -07001144 rm = &rc->data.registered.registered_method->request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001145 break;
Craig Tillera82950e2015-09-22 12:33:20 -07001146 }
Craig Tiller45724b32015-09-22 10:42:19 -07001147 server->requested_calls[request_id] = *rc;
Craig Tillera82950e2015-09-22 12:33:20 -07001148 gpr_free(rc);
Craig Tillerb9d35962015-09-11 13:31:16 -07001149 if (gpr_stack_lockfree_push(rm->requests, request_id)) {
Craig Tillera82950e2015-09-22 12:33:20 -07001150 /* this was the first queued request: we need to lock and start
1151 matching calls */
1152 gpr_mu_lock(&server->mu_call);
Craig Tillerb9d35962015-09-11 13:31:16 -07001153 while ((calld = rm->pending_head) != NULL) {
1154 request_id = gpr_stack_lockfree_pop(rm->requests);
Craig Tillera82950e2015-09-22 12:33:20 -07001155 if (request_id == -1) break;
Craig Tillerb9d35962015-09-11 13:31:16 -07001156 rm->pending_head = calld->pending_next;
Craig Tillera82950e2015-09-22 12:33:20 -07001157 gpr_mu_unlock(&server->mu_call);
1158 gpr_mu_lock(&calld->mu_state);
1159 if (calld->state == ZOMBIED) {
1160 gpr_mu_unlock(&calld->mu_state);
1161 grpc_closure_init(
1162 &calld->kill_zombie_closure, kill_zombie,
1163 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
1164 grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
1165 } else {
1166 GPR_ASSERT(calld->state == PENDING);
1167 calld->state = ACTIVATED;
1168 gpr_mu_unlock(&calld->mu_state);
1169 begin_call(exec_ctx, server, calld,
1170 &server->requested_calls[request_id]);
1171 }
1172 gpr_mu_lock(&server->mu_call);
Craig Tiller45724b32015-09-22 10:42:19 -07001173 }
Craig Tillera82950e2015-09-22 12:33:20 -07001174 gpr_mu_unlock(&server->mu_call);
1175 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001176 return GRPC_CALL_OK;
Craig Tillercce17ac2015-01-20 09:29:28 -08001177}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001178
Craig Tillera82950e2015-09-22 12:33:20 -07001179grpc_call_error grpc_server_request_call(
1180 grpc_server *server, grpc_call **call, grpc_call_details *details,
1181 grpc_metadata_array *initial_metadata,
1182 grpc_completion_queue *cq_bound_to_call,
1183 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tillerdfff1b82015-09-21 14:39:57 -07001184 grpc_call_error error;
Craig Tillerf5768a62015-09-22 10:54:34 -07001185 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillera82950e2015-09-22 12:33:20 -07001186 requested_call *rc = gpr_malloc(sizeof(*rc));
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001187 GRPC_API_TRACE(
1188 "grpc_server_request_call("
1189 "server=%p, call=%p, details=%p, initial_metadata=%p, "
1190 "cq_bound_to_call=%p, cq_for_notification=%p, tag%p)",
1191 7, (server, call, details, initial_metadata, cq_bound_to_call,
1192 cq_for_notification, tag));
Craig Tillera82950e2015-09-22 12:33:20 -07001193 GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
1194 initial_metadata, cq_bound_to_call,
1195 cq_for_notification, tag);
1196 if (!grpc_cq_is_server_cq(cq_for_notification)) {
1197 gpr_free(rc);
1198 error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1199 goto done;
1200 }
1201 grpc_cq_begin_op(cq_for_notification);
Craig Tiller9928d392015-08-18 09:40:24 -07001202 details->reserved = NULL;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001203 rc->type = BATCH_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001204 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001205 rc->tag = tag;
1206 rc->cq_bound_to_call = cq_bound_to_call;
1207 rc->cq_for_notification = cq_for_notification;
1208 rc->call = call;
1209 rc->data.batch.details = details;
1210 rc->data.batch.initial_metadata = initial_metadata;
Craig Tillera82950e2015-09-22 12:33:20 -07001211 error = queue_call_request(&exec_ctx, server, rc);
Craig Tillerdfff1b82015-09-21 14:39:57 -07001212done:
Craig Tillera82950e2015-09-22 12:33:20 -07001213 grpc_exec_ctx_finish(&exec_ctx);
Craig Tillerdfff1b82015-09-21 14:39:57 -07001214 return error;
Craig Tiller24be0f72015-02-10 14:04:22 -08001215}
1216
Craig Tillera82950e2015-09-22 12:33:20 -07001217grpc_call_error grpc_server_request_registered_call(
Craig Tillerb9d35962015-09-11 13:31:16 -07001218 grpc_server *server, void *rmp, grpc_call **call, gpr_timespec *deadline,
Craig Tillera82950e2015-09-22 12:33:20 -07001219 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
1220 grpc_completion_queue *cq_bound_to_call,
1221 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tillerdfff1b82015-09-21 14:39:57 -07001222 grpc_call_error error;
Craig Tillerf5768a62015-09-22 10:54:34 -07001223 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tillera82950e2015-09-22 12:33:20 -07001224 requested_call *rc = gpr_malloc(sizeof(*rc));
Craig Tillerb9d35962015-09-11 13:31:16 -07001225 registered_method *rm = rmp;
Masood Malekghassemi76c3d742015-08-19 18:22:53 -07001226 GRPC_API_TRACE(
1227 "grpc_server_request_registered_call("
1228 "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1229 "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1230 "tag=%p)",
1231 9, (server, rmp, call, deadline, initial_metadata, optional_payload,
1232 cq_bound_to_call, cq_for_notification, tag));
Craig Tillera82950e2015-09-22 12:33:20 -07001233 if (!grpc_cq_is_server_cq(cq_for_notification)) {
1234 gpr_free(rc);
1235 error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1236 goto done;
1237 }
1238 grpc_cq_begin_op(cq_for_notification);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001239 rc->type = REGISTERED_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001240 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001241 rc->tag = tag;
1242 rc->cq_bound_to_call = cq_bound_to_call;
1243 rc->cq_for_notification = cq_for_notification;
1244 rc->call = call;
Craig Tillerb9d35962015-09-11 13:31:16 -07001245 rc->data.registered.registered_method = rm;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001246 rc->data.registered.deadline = deadline;
1247 rc->data.registered.initial_metadata = initial_metadata;
1248 rc->data.registered.optional_payload = optional_payload;
Craig Tillera82950e2015-09-22 12:33:20 -07001249 error = queue_call_request(&exec_ctx, server, rc);
Craig Tillerdfff1b82015-09-21 14:39:57 -07001250done:
Craig Tillera82950e2015-09-22 12:33:20 -07001251 grpc_exec_ctx_finish(&exec_ctx);
Craig Tillerdfff1b82015-09-21 14:39:57 -07001252 return error;
Craig Tiller24be0f72015-02-10 14:04:22 -08001253}
1254
Craig Tillera82950e2015-09-22 12:33:20 -07001255static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
1256 grpc_call *call, int success,
1257 void *tag);
1258static void publish_was_not_set(grpc_exec_ctx *exec_ctx, grpc_call *call,
1259 int success, void *tag) {
1260 abort();
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001261}
Craig Tiller24be0f72015-02-10 14:04:22 -08001262
Craig Tillera82950e2015-09-22 12:33:20 -07001263static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
Craig Tiller166e2502015-02-03 20:14:41 -08001264 gpr_slice slice = value->slice;
Craig Tillera82950e2015-09-22 12:33:20 -07001265 size_t len = GPR_SLICE_LENGTH(slice);
Craig Tiller166e2502015-02-03 20:14:41 -08001266
Craig Tillera82950e2015-09-22 12:33:20 -07001267 if (len + 1 > *capacity) {
1268 *capacity = GPR_MAX(len + 1, *capacity * 2);
1269 *dest = gpr_realloc(*dest, *capacity);
1270 }
1271 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
Craig Tiller166e2502015-02-03 20:14:41 -08001272}
1273
Craig Tillera82950e2015-09-22 12:33:20 -07001274static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
1275 call_data *calld, requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001276 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001277 grpc_ioreq req[2];
1278 grpc_ioreq *r = req;
1279
1280 /* called once initial metadata has been read by the call, but BEFORE
1281 the ioreq to fetch it out of the call has been executed.
1282 This means metadata related fields can be relied on in calld, but to
1283 fill in the metadata array passed by the client, we need to perform
1284 an ioreq op, that should complete immediately. */
1285
Craig Tillera82950e2015-09-22 12:33:20 -07001286 grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001287 *rc->call = calld->call;
1288 calld->cq_new = rc->cq_for_notification;
Craig Tillera82950e2015-09-22 12:33:20 -07001289 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001290 case BATCH_CALL:
Craig Tillera82950e2015-09-22 12:33:20 -07001291 GPR_ASSERT(calld->host != NULL);
1292 GPR_ASSERT(calld->path != NULL);
1293 cpstr(&rc->data.batch.details->host,
1294 &rc->data.batch.details->host_capacity, calld->host);
1295 cpstr(&rc->data.batch.details->method,
1296 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001297 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001298 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1299 r->data.recv_metadata = rc->data.batch.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001300 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001301 r++;
1302 publish = publish_registered_or_batch;
1303 break;
1304 case REGISTERED_CALL:
1305 *rc->data.registered.deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001306 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1307 r->data.recv_metadata = rc->data.registered.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001308 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001309 r++;
Craig Tillera82950e2015-09-22 12:33:20 -07001310 if (rc->data.registered.optional_payload) {
1311 r->op = GRPC_IOREQ_RECV_MESSAGE;
1312 r->data.recv_message = rc->data.registered.optional_payload;
1313 r->flags = 0;
1314 r++;
1315 }
Craig Tiller24be0f72015-02-10 14:04:22 -08001316 publish = publish_registered_or_batch;
1317 break;
Craig Tillera82950e2015-09-22 12:33:20 -07001318 }
Craig Tiller24be0f72015-02-10 14:04:22 -08001319
Craig Tillera82950e2015-09-22 12:33:20 -07001320 GRPC_CALL_INTERNAL_REF(calld->call, "server");
1321 grpc_call_start_ioreq_and_call_back(exec_ctx, calld->call, req,
1322 (size_t)(r - req), publish, rc);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001323}
1324
Craig Tillera82950e2015-09-22 12:33:20 -07001325static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
1326 grpc_cq_completion *c) {
Craig Tiller6a006ce2015-07-13 16:25:40 -07001327 requested_call *rc = req;
1328 grpc_server *server = rc->server;
1329
Craig Tillera82950e2015-09-22 12:33:20 -07001330 if (rc >= server->requested_calls &&
1331 rc < server->requested_calls + server->max_requested_calls) {
1332 GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
1333 gpr_stack_lockfree_push(server->request_freelist,
1334 (int)(rc - server->requested_calls));
1335 } else {
1336 gpr_free(req);
1337 }
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001338
Craig Tillera82950e2015-09-22 12:33:20 -07001339 server_unref(exec_ctx, server);
Craig Tiller24be0f72015-02-10 14:04:22 -08001340}
1341
Craig Tillera82950e2015-09-22 12:33:20 -07001342static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
1343 requested_call *rc) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001344 *rc->call = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -07001345 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001346 case BATCH_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001347 rc->data.batch.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001348 break;
1349 case REGISTERED_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001350 rc->data.registered.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001351 break;
Craig Tillera82950e2015-09-22 12:33:20 -07001352 }
1353 server_ref(server);
1354 grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
1355 done_request_event, rc, &rc->completion);
Craig Tiller24be0f72015-02-10 14:04:22 -08001356}
1357
Craig Tillera82950e2015-09-22 12:33:20 -07001358static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
1359 grpc_call *call, int success,
1360 void *prc) {
1361 grpc_call_element *elem =
1362 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001363 requested_call *rc = prc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001364 call_data *calld = elem->call_data;
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001365 channel_data *chand = elem->channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -07001366 server_ref(chand->server);
1367 grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
1368 rc, &rc->completion);
1369 GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
Craig Tiller24be0f72015-02-10 14:04:22 -08001370}
1371
Craig Tillera82950e2015-09-22 12:33:20 -07001372const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001373 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001374}
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001375
Craig Tillera82950e2015-09-22 12:33:20 -07001376int grpc_server_has_open_connections(grpc_server *server) {
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001377 int r;
Craig Tillera82950e2015-09-22 12:33:20 -07001378 gpr_mu_lock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001379 r = server->root_channel_data.next != &server->root_channel_data;
Craig Tillera82950e2015-09-22 12:33:20 -07001380 gpr_mu_unlock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001381 return r;
1382}