blob: 3d404f78a4f4c67e52c1e6b55066937a9b7db990 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
Craig Tillerf96dfc32015-09-10 14:43:18 -070036#include <limits.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdlib.h>
38#include <string.h>
39
Craig Tiller6a006ce2015-07-13 16:25:40 -070040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
42#include <grpc/support/string_util.h>
43#include <grpc/support/useful.h>
44
Hongyu Chene09dc782015-08-21 11:28:33 -070045#include "src/core/census/grpc_filter.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080046#include "src/core/channel/channel_args.h"
47#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080048#include "src/core/iomgr/iomgr.h"
Craig Tiller6a006ce2015-07-13 16:25:40 -070049#include "src/core/support/stack_lockfree.h"
Craig Tiller485d7762015-01-23 12:54:05 -080050#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include "src/core/surface/call.h"
52#include "src/core/surface/channel.h"
53#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080054#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080055#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080056
57typedef struct listener {
58 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080059 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
60 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080061 void (*destroy)(grpc_server *server, void *arg);
62 struct listener *next;
63} listener;
64
/* Forward declarations so the structures below can refer to one another
   before their full definitions appear. */
typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;
68
69typedef struct {
70 call_data *next;
71 call_data *prev;
72} call_link;
73
/* Kind of a requested_call.
   NOTE(review): presumably selects which member of requested_call.data is
   meaningful (batch vs. registered) - confirm against begin_call/fail_call. */
typedef enum {
  BATCH_CALL,
  REGISTERED_CALL
} requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080075
Craig Tiller97fc6a32015-07-08 15:31:35 -070076typedef struct requested_call {
Craig Tiller24be0f72015-02-10 14:04:22 -080077 requested_call_type type;
78 void *tag;
Craig Tiller6a006ce2015-07-13 16:25:40 -070079 grpc_server *server;
Craig Tillerf9e6adf2015-05-06 11:45:59 -070080 grpc_completion_queue *cq_bound_to_call;
81 grpc_completion_queue *cq_for_notification;
82 grpc_call **call;
Craig Tiller97fc6a32015-07-08 15:31:35 -070083 grpc_cq_completion completion;
Craig Tiller24be0f72015-02-10 14:04:22 -080084 union {
85 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080086 grpc_call_details *details;
87 grpc_metadata_array *initial_metadata;
88 } batch;
89 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080090 registered_method *registered_method;
91 gpr_timespec *deadline;
92 grpc_metadata_array *initial_metadata;
93 grpc_byte_buffer **optional_payload;
94 } registered;
95 } data;
96} requested_call;
97
Craig Tiller24be0f72015-02-10 14:04:22 -080098typedef struct channel_registered_method {
99 registered_method *server_registered_method;
100 grpc_mdstr *method;
101 grpc_mdstr *host;
102} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800103
104struct channel_data {
105 grpc_server *server;
Craig Tillere039f032015-06-25 12:54:23 -0700106 grpc_connectivity_state connectivity_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800107 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800108 grpc_mdstr *path_key;
109 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800110 /* linked list of all channels on a server */
111 channel_data *next;
112 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800113 channel_registered_method *registered_methods;
114 gpr_uint32 registered_method_slots;
115 gpr_uint32 registered_method_max_probes;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700116 grpc_iomgr_closure finish_destroy_channel_closure;
Craig Tillere039f032015-06-25 12:54:23 -0700117 grpc_iomgr_closure channel_connectivity_changed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800118};
119
Craig Tillerbce999f2015-05-27 09:55:51 -0700120typedef struct shutdown_tag {
121 void *tag;
122 grpc_completion_queue *cq;
Craig Tiller97fc6a32015-07-08 15:31:35 -0700123 grpc_cq_completion completion;
Craig Tillerbce999f2015-05-27 09:55:51 -0700124} shutdown_tag;
125
/* Lifecycle of an incoming call as tracked in call_data.state. */
typedef enum {
  /* still waiting for metadata */
  NOT_STARTED,
  /* initial metadata has been read, but the call is not flow controlled in
     yet */
  PENDING,
  /* flow controlled in and on a completion queue */
  ACTIVATED,
  /* cancelled before it was queued */
  ZOMBIED
} call_state;
136
/* Forward declaration; the structure is defined after call_data. */
typedef struct request_matcher request_matcher;
138
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800139struct call_data {
140 grpc_call *call;
141
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700142 /** protects state */
143 gpr_mu mu_state;
144 /** the current state of a call - see call_state */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800145 call_state state;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700146
Craig Tillercce17ac2015-01-20 09:29:28 -0800147 grpc_mdstr *path;
148 grpc_mdstr *host;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700149 gpr_timespec deadline;
150 int got_initial_metadata;
Craig Tillercce17ac2015-01-20 09:29:28 -0800151
Craig Tiller20bc56d2015-02-12 09:02:56 -0800152 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800153
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700154 grpc_stream_op_buffer *recv_ops;
155 grpc_stream_state *recv_state;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700156 grpc_iomgr_closure *on_done_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700157
Craig Tiller1e6facb2015-06-11 22:47:11 -0700158 grpc_iomgr_closure server_on_recv;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700159 grpc_iomgr_closure kill_zombie_closure;
160
Craig Tiller729b35a2015-07-13 12:36:47 -0700161 call_data *pending_next;
162};
163
164struct request_matcher {
165 call_data *pending_head;
166 call_data *pending_tail;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700167 gpr_stack_lockfree *requests;
Craig Tiller729b35a2015-07-13 12:36:47 -0700168};
169
170struct registered_method {
171 char *method;
172 char *host;
173 request_matcher request_matcher;
174 registered_method *next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800175};
176
Craig Tillerff3ae682015-06-29 17:44:04 -0700177typedef struct {
178 grpc_channel **channels;
179 size_t num_channels;
180} channel_broadcaster;
181
Craig Tiller729b35a2015-07-13 12:36:47 -0700182struct grpc_server {
183 size_t channel_filter_count;
184 const grpc_channel_filter **channel_filters;
185 grpc_channel_args *channel_args;
186
187 grpc_completion_queue **cqs;
188 grpc_pollset **pollsets;
189 size_t cq_count;
190
191 /* The two following mutexes control access to server-state
192 mu_global controls access to non-call-related state (e.g., channel state)
193 mu_call controls access to call-related state (e.g., the call lists)
194
195 If they are ever required to be nested, you must lock mu_global
196 before mu_call. This is currently used in shutdown processing
197 (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
198 gpr_mu mu_global; /* mutex for server and channel state */
199 gpr_mu mu_call; /* mutex for call-specific state */
200
201 registered_method *registered_methods;
202 request_matcher unregistered_request_matcher;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700203 /** free list of available requested_calls indices */
204 gpr_stack_lockfree *request_freelist;
205 /** requested call backing data */
206 requested_call *requested_calls;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700207 size_t max_requested_calls;
Craig Tiller729b35a2015-07-13 12:36:47 -0700208
Craig Tiller6a006ce2015-07-13 16:25:40 -0700209 gpr_atm shutdown_flag;
Craig Tiller729b35a2015-07-13 12:36:47 -0700210 gpr_uint8 shutdown_published;
211 size_t num_shutdown_tags;
212 shutdown_tag *shutdown_tags;
213
214 channel_data root_channel_data;
215
216 listener *listeners;
217 int listeners_destroyed;
218 gpr_refcount internal_refcount;
219
220 /** when did we print the last shutdown progress message */
221 gpr_timespec last_shutdown_message_time;
222};
223
/* Yields the grpc_server stored in the channel_data of the given call
   element. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)(elem)->channel_data)->server)
226
Craig Tiller24be0f72015-02-10 14:04:22 -0800227static void begin_call(grpc_server *server, call_data *calld,
228 requested_call *rc);
229static void fail_call(grpc_server *server, requested_call *rc);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700230/* Before calling maybe_finish_shutdown, we must hold mu_global and not
231 hold mu_call */
Craig Tiller52760dd2015-05-29 23:09:26 -0700232static void maybe_finish_shutdown(grpc_server *server);
Craig Tiller24be0f72015-02-10 14:04:22 -0800233
Craig Tiller729b35a2015-07-13 12:36:47 -0700234/*
235 * channel broadcaster
236 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700237
238/* assumes server locked */
239static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
240 channel_data *c;
241 size_t count = 0;
Craig Tiller079a11b2015-06-30 10:07:15 -0700242 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
243 count++;
Craig Tillerff3ae682015-06-29 17:44:04 -0700244 }
245 cb->num_channels = count;
246 cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
247 count = 0;
Craig Tiller079a11b2015-06-30 10:07:15 -0700248 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
Craig Tiller49924e02015-06-29 22:42:33 -0700249 cb->channels[count++] = c->channel;
Craig Tillerff3ae682015-06-29 17:44:04 -0700250 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
Craig Tillerff3ae682015-06-29 17:44:04 -0700251 }
252}
253
254struct shutdown_cleanup_args {
255 grpc_iomgr_closure closure;
256 gpr_slice slice;
257};
258
259static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
260 struct shutdown_cleanup_args *a = arg;
261 gpr_slice_unref(a->slice);
262 gpr_free(a);
263}
264
Craig Tiller079a11b2015-06-30 10:07:15 -0700265static void send_shutdown(grpc_channel *channel, int send_goaway,
266 int send_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700267 grpc_transport_op op;
268 struct shutdown_cleanup_args *sc;
269 grpc_channel_element *elem;
270
271 memset(&op, 0, sizeof(op));
Craig Tillerff3ae682015-06-29 17:44:04 -0700272 op.send_goaway = send_goaway;
273 sc = gpr_malloc(sizeof(*sc));
274 sc->slice = gpr_slice_from_copied_string("Server shutdown");
275 op.goaway_message = &sc->slice;
276 op.goaway_status = GRPC_STATUS_OK;
277 op.disconnect = send_disconnect;
278 grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
279 op.on_consumed = &sc->closure;
280
Craig Tiller079a11b2015-06-30 10:07:15 -0700281 elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
Craig Tillerff3ae682015-06-29 17:44:04 -0700282 elem->filter->start_transport_op(elem, &op);
283}
284
Craig Tiller079a11b2015-06-30 10:07:15 -0700285static void channel_broadcaster_shutdown(channel_broadcaster *cb,
Craig Tiller12cf5372015-07-09 13:48:11 -0700286 int send_goaway,
287 int force_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700288 size_t i;
289
290 for (i = 0; i < cb->num_channels; i++) {
Craig Tiller9188d7a2015-07-05 12:44:37 -0700291 send_shutdown(cb->channels[i], send_goaway, force_disconnect);
Craig Tillerff3ae682015-06-29 17:44:04 -0700292 GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
293 }
294 gpr_free(cb->channels);
295}
296
Craig Tiller729b35a2015-07-13 12:36:47 -0700297/*
298 * request_matcher
299 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700300
Craig Tiller6a006ce2015-07-13 16:25:40 -0700301static void request_matcher_init(request_matcher *request_matcher,
Craig Tiller32ca48c2015-09-10 11:47:15 -0700302 size_t entries) {
Craig Tiller729b35a2015-07-13 12:36:47 -0700303 memset(request_matcher, 0, sizeof(*request_matcher));
Craig Tiller6a006ce2015-07-13 16:25:40 -0700304 request_matcher->requests = gpr_stack_lockfree_create(entries);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800305}
306
Craig Tiller6a006ce2015-07-13 16:25:40 -0700307static void request_matcher_destroy(request_matcher *request_matcher) {
308 GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
309 gpr_stack_lockfree_destroy(request_matcher->requests);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800310}
311
Craig Tiller729b35a2015-07-13 12:36:47 -0700312static void kill_zombie(void *elem, int success) {
313 grpc_call_destroy(grpc_call_from_top_element(elem));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800314}
315
Craig Tiller729b35a2015-07-13 12:36:47 -0700316static void request_matcher_zombify_all_pending_calls(
317 request_matcher *request_matcher) {
318 while (request_matcher->pending_head) {
319 call_data *calld = request_matcher->pending_head;
320 request_matcher->pending_head = calld->pending_next;
Craig Tillerb6450262015-07-14 07:12:19 -0700321 gpr_mu_lock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700322 calld->state = ZOMBIED;
Craig Tillerb6450262015-07-14 07:12:19 -0700323 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700324 grpc_iomgr_closure_init(
325 &calld->kill_zombie_closure, kill_zombie,
326 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
327 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800329}
330
Craig Tiller1191e212015-07-30 14:49:02 -0700331static void request_matcher_kill_requests(grpc_server *server,
332 request_matcher *rm) {
333 int request_id;
334 while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
335 fail_call(server, &server->requested_calls[request_id]);
336 }
337}
338
Craig Tiller729b35a2015-07-13 12:36:47 -0700339/*
340 * server proper
341 */
342
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800343static void server_ref(grpc_server *server) {
344 gpr_ref(&server->internal_refcount);
345}
346
Craig Tilleree945e82015-05-26 16:15:34 -0700347static void server_delete(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800348 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700349 size_t i;
Craig Tilleree945e82015-05-26 16:15:34 -0700350 grpc_channel_args_destroy(server->channel_args);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700351 gpr_mu_destroy(&server->mu_global);
352 gpr_mu_destroy(&server->mu_call);
Craig Tilleree945e82015-05-26 16:15:34 -0700353 gpr_free(server->channel_filters);
Craig Tilleree945e82015-05-26 16:15:34 -0700354 while ((rm = server->registered_methods) != NULL) {
355 server->registered_methods = rm->next;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700356 request_matcher_destroy(&rm->request_matcher);
Craig Tilleree945e82015-05-26 16:15:34 -0700357 gpr_free(rm->method);
358 gpr_free(rm->host);
Craig Tilleree945e82015-05-26 16:15:34 -0700359 gpr_free(rm);
360 }
361 for (i = 0; i < server->cq_count; i++) {
Craig Tiller463f2372015-05-28 16:16:15 -0700362 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
Craig Tilleree945e82015-05-26 16:15:34 -0700363 }
Craig Tiller6a006ce2015-07-13 16:25:40 -0700364 request_matcher_destroy(&server->unregistered_request_matcher);
365 gpr_stack_lockfree_destroy(server->request_freelist);
Craig Tilleree945e82015-05-26 16:15:34 -0700366 gpr_free(server->cqs);
367 gpr_free(server->pollsets);
368 gpr_free(server->shutdown_tags);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700369 gpr_free(server->requested_calls);
Craig Tilleree945e82015-05-26 16:15:34 -0700370 gpr_free(server);
371}
372
373static void server_unref(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800374 if (gpr_unref(&server->internal_refcount)) {
Craig Tilleree945e82015-05-26 16:15:34 -0700375 server_delete(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800376 }
377}
378
379static int is_channel_orphaned(channel_data *chand) {
380 return chand->next == chand;
381}
382
383static void orphan_channel(channel_data *chand) {
384 chand->next->prev = chand->prev;
385 chand->prev->next = chand->next;
386 chand->next = chand->prev = chand;
387}
388
ctiller58393c22015-01-07 14:03:30 -0800389static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800390 channel_data *chand = cd;
391 grpc_server *server = chand->server;
Craig Tiller9ec2a522015-05-29 22:46:54 -0700392 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393 server_unref(server);
394}
395
396static void destroy_channel(channel_data *chand) {
397 if (is_channel_orphaned(chand)) return;
398 GPR_ASSERT(chand->server != NULL);
399 orphan_channel(chand);
400 server_ref(chand->server);
Craig Tiller52760dd2015-05-29 23:09:26 -0700401 maybe_finish_shutdown(chand->server);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700402 chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
403 chand->finish_destroy_channel_closure.cb_arg = chand;
404 grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800405}
406
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700407static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
Craig Tiller729b35a2015-07-13 12:36:47 -0700408 request_matcher *request_matcher) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800409 call_data *calld = elem->call_data;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700410 int request_id;
411
Craig Tiller9835a132015-07-22 17:37:39 -0700412 if (gpr_atm_acq_load(&server->shutdown_flag)) {
413 gpr_mu_lock(&calld->mu_state);
414 calld->state = ZOMBIED;
415 gpr_mu_unlock(&calld->mu_state);
416 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
417 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
418 return;
419 }
420
Craig Tiller6a006ce2015-07-13 16:25:40 -0700421 request_id = gpr_stack_lockfree_pop(request_matcher->requests);
422 if (request_id == -1) {
423 gpr_mu_lock(&server->mu_call);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700424 gpr_mu_lock(&calld->mu_state);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800425 calld->state = PENDING;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700426 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700427 if (request_matcher->pending_head == NULL) {
428 request_matcher->pending_tail = request_matcher->pending_head = calld;
429 } else {
430 request_matcher->pending_tail->pending_next = calld;
431 request_matcher->pending_tail = calld;
432 }
433 calld->pending_next = NULL;
Vijay Pai8931cdd2015-06-17 12:42:17 -0700434 gpr_mu_unlock(&server->mu_call);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800435 } else {
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700436 gpr_mu_lock(&calld->mu_state);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800437 calld->state = ACTIVATED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700438 gpr_mu_unlock(&calld->mu_state);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700439 begin_call(server, calld, &server->requested_calls[request_id]);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800440 }
441}
442
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800443static void start_new_rpc(grpc_call_element *elem) {
444 channel_data *chand = elem->channel_data;
445 call_data *calld = elem->call_data;
446 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800447 gpr_uint32 i;
448 gpr_uint32 hash;
449 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800450
Craig Tiller04cc8be2015-02-10 16:11:22 -0800451 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800452 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800453 /* check for an exact match with host */
454 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
Craig Tillerc6d6d902015-07-09 15:53:50 -0700455 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800456 rm = &chand->registered_methods[(hash + i) %
457 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800458 if (!rm) break;
459 if (rm->host != calld->host) continue;
460 if (rm->method != calld->path) continue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700461 finish_start_new_rpc(server, elem,
462 &rm->server_registered_method->request_matcher);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800463 return;
464 }
465 /* check for a wildcard method definition (no host set) */
466 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800467 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800468 rm = &chand->registered_methods[(hash + i) %
469 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800470 if (!rm) break;
471 if (rm->host != NULL) continue;
472 if (rm->method != calld->path) continue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700473 finish_start_new_rpc(server, elem,
474 &rm->server_registered_method->request_matcher);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800475 return;
476 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800477 }
Craig Tiller729b35a2015-07-13 12:36:47 -0700478 finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800479}
480
Craig Tilleree945e82015-05-26 16:15:34 -0700481static int num_listeners(grpc_server *server) {
482 listener *l;
483 int n = 0;
484 for (l = server->listeners; l; l = l->next) {
485 n++;
486 }
487 return n;
488}
489
Craig Tiller97fc6a32015-07-08 15:31:35 -0700490static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
491 server_unref(server);
492}
493
Craig Tillerab54f792015-07-08 08:34:20 -0700494static int num_channels(grpc_server *server) {
495 channel_data *chand;
496 int n = 0;
497 for (chand = server->root_channel_data.next;
498 chand != &server->root_channel_data; chand = chand->next) {
499 n++;
500 }
501 return n;
502}
503
Craig Tiller1191e212015-07-30 14:49:02 -0700504static void kill_pending_work_locked(grpc_server *server) {
505 registered_method *rm;
506 request_matcher_kill_requests(server, &server->unregistered_request_matcher);
507 request_matcher_zombify_all_pending_calls(
508 &server->unregistered_request_matcher);
509 for (rm = server->registered_methods; rm; rm = rm->next) {
510 request_matcher_kill_requests(server, &rm->request_matcher);
511 request_matcher_zombify_all_pending_calls(&rm->request_matcher);
512 }
513}
514
Craig Tillerdc627722015-05-26 15:27:02 -0700515static void maybe_finish_shutdown(grpc_server *server) {
Craig Tillerbce999f2015-05-27 09:55:51 -0700516 size_t i;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700517 if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
Craig Tillerc4a1f522015-05-29 22:32:04 -0700518 return;
519 }
Vijay Pai8931cdd2015-06-17 12:42:17 -0700520
Craig Tiller1191e212015-07-30 14:49:02 -0700521 kill_pending_work_locked(server);
522
Craig Tillerab54f792015-07-08 08:34:20 -0700523 if (server->root_channel_data.next != &server->root_channel_data ||
524 server->listeners_destroyed < num_listeners(server)) {
Craig Tiller58bbc862015-07-13 09:51:17 -0700525 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
526 server->last_shutdown_message_time),
527 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
Craig Tiller080d6c52015-07-10 10:23:10 -0700528 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -0700529 gpr_log(GPR_DEBUG,
530 "Waiting for %d channels and %d/%d listeners to be destroyed"
531 " before shutting down server",
532 num_channels(server),
533 num_listeners(server) - server->listeners_destroyed,
534 num_listeners(server));
535 }
Craig Tillerc4a1f522015-05-29 22:32:04 -0700536 return;
537 }
538 server->shutdown_published = 1;
539 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller97fc6a32015-07-08 15:31:35 -0700540 server_ref(server);
Craig Tiller12cf5372015-07-09 13:48:11 -0700541 grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
542 done_shutdown_event, server,
543 &server->shutdown_tags[i].completion);
Craig Tillerdc627722015-05-26 15:27:02 -0700544 }
545}
546
Craig Tiller6902ad22015-04-16 08:01:49 -0700547static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
548 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800549 channel_data *chand = elem->channel_data;
550 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700551 if (md->key == chand->path_key) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700552 calld->path = GRPC_MDSTR_REF(md->value);
Craig Tiller6902ad22015-04-16 08:01:49 -0700553 return NULL;
554 } else if (md->key == chand->authority_key) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700555 calld->host = GRPC_MDSTR_REF(md->value);
Craig Tiller6902ad22015-04-16 08:01:49 -0700556 return NULL;
557 }
558 return md;
559}
560
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700561static void server_on_recv(void *ptr, int success) {
562 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700563 call_data *calld = elem->call_data;
Craig Tiller94329d02015-07-23 09:52:11 -0700564 gpr_timespec op_deadline;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700565
566 if (success && !calld->got_initial_metadata) {
567 size_t i;
568 size_t nops = calld->recv_ops->nops;
569 grpc_stream_op *ops = calld->recv_ops->ops;
570 for (i = 0; i < nops; i++) {
571 grpc_stream_op *op = &ops[i];
572 if (op->type != GRPC_OP_METADATA) continue;
Craig Tiller205aee12015-04-16 14:46:41 -0700573 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tiller94329d02015-07-23 09:52:11 -0700574 op_deadline = op->data.metadata.deadline;
575 if (0 !=
576 gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700577 calld->deadline = op->data.metadata.deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800578 }
Craig Tillerc4b56b62015-07-23 17:44:11 -0700579 if (calld->host && calld->path) {
580 calld->got_initial_metadata = 1;
581 start_new_rpc(elem);
582 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800583 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700584 }
585 }
586
587 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700588 case GRPC_STREAM_OPEN:
589 break;
590 case GRPC_STREAM_SEND_CLOSED:
591 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700592 case GRPC_STREAM_RECV_CLOSED:
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700593 gpr_mu_lock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700594 if (calld->state == NOT_STARTED) {
595 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700596 gpr_mu_unlock(&calld->mu_state);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700597 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
598 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700599 } else {
600 gpr_mu_unlock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700601 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800602 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700603 case GRPC_STREAM_CLOSED:
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700604 gpr_mu_lock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700605 if (calld->state == NOT_STARTED) {
606 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700607 gpr_mu_unlock(&calld->mu_state);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700608 grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
609 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700610 } else if (calld->state == PENDING) {
Craig Tillerc9d03822015-05-20 16:08:45 -0700611 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700612 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700613 /* zombied call will be destroyed when it's removed from the pending
614 queue... later */
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700615 } else {
616 gpr_mu_unlock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700617 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800618 break;
619 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700620
Craig Tiller1e6facb2015-06-11 22:47:11 -0700621 calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700622}
623
Craig Tillerb7959a02015-06-25 08:50:54 -0700624static void server_mutate_op(grpc_call_element *elem,
625 grpc_transport_stream_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700626 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700627
628 if (op->recv_ops) {
629 /* substitute our callback for the higher callback */
630 calld->recv_ops = op->recv_ops;
631 calld->recv_state = op->recv_state;
632 calld->on_done_recv = op->on_done_recv;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700633 op->on_done_recv = &calld->server_on_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700634 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700635}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700636
Craig Tillere039f032015-06-25 12:54:23 -0700637static void server_start_transport_stream_op(grpc_call_element *elem,
638 grpc_transport_stream_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -0700639 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
640 server_mutate_op(elem, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700641 grpc_call_next_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800642}
643
Craig Tillere039f032015-06-25 12:54:23 -0700644static void accept_stream(void *cd, grpc_transport *transport,
645 const void *transport_server_data) {
646 channel_data *chand = cd;
647 /* create a call */
Craig Tiller3e7c6a72015-07-31 16:17:04 -0700648 grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
649 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
Craig Tillere039f032015-06-25 12:54:23 -0700650}
651
652static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
653 channel_data *chand = cd;
654 grpc_server *server = chand->server;
655 if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
656 grpc_transport_op op;
657 memset(&op, 0, sizeof(op));
658 op.on_connectivity_state_change = &chand->channel_connectivity_changed,
659 op.connectivity_state = &chand->connectivity_state;
660 grpc_channel_next_op(grpc_channel_stack_element(
661 grpc_channel_get_channel_stack(chand->channel), 0),
662 &op);
663 } else {
664 gpr_mu_lock(&server->mu_global);
665 destroy_channel(chand);
666 gpr_mu_unlock(&server->mu_global);
667 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
668 }
669}
670
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800671static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700672 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700673 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800674 call_data *calld = elem->call_data;
675 channel_data *chand = elem->channel_data;
676 memset(calld, 0, sizeof(call_data));
Craig Tiller143e7bf2015-07-13 08:41:49 -0700677 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800678 calld->call = grpc_call_from_top_element(elem);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700679 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800680
Craig Tiller1e6facb2015-06-11 22:47:11 -0700681 grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
682
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800683 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700684
Craig Tiller482ef8b2015-04-23 11:38:20 -0700685 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800686}
687
688static void destroy_call_elem(grpc_call_element *elem) {
689 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800690 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800691
Craig Tiller729b35a2015-07-13 12:36:47 -0700692 GPR_ASSERT(calld->state != PENDING);
Craig Tiller092d8d12015-07-04 22:35:00 -0700693
Craig Tiller4df31a62015-01-30 09:44:31 -0800694 if (calld->host) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700695 GRPC_MDSTR_UNREF(calld->host);
Craig Tiller4df31a62015-01-30 09:44:31 -0800696 }
697 if (calld->path) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700698 GRPC_MDSTR_UNREF(calld->path);
Craig Tiller4df31a62015-01-30 09:44:31 -0800699 }
700
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700701 gpr_mu_destroy(&calld->mu_state);
702
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800703 server_unref(chand->server);
704}
705
Craig Tiller079a11b2015-06-30 10:07:15 -0700706static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800707 const grpc_channel_args *args,
708 grpc_mdctx *metadata_context, int is_first,
709 int is_last) {
710 channel_data *chand = elem->channel_data;
711 GPR_ASSERT(is_first);
712 GPR_ASSERT(!is_last);
713 chand->server = NULL;
714 chand->channel = NULL;
Craig Tiller6999c092015-07-22 08:14:12 -0700715 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0);
Craig Tillerd6c98df2015-08-18 09:33:44 -0700716 chand->authority_key =
717 grpc_mdstr_from_string(metadata_context, ":authority", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800718 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800719 chand->registered_methods = NULL;
Craig Tillere039f032015-06-25 12:54:23 -0700720 chand->connectivity_state = GRPC_CHANNEL_IDLE;
721 grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
722 channel_connectivity_changed, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800723}
724
725static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800726 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800727 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800728 if (chand->registered_methods) {
729 for (i = 0; i < chand->registered_method_slots; i++) {
730 if (chand->registered_methods[i].method) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700731 GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
Craig Tillerec3257c2015-02-12 15:59:43 -0800732 }
733 if (chand->registered_methods[i].host) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700734 GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
Craig Tillerec3257c2015-02-12 15:59:43 -0800735 }
736 }
737 gpr_free(chand->registered_methods);
738 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800739 if (chand->server) {
Vijay Pai8931cdd2015-06-17 12:42:17 -0700740 gpr_mu_lock(&chand->server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800741 chand->next->prev = chand->prev;
742 chand->prev->next = chand->next;
743 chand->next = chand->prev = chand;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700744 maybe_finish_shutdown(chand->server);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700745 gpr_mu_unlock(&chand->server->mu_global);
Craig Tiller1a65a232015-07-06 10:22:32 -0700746 GRPC_MDSTR_UNREF(chand->path_key);
747 GRPC_MDSTR_UNREF(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800748 server_unref(chand->server);
749 }
750}
751
752static const grpc_channel_filter server_surface_filter = {
Craig Tillere039f032015-06-25 12:54:23 -0700753 server_start_transport_stream_op,
754 grpc_channel_next_op,
Craig Tillerb76d05b2015-05-29 17:21:56 -0700755 sizeof(call_data),
756 init_call_elem,
757 destroy_call_elem,
758 sizeof(channel_data),
759 init_channel_elem,
760 destroy_channel_elem,
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700761 grpc_call_next_get_peer,
Craig Tillerb76d05b2015-05-29 17:21:56 -0700762 "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800763};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800764
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700765void grpc_server_register_completion_queue(grpc_server *server,
Nicolas "Pixel" Nobleebb51402015-07-23 02:41:33 +0200766 grpc_completion_queue *cq,
767 void *reserved) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800768 size_t i, n;
Nicolas "Pixel" Noble45992882015-07-29 23:52:14 +0200769 GPR_ASSERT(!reserved);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800770 for (i = 0; i < server->cq_count; i++) {
771 if (server->cqs[i] == cq) return;
772 }
Craig Tiller463f2372015-05-28 16:16:15 -0700773 GRPC_CQ_INTERNAL_REF(cq, "server");
Craig Tillerb56975c2015-06-15 10:11:16 -0700774 grpc_cq_mark_server_cq(cq);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800775 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800776 server->cqs = gpr_realloc(server->cqs,
777 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800778 server->cqs[n] = cq;
779}
780
David Garcia Quintase25e9282015-06-23 10:18:52 -0700781grpc_server *grpc_server_create_from_filters(
782 const grpc_channel_filter **filters, size_t filter_count,
783 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800784 size_t i;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700785 /* TODO(census): restore this once we finalize census filter etc.
786 int census_enabled = grpc_channel_args_is_census_enabled(args); */
787 int census_enabled = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800788
789 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800790
791 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
792
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800793 memset(server, 0, sizeof(grpc_server));
794
Vijay Pai8931cdd2015-06-17 12:42:17 -0700795 gpr_mu_init(&server->mu_global);
796 gpr_mu_init(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800797
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800798 /* decremented by grpc_server_destroy */
799 gpr_ref_init(&server->internal_refcount, 1);
800 server->root_channel_data.next = server->root_channel_data.prev =
801 &server->root_channel_data;
802
Craig Tiller6a006ce2015-07-13 16:25:40 -0700803 /* TODO(ctiller): expose a channel_arg for this */
804 server->max_requested_calls = 32768;
805 server->request_freelist =
806 gpr_stack_lockfree_create(server->max_requested_calls);
807 for (i = 0; i < (size_t)server->max_requested_calls; i++) {
Craig Tillerf96dfc32015-09-10 14:43:18 -0700808 gpr_stack_lockfree_push(server->request_freelist, (int)i);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700809 }
810 request_matcher_init(&server->unregistered_request_matcher,
811 server->max_requested_calls);
812 server->requested_calls = gpr_malloc(server->max_requested_calls *
813 sizeof(*server->requested_calls));
Craig Tiller729b35a2015-07-13 12:36:47 -0700814
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800815 /* Server filter stack is:
816
817 server_surface_filter - for making surface API calls
818 grpc_server_census_filter (optional) - for stats collection and tracing
819 {passed in filter stack}
820 grpc_connected_channel_filter - for interfacing with transports */
Craig Tiller32ca48c2015-09-10 11:47:15 -0700821 server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800822 server->channel_filters =
823 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
824 server->channel_filters[0] = &server_surface_filter;
825 if (census_enabled) {
826 server->channel_filters[1] = &grpc_server_census_filter;
Hongyu Chenf68e4722015-08-07 18:06:42 -0700827 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800828 for (i = 0; i < filter_count; i++) {
Craig Tiller32ca48c2015-09-10 11:47:15 -0700829 server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800830 }
831
832 server->channel_args = grpc_channel_args_copy(args);
833
834 return server;
835}
836
/* NULL-tolerant string equality: returns 1 when both arguments are NULL or
   both are non-NULL with identical contents, 0 otherwise. */
static int streq(const char *a, const char *b) {
  if (a == NULL || b == NULL) {
    /* equal only when both are NULL */
    return a == b;
  }
  return strcmp(a, b) == 0;
}
843
844void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700845 const char *host) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800846 registered_method *m;
847 if (!method) {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700848 gpr_log(GPR_ERROR,
849 "grpc_server_register_method method string cannot be NULL");
Craig Tiller24be0f72015-02-10 14:04:22 -0800850 return NULL;
851 }
852 for (m = server->registered_methods; m; m = m->next) {
853 if (streq(m->method, method) && streq(m->host, host)) {
854 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
855 host ? host : "*");
856 return NULL;
857 }
858 }
859 m = gpr_malloc(sizeof(registered_method));
860 memset(m, 0, sizeof(*m));
Craig Tiller6a006ce2015-07-13 16:25:40 -0700861 request_matcher_init(&m->request_matcher, server->max_requested_calls);
Craig Tiller24be0f72015-02-10 14:04:22 -0800862 m->method = gpr_strdup(method);
863 m->host = gpr_strdup(host);
864 m->next = server->registered_methods;
865 server->registered_methods = m;
866 return m;
867}
868
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800869void grpc_server_start(grpc_server *server) {
870 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800871 size_t i;
872
Craig Tillerec3257c2015-02-12 15:59:43 -0800873 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800874 for (i = 0; i < server->cq_count; i++) {
875 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
876 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800877
878 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800879 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800880 }
881}
882
Craig Tiller1064f8b2015-06-25 13:52:57 -0700883void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
884 grpc_channel_filter const **extra_filters,
885 size_t num_extra_filters, grpc_mdctx *mdctx,
886 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800887 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
888 grpc_channel_filter const **filters =
889 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
890 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800891 size_t num_registered_methods;
892 size_t alloc;
893 registered_method *rm;
894 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800895 grpc_channel *channel;
896 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800897 grpc_mdstr *host;
898 grpc_mdstr *method;
899 gpr_uint32 hash;
Craig Tillerf96dfc32015-09-10 14:43:18 -0700900 size_t slots;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800901 gpr_uint32 probes;
902 gpr_uint32 max_probes = 0;
Craig Tillere039f032015-06-25 12:54:23 -0700903 grpc_transport_op op;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800904
905 for (i = 0; i < s->channel_filter_count; i++) {
906 filters[i] = s->channel_filters[i];
907 }
908 for (; i < s->channel_filter_count + num_extra_filters; i++) {
909 filters[i] = extra_filters[i - s->channel_filter_count];
910 }
911 filters[i] = &grpc_connected_channel_filter;
912
Craig Tiller20bc56d2015-02-12 09:02:56 -0800913 for (i = 0; i < s->cq_count; i++) {
Craig Tillere039f032015-06-25 12:54:23 -0700914 memset(&op, 0, sizeof(op));
915 op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
916 grpc_transport_perform_op(transport, &op);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800917 }
ctillerd79b4862014-12-17 16:36:59 -0800918
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700919 channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
920 mdctx, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800921 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700922 grpc_channel_get_channel_stack(channel), 0)
923 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800924 chand->server = s;
925 server_ref(s);
926 chand->channel = channel;
927
Craig Tiller04cc8be2015-02-10 16:11:22 -0800928 num_registered_methods = 0;
929 for (rm = s->registered_methods; rm; rm = rm->next) {
930 num_registered_methods++;
931 }
932 /* build a lookup table phrased in terms of mdstr's in this channels context
933 to quickly find registered methods */
934 if (num_registered_methods > 0) {
935 slots = 2 * num_registered_methods;
936 alloc = sizeof(channel_registered_method) * slots;
937 chand->registered_methods = gpr_malloc(alloc);
938 memset(chand->registered_methods, 0, alloc);
939 for (rm = s->registered_methods; rm; rm = rm->next) {
Craig Tiller6999c092015-07-22 08:14:12 -0700940 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host, 0) : NULL;
941 method = grpc_mdstr_from_string(mdctx, rm->method, 0);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800942 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800943 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700944 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800945 probes++)
946 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800947 if (probes > max_probes) max_probes = probes;
948 crm = &chand->registered_methods[(hash + probes) % slots];
949 crm->server_registered_method = rm;
950 crm->host = host;
951 crm->method = method;
952 }
Craig Tillerf96dfc32015-09-10 14:43:18 -0700953 GPR_ASSERT(slots <= GPR_UINT32_MAX);
954 chand->registered_method_slots = (gpr_uint32)slots;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800955 chand->registered_method_max_probes = max_probes;
956 }
957
Craig Tiller1064f8b2015-06-25 13:52:57 -0700958 grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
959 transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800960
Vijay Pai8931cdd2015-06-17 12:42:17 -0700961 gpr_mu_lock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800962 chand->next = &s->root_channel_data;
963 chand->prev = chand->next->prev;
964 chand->next->prev = chand->prev->next = chand;
Vijay Pai8931cdd2015-06-17 12:42:17 -0700965 gpr_mu_unlock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800966
967 gpr_free(filters);
Craig Tiller4b804102015-06-26 16:16:12 -0700968
969 GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
970 memset(&op, 0, sizeof(op));
971 op.set_accept_stream = accept_stream;
972 op.set_accept_stream_user_data = chand;
973 op.on_connectivity_state_change = &chand->channel_connectivity_changed;
974 op.connectivity_state = &chand->connectivity_state;
Craig Tillerf96dfc32015-09-10 14:43:18 -0700975 op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
Craig Tiller4b804102015-06-26 16:16:12 -0700976 grpc_transport_perform_op(transport, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800977}
978
murgatroid9900a3dab2015-08-19 11:15:38 -0700979void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
980 (void) done_arg;
981 gpr_free(storage);
982}
983
Craig Tillerbce999f2015-05-27 09:55:51 -0700984void grpc_server_shutdown_and_notify(grpc_server *server,
985 grpc_completion_queue *cq, void *tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800986 listener *l;
Craig Tillerbce999f2015-05-27 09:55:51 -0700987 shutdown_tag *sdt;
Craig Tillerff3ae682015-06-29 17:44:04 -0700988 channel_broadcaster broadcaster;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800989
Craig Tiller7156fca2015-07-15 13:54:20 -0700990 GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
991
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800992 /* lock, and gather up some stuff to do */
Vijay Pai8931cdd2015-06-17 12:42:17 -0700993 gpr_mu_lock(&server->mu_global);
Craig Tiller97fc6a32015-07-08 15:31:35 -0700994 grpc_cq_begin_op(cq);
murgatroid9900a3dab2015-08-19 11:15:38 -0700995 if (server->shutdown_published) {
996 grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
997 gpr_malloc(sizeof(grpc_cq_completion)));
998 gpr_mu_unlock(&server->mu_global);
999 return;
1000 }
Craig Tilleree945e82015-05-26 16:15:34 -07001001 server->shutdown_tags =
1002 gpr_realloc(server->shutdown_tags,
Craig Tiller208d2122015-05-29 08:50:08 -07001003 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
Craig Tillerbce999f2015-05-27 09:55:51 -07001004 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1005 sdt->tag = tag;
1006 sdt->cq = cq;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001007 if (gpr_atm_acq_load(&server->shutdown_flag)) {
Vijay Pai8931cdd2015-06-17 12:42:17 -07001008 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001009 return;
1010 }
1011
Craig Tiller080d6c52015-07-10 10:23:10 -07001012 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -07001013
Craig Tillerff3ae682015-06-29 17:44:04 -07001014 channel_broadcaster_init(server, &broadcaster);
nnoble0c475f02014-12-05 15:37:39 -08001015
Craig Tillerbd217572015-02-11 18:10:56 -08001016 /* collect all unregistered then registered calls */
Vijay Pai8931cdd2015-06-17 12:42:17 -07001017 gpr_mu_lock(&server->mu_call);
Craig Tiller1191e212015-07-30 14:49:02 -07001018 kill_pending_work_locked(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001019 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001020
Craig Tiller6a006ce2015-07-13 16:25:40 -07001021 gpr_atm_rel_store(&server->shutdown_flag, 1);
Craig Tillerdc627722015-05-26 15:27:02 -07001022 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001023 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001024
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001025 /* Shutdown listeners */
1026 for (l = server->listeners; l; l = l->next) {
1027 l->destroy(server, l->arg);
1028 }
Craig Tillerff3ae682015-06-29 17:44:04 -07001029
1030 channel_broadcaster_shutdown(&broadcaster, 1, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001031}
1032
Craig Tilleraec96aa2015-04-07 14:32:15 -07001033void grpc_server_listener_destroy_done(void *s) {
1034 grpc_server *server = s;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001035 gpr_mu_lock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001036 server->listeners_destroyed++;
Craig Tilleree945e82015-05-26 16:15:34 -07001037 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001038 gpr_mu_unlock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001039}
1040
Craig Tillerafa2d632015-05-26 16:39:13 -07001041void grpc_server_cancel_all_calls(grpc_server *server) {
Craig Tiller092d8d12015-07-04 22:35:00 -07001042 channel_broadcaster broadcaster;
Craig Tillerafa2d632015-05-26 16:39:13 -07001043
Craig Tiller092d8d12015-07-04 22:35:00 -07001044 gpr_mu_lock(&server->mu_global);
1045 channel_broadcaster_init(server, &broadcaster);
1046 gpr_mu_unlock(&server->mu_global);
Craig Tillerafa2d632015-05-26 16:39:13 -07001047
Craig Tiller092d8d12015-07-04 22:35:00 -07001048 channel_broadcaster_shutdown(&broadcaster, 0, 1);
Craig Tillerafa2d632015-05-26 16:39:13 -07001049}
1050
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001051void grpc_server_destroy(grpc_server *server) {
Craig Tilleraec96aa2015-04-07 14:32:15 -07001052 listener *l;
Craig Tiller872af022015-04-24 15:57:52 -07001053
Vijay Pai8931cdd2015-06-17 12:42:17 -07001054 gpr_mu_lock(&server->mu_global);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001055 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
Craig Tilleree945e82015-05-26 16:15:34 -07001056 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -07001057
1058 while (server->listeners) {
1059 l = server->listeners;
1060 server->listeners = l->next;
1061 gpr_free(l);
1062 }
1063
Vijay Pai8931cdd2015-06-17 12:42:17 -07001064 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001065
1066 server_unref(server);
1067}
1068
1069void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -08001070 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -08001071 grpc_pollset **pollsets,
1072 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001073 void (*destroy)(grpc_server *server, void *arg)) {
1074 listener *l = gpr_malloc(sizeof(listener));
1075 l->arg = arg;
1076 l->start = start;
1077 l->destroy = destroy;
1078 l->next = server->listeners;
1079 server->listeners = l;
1080}
1081
Craig Tiller9f28ac22015-01-27 17:01:29 -08001082static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -08001083 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001084 call_data *calld = NULL;
Craig Tiller729b35a2015-07-13 12:36:47 -07001085 request_matcher *request_matcher = NULL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001086 int request_id;
1087 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1088 fail_call(server, rc);
1089 return GRPC_CALL_OK;
1090 }
1091 request_id = gpr_stack_lockfree_pop(server->request_freelist);
1092 if (request_id == -1) {
1093 /* out of request ids: just fail this one */
Craig Tiller24be0f72015-02-10 14:04:22 -08001094 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001095 return GRPC_CALL_OK;
1096 }
Craig Tiller04cc8be2015-02-10 16:11:22 -08001097 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -08001098 case BATCH_CALL:
Craig Tiller729b35a2015-07-13 12:36:47 -07001099 request_matcher = &server->unregistered_request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001100 break;
1101 case REGISTERED_CALL:
Craig Tiller729b35a2015-07-13 12:36:47 -07001102 request_matcher = &rc->data.registered.registered_method->request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001103 break;
1104 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001105 server->requested_calls[request_id] = *rc;
1106 gpr_free(rc);
1107 if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
1108 /* this was the first queued request: we need to lock and start
1109 matching calls */
1110 gpr_mu_lock(&server->mu_call);
1111 while ((calld = request_matcher->pending_head) != NULL) {
1112 request_id = gpr_stack_lockfree_pop(request_matcher->requests);
1113 if (request_id == -1) break;
1114 request_matcher->pending_head = calld->pending_next;
1115 gpr_mu_unlock(&server->mu_call);
1116 gpr_mu_lock(&calld->mu_state);
1117 if (calld->state == ZOMBIED) {
1118 gpr_mu_unlock(&calld->mu_state);
1119 grpc_iomgr_closure_init(
1120 &calld->kill_zombie_closure, kill_zombie,
1121 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
1122 grpc_iomgr_add_callback(&calld->kill_zombie_closure);
1123 } else {
1124 GPR_ASSERT(calld->state == PENDING);
1125 calld->state = ACTIVATED;
1126 gpr_mu_unlock(&calld->mu_state);
Craig Tillerb6450262015-07-14 07:12:19 -07001127 begin_call(server, calld, &server->requested_calls[request_id]);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001128 }
1129 gpr_mu_lock(&server->mu_call);
Craig Tiller729b35a2015-07-13 12:36:47 -07001130 }
Craig Tiller76d2c3b2015-07-07 11:46:01 -07001131 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001132 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001133 return GRPC_CALL_OK;
Craig Tillercce17ac2015-01-20 09:29:28 -08001134}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001135
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001136grpc_call_error grpc_server_request_call(
1137 grpc_server *server, grpc_call **call, grpc_call_details *details,
1138 grpc_metadata_array *initial_metadata,
1139 grpc_completion_queue *cq_bound_to_call,
1140 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001141 requested_call *rc = gpr_malloc(sizeof(*rc));
murgatroid99ad7c20c2015-05-22 14:42:29 -07001142 GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
1143 initial_metadata, cq_bound_to_call,
1144 cq_for_notification, tag);
Craig Tillerb56975c2015-06-15 10:11:16 -07001145 if (!grpc_cq_is_server_cq(cq_for_notification)) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001146 gpr_free(rc);
Craig Tillerb56975c2015-06-15 10:11:16 -07001147 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1148 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001149 grpc_cq_begin_op(cq_for_notification);
Craig Tiller9928d392015-08-18 09:40:24 -07001150 details->reserved = NULL;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001151 rc->type = BATCH_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001152 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001153 rc->tag = tag;
1154 rc->cq_bound_to_call = cq_bound_to_call;
1155 rc->cq_for_notification = cq_for_notification;
1156 rc->call = call;
1157 rc->data.batch.details = details;
1158 rc->data.batch.initial_metadata = initial_metadata;
1159 return queue_call_request(server, rc);
Craig Tiller24be0f72015-02-10 14:04:22 -08001160}
1161
1162grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001163 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1164 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001165 grpc_completion_queue *cq_bound_to_call,
1166 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001167 requested_call *rc = gpr_malloc(sizeof(*rc));
Craig Tiller20bc56d2015-02-12 09:02:56 -08001168 registered_method *registered_method = rm;
Craig Tillerb56975c2015-06-15 10:11:16 -07001169 if (!grpc_cq_is_server_cq(cq_for_notification)) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001170 gpr_free(rc);
Craig Tillerb56975c2015-06-15 10:11:16 -07001171 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1172 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001173 grpc_cq_begin_op(cq_for_notification);
1174 rc->type = REGISTERED_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001175 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001176 rc->tag = tag;
1177 rc->cq_bound_to_call = cq_bound_to_call;
1178 rc->cq_for_notification = cq_for_notification;
1179 rc->call = call;
1180 rc->data.registered.registered_method = registered_method;
1181 rc->data.registered.deadline = deadline;
1182 rc->data.registered.initial_metadata = initial_metadata;
1183 rc->data.registered.optional_payload = optional_payload;
1184 return queue_call_request(server, rc);
Craig Tiller24be0f72015-02-10 14:04:22 -08001185}
1186
Craig Tiller64be9f72015-05-04 14:53:51 -07001187static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001188 void *tag);
Craig Tiller64be9f72015-05-04 14:53:51 -07001189static void publish_was_not_set(grpc_call *call, int success, void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001190 abort();
1191}
Craig Tiller24be0f72015-02-10 14:04:22 -08001192
Craig Tiller166e2502015-02-03 20:14:41 -08001193static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1194 gpr_slice slice = value->slice;
1195 size_t len = GPR_SLICE_LENGTH(slice);
1196
1197 if (len + 1 > *capacity) {
1198 *capacity = GPR_MAX(len + 1, *capacity * 2);
1199 *dest = gpr_realloc(*dest, *capacity);
1200 }
1201 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1202}
1203
Craig Tiller24be0f72015-02-10 14:04:22 -08001204static void begin_call(grpc_server *server, call_data *calld,
1205 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001206 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001207 grpc_ioreq req[2];
1208 grpc_ioreq *r = req;
1209
1210 /* called once initial metadata has been read by the call, but BEFORE
1211 the ioreq to fetch it out of the call has been executed.
1212 This means metadata related fields can be relied on in calld, but to
1213 fill in the metadata array passed by the client, we need to perform
1214 an ioreq op, that should complete immediately. */
1215
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001216 grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
1217 *rc->call = calld->call;
1218 calld->cq_new = rc->cq_for_notification;
Craig Tiller24be0f72015-02-10 14:04:22 -08001219 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001220 case BATCH_CALL:
Craig Tiller04c5d4b2015-06-26 17:21:41 -07001221 GPR_ASSERT(calld->host != NULL);
1222 GPR_ASSERT(calld->path != NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001223 cpstr(&rc->data.batch.details->host,
1224 &rc->data.batch.details->host_capacity, calld->host);
1225 cpstr(&rc->data.batch.details->method,
1226 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001227 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001228 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1229 r->data.recv_metadata = rc->data.batch.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001230 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001231 r++;
1232 publish = publish_registered_or_batch;
1233 break;
1234 case REGISTERED_CALL:
1235 *rc->data.registered.deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001236 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1237 r->data.recv_metadata = rc->data.registered.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001238 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001239 r++;
1240 if (rc->data.registered.optional_payload) {
1241 r->op = GRPC_IOREQ_RECV_MESSAGE;
1242 r->data.recv_message = rc->data.registered.optional_payload;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001243 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001244 r++;
1245 }
1246 publish = publish_registered_or_batch;
1247 break;
1248 }
1249
Craig Tiller4df412b2015-04-28 07:57:54 -07001250 GRPC_CALL_INTERNAL_REF(calld->call, "server");
Craig Tiller32ca48c2015-09-10 11:47:15 -07001251 grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req),
1252 publish, rc);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001253}
1254
1255static void done_request_event(void *req, grpc_cq_completion *c) {
Craig Tiller6a006ce2015-07-13 16:25:40 -07001256 requested_call *rc = req;
1257 grpc_server *server = rc->server;
1258
1259 if (rc >= server->requested_calls &&
1260 rc < server->requested_calls + server->max_requested_calls) {
Craig Tillerf96dfc32015-09-10 14:43:18 -07001261 GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001262 gpr_stack_lockfree_push(server->request_freelist,
Craig Tillerf96dfc32015-09-10 14:43:18 -07001263 (int)(rc - server->requested_calls));
Craig Tiller6a006ce2015-07-13 16:25:40 -07001264 } else {
1265 gpr_free(req);
1266 }
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001267
1268 server_unref(server);
Craig Tiller24be0f72015-02-10 14:04:22 -08001269}
1270
1271static void fail_call(grpc_server *server, requested_call *rc) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001272 *rc->call = NULL;
Craig Tiller24be0f72015-02-10 14:04:22 -08001273 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001274 case BATCH_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001275 rc->data.batch.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001276 break;
1277 case REGISTERED_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001278 rc->data.registered.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001279 break;
1280 }
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001281 server_ref(server);
Craig Tiller12cf5372015-07-09 13:48:11 -07001282 grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
1283 &rc->completion);
Craig Tiller24be0f72015-02-10 14:04:22 -08001284}
1285
Craig Tiller64be9f72015-05-04 14:53:51 -07001286static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller97fc6a32015-07-08 15:31:35 -07001287 void *prc) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001288 grpc_call_element *elem =
1289 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001290 requested_call *rc = prc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001291 call_data *calld = elem->call_data;
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001292 channel_data *chand = elem->channel_data;
1293 server_ref(chand->server);
Craig Tiller12cf5372015-07-09 13:48:11 -07001294 grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
1295 &rc->completion);
Craig Tillerc0122582015-07-09 13:21:36 -07001296 GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
Craig Tiller24be0f72015-02-10 14:04:22 -08001297}
1298
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001299const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1300 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001301}
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001302
1303int grpc_server_has_open_connections(grpc_server *server) {
1304 int r;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001305 gpr_mu_lock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001306 r = server->root_channel_data.next != &server->root_channel_data;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001307 gpr_mu_unlock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001308 return r;
1309}