blob: 75600027ed7e842bede5611422ad056b8c9a6009 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/surface/server.h"
35
Craig Tillerf96dfc32015-09-10 14:43:18 -070036#include <limits.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080037#include <stdlib.h>
38#include <string.h>
39
Craig Tiller6a006ce2015-07-13 16:25:40 -070040#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
42#include <grpc/support/string_util.h>
43#include <grpc/support/useful.h>
44
Hongyu Chene09dc782015-08-21 11:28:33 -070045#include "src/core/census/grpc_filter.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080046#include "src/core/channel/channel_args.h"
47#include "src/core/channel/connected_channel.h"
ctiller18b49ab2014-12-09 14:39:16 -080048#include "src/core/iomgr/iomgr.h"
Craig Tiller6a006ce2015-07-13 16:25:40 -070049#include "src/core/support/stack_lockfree.h"
Craig Tiller485d7762015-01-23 12:54:05 -080050#include "src/core/support/string.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include "src/core/surface/call.h"
52#include "src/core/surface/channel.h"
53#include "src/core/surface/completion_queue.h"
Craig Tiller60fd3612015-03-05 16:24:22 -080054#include "src/core/surface/init.h"
Craig Tillercce17ac2015-01-20 09:29:28 -080055#include "src/core/transport/metadata.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080056
57typedef struct listener {
58 void *arg;
Craig Tillerec3257c2015-02-12 15:59:43 -080059 void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
60 size_t pollset_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080061 void (*destroy)(grpc_server *server, void *arg);
62 struct listener *next;
63} listener;
64
/* Forward declarations: these structs are defined further down but are
   referenced by pointer before that point. */
typedef struct call_data call_data;
typedef struct channel_data channel_data;
typedef struct registered_method registered_method;
68
69typedef struct {
70 call_data *next;
71 call_data *prev;
72} call_link;
73
/* Discriminator stored in requested_call.type. */
typedef enum {
  BATCH_CALL,
  REGISTERED_CALL
} requested_call_type;
Craig Tiller24be0f72015-02-10 14:04:22 -080075
Craig Tiller97fc6a32015-07-08 15:31:35 -070076typedef struct requested_call {
Craig Tiller24be0f72015-02-10 14:04:22 -080077 requested_call_type type;
78 void *tag;
Craig Tiller6a006ce2015-07-13 16:25:40 -070079 grpc_server *server;
Craig Tillerf9e6adf2015-05-06 11:45:59 -070080 grpc_completion_queue *cq_bound_to_call;
81 grpc_completion_queue *cq_for_notification;
82 grpc_call **call;
Craig Tiller97fc6a32015-07-08 15:31:35 -070083 grpc_cq_completion completion;
Craig Tiller24be0f72015-02-10 14:04:22 -080084 union {
85 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080086 grpc_call_details *details;
87 grpc_metadata_array *initial_metadata;
88 } batch;
89 struct {
Craig Tiller24be0f72015-02-10 14:04:22 -080090 registered_method *registered_method;
91 gpr_timespec *deadline;
92 grpc_metadata_array *initial_metadata;
93 grpc_byte_buffer **optional_payload;
94 } registered;
95 } data;
96} requested_call;
97
Craig Tiller24be0f72015-02-10 14:04:22 -080098typedef struct channel_registered_method {
99 registered_method *server_registered_method;
100 grpc_mdstr *method;
101 grpc_mdstr *host;
102} channel_registered_method;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800103
104struct channel_data {
105 grpc_server *server;
Craig Tillere039f032015-06-25 12:54:23 -0700106 grpc_connectivity_state connectivity_state;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800107 grpc_channel *channel;
Craig Tillercce17ac2015-01-20 09:29:28 -0800108 grpc_mdstr *path_key;
109 grpc_mdstr *authority_key;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800110 /* linked list of all channels on a server */
111 channel_data *next;
112 channel_data *prev;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800113 channel_registered_method *registered_methods;
114 gpr_uint32 registered_method_slots;
115 gpr_uint32 registered_method_max_probes;
Craig Tiller33825112015-09-18 07:44:19 -0700116 grpc_closure finish_destroy_channel_closure;
117 grpc_closure channel_connectivity_changed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800118};
119
Craig Tillerbce999f2015-05-27 09:55:51 -0700120typedef struct shutdown_tag {
121 void *tag;
122 grpc_completion_queue *cq;
Craig Tiller97fc6a32015-07-08 15:31:35 -0700123 grpc_cq_completion completion;
Craig Tillerbce999f2015-05-27 09:55:51 -0700124} shutdown_tag;
125
/* Lifecycle of a server-side call (stored in call_data.state). */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;
136
/* Forward declaration; the struct body follows call_data. */
typedef struct request_matcher request_matcher;
138
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800139struct call_data {
140 grpc_call *call;
141
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700142 /** protects state */
143 gpr_mu mu_state;
144 /** the current state of a call - see call_state */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800145 call_state state;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700146
Craig Tillercce17ac2015-01-20 09:29:28 -0800147 grpc_mdstr *path;
148 grpc_mdstr *host;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700149 gpr_timespec deadline;
150 int got_initial_metadata;
Craig Tillercce17ac2015-01-20 09:29:28 -0800151
Craig Tiller20bc56d2015-02-12 09:02:56 -0800152 grpc_completion_queue *cq_new;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800153
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700154 grpc_stream_op_buffer *recv_ops;
155 grpc_stream_state *recv_state;
Craig Tiller33825112015-09-18 07:44:19 -0700156 grpc_closure *on_done_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700157
Craig Tiller33825112015-09-18 07:44:19 -0700158 grpc_closure server_on_recv;
159 grpc_closure kill_zombie_closure;
David Garcia Quintas284488b2015-05-28 16:27:39 -0700160
Craig Tiller729b35a2015-07-13 12:36:47 -0700161 call_data *pending_next;
162};
163
164struct request_matcher {
165 call_data *pending_head;
166 call_data *pending_tail;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700167 gpr_stack_lockfree *requests;
Craig Tiller729b35a2015-07-13 12:36:47 -0700168};
169
170struct registered_method {
171 char *method;
172 char *host;
173 request_matcher request_matcher;
174 registered_method *next;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800175};
176
Craig Tillerff3ae682015-06-29 17:44:04 -0700177typedef struct {
178 grpc_channel **channels;
179 size_t num_channels;
180} channel_broadcaster;
181
Craig Tiller729b35a2015-07-13 12:36:47 -0700182struct grpc_server {
183 size_t channel_filter_count;
184 const grpc_channel_filter **channel_filters;
185 grpc_channel_args *channel_args;
186
187 grpc_completion_queue **cqs;
188 grpc_pollset **pollsets;
189 size_t cq_count;
190
191 /* The two following mutexes control access to server-state
192 mu_global controls access to non-call-related state (e.g., channel state)
193 mu_call controls access to call-related state (e.g., the call lists)
194
195 If they are ever required to be nested, you must lock mu_global
196 before mu_call. This is currently used in shutdown processing
197 (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
198 gpr_mu mu_global; /* mutex for server and channel state */
199 gpr_mu mu_call; /* mutex for call-specific state */
200
201 registered_method *registered_methods;
202 request_matcher unregistered_request_matcher;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700203 /** free list of available requested_calls indices */
204 gpr_stack_lockfree *request_freelist;
205 /** requested call backing data */
206 requested_call *requested_calls;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700207 size_t max_requested_calls;
Craig Tiller729b35a2015-07-13 12:36:47 -0700208
Craig Tiller6a006ce2015-07-13 16:25:40 -0700209 gpr_atm shutdown_flag;
Craig Tiller729b35a2015-07-13 12:36:47 -0700210 gpr_uint8 shutdown_published;
211 size_t num_shutdown_tags;
212 shutdown_tag *shutdown_tags;
213
214 channel_data root_channel_data;
215
216 listener *listeners;
217 int listeners_destroyed;
218 gpr_refcount internal_refcount;
219
220 /** when did we print the last shutdown progress message */
221 gpr_timespec last_shutdown_message_time;
Craig Tiller47a708e2015-09-15 16:16:06 -0700222
223 grpc_workqueue *workqueue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700224};
225
/* Yields the grpc_server that owns the channel a call element belongs to. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data *)((elem)->channel_data))->server)
228
Craig Tiller24be0f72015-02-10 14:04:22 -0800229static void begin_call(grpc_server *server, call_data *calld,
230 requested_call *rc);
231static void fail_call(grpc_server *server, requested_call *rc);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700232/* Before calling maybe_finish_shutdown, we must hold mu_global and not
233 hold mu_call */
Craig Tiller52760dd2015-05-29 23:09:26 -0700234static void maybe_finish_shutdown(grpc_server *server);
Craig Tiller24be0f72015-02-10 14:04:22 -0800235
Craig Tiller729b35a2015-07-13 12:36:47 -0700236/*
237 * channel broadcaster
238 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700239
240/* assumes server locked */
241static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
242 channel_data *c;
243 size_t count = 0;
Craig Tiller079a11b2015-06-30 10:07:15 -0700244 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
245 count++;
Craig Tillerff3ae682015-06-29 17:44:04 -0700246 }
247 cb->num_channels = count;
248 cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
249 count = 0;
Craig Tiller079a11b2015-06-30 10:07:15 -0700250 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
Craig Tiller49924e02015-06-29 22:42:33 -0700251 cb->channels[count++] = c->channel;
Craig Tillerff3ae682015-06-29 17:44:04 -0700252 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
Craig Tillerff3ae682015-06-29 17:44:04 -0700253 }
254}
255
256struct shutdown_cleanup_args {
Craig Tiller33825112015-09-18 07:44:19 -0700257 grpc_closure closure;
Craig Tillerff3ae682015-06-29 17:44:04 -0700258 gpr_slice slice;
259};
260
261static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
262 struct shutdown_cleanup_args *a = arg;
263 gpr_slice_unref(a->slice);
264 gpr_free(a);
265}
266
Craig Tiller079a11b2015-06-30 10:07:15 -0700267static void send_shutdown(grpc_channel *channel, int send_goaway,
268 int send_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700269 grpc_transport_op op;
270 struct shutdown_cleanup_args *sc;
271 grpc_channel_element *elem;
272
273 memset(&op, 0, sizeof(op));
Craig Tillerff3ae682015-06-29 17:44:04 -0700274 op.send_goaway = send_goaway;
275 sc = gpr_malloc(sizeof(*sc));
276 sc->slice = gpr_slice_from_copied_string("Server shutdown");
277 op.goaway_message = &sc->slice;
278 op.goaway_status = GRPC_STATUS_OK;
279 op.disconnect = send_disconnect;
Craig Tiller33825112015-09-18 07:44:19 -0700280 grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
Craig Tillerff3ae682015-06-29 17:44:04 -0700281 op.on_consumed = &sc->closure;
282
Craig Tiller079a11b2015-06-30 10:07:15 -0700283 elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
Craig Tillerff3ae682015-06-29 17:44:04 -0700284 elem->filter->start_transport_op(elem, &op);
285}
286
Craig Tiller079a11b2015-06-30 10:07:15 -0700287static void channel_broadcaster_shutdown(channel_broadcaster *cb,
Craig Tiller12cf5372015-07-09 13:48:11 -0700288 int send_goaway,
289 int force_disconnect) {
Craig Tillerff3ae682015-06-29 17:44:04 -0700290 size_t i;
291
292 for (i = 0; i < cb->num_channels; i++) {
Craig Tiller9188d7a2015-07-05 12:44:37 -0700293 send_shutdown(cb->channels[i], send_goaway, force_disconnect);
Craig Tillerff3ae682015-06-29 17:44:04 -0700294 GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
295 }
296 gpr_free(cb->channels);
297}
298
Craig Tiller729b35a2015-07-13 12:36:47 -0700299/*
300 * request_matcher
301 */
Craig Tillerff3ae682015-06-29 17:44:04 -0700302
Craig Tiller6a006ce2015-07-13 16:25:40 -0700303static void request_matcher_init(request_matcher *request_matcher,
Craig Tiller32ca48c2015-09-10 11:47:15 -0700304 size_t entries) {
Craig Tiller729b35a2015-07-13 12:36:47 -0700305 memset(request_matcher, 0, sizeof(*request_matcher));
Craig Tiller6a006ce2015-07-13 16:25:40 -0700306 request_matcher->requests = gpr_stack_lockfree_create(entries);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800307}
308
Craig Tiller6a006ce2015-07-13 16:25:40 -0700309static void request_matcher_destroy(request_matcher *request_matcher) {
310 GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
311 gpr_stack_lockfree_destroy(request_matcher->requests);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800312}
313
Craig Tiller729b35a2015-07-13 12:36:47 -0700314static void kill_zombie(void *elem, int success) {
315 grpc_call_destroy(grpc_call_from_top_element(elem));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800316}
317
Craig Tiller729b35a2015-07-13 12:36:47 -0700318static void request_matcher_zombify_all_pending_calls(
Craig Tiller47a708e2015-09-15 16:16:06 -0700319 request_matcher *request_matcher, grpc_workqueue *workqueue) {
Craig Tiller729b35a2015-07-13 12:36:47 -0700320 while (request_matcher->pending_head) {
321 call_data *calld = request_matcher->pending_head;
322 request_matcher->pending_head = calld->pending_next;
Craig Tillerb6450262015-07-14 07:12:19 -0700323 gpr_mu_lock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700324 calld->state = ZOMBIED;
Craig Tillerb6450262015-07-14 07:12:19 -0700325 gpr_mu_unlock(&calld->mu_state);
Craig Tiller33825112015-09-18 07:44:19 -0700326 grpc_closure_init(
Craig Tiller729b35a2015-07-13 12:36:47 -0700327 &calld->kill_zombie_closure, kill_zombie,
328 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
Craig Tiller47a708e2015-09-15 16:16:06 -0700329 grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800330 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800331}
332
Craig Tiller1191e212015-07-30 14:49:02 -0700333static void request_matcher_kill_requests(grpc_server *server,
334 request_matcher *rm) {
335 int request_id;
336 while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
337 fail_call(server, &server->requested_calls[request_id]);
338 }
339}
340
Craig Tiller729b35a2015-07-13 12:36:47 -0700341/*
342 * server proper
343 */
344
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800345static void server_ref(grpc_server *server) {
346 gpr_ref(&server->internal_refcount);
347}
348
Craig Tilleree945e82015-05-26 16:15:34 -0700349static void server_delete(grpc_server *server) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800350 registered_method *rm;
Craig Tiller89504612015-04-27 11:48:46 -0700351 size_t i;
Craig Tilleree945e82015-05-26 16:15:34 -0700352 grpc_channel_args_destroy(server->channel_args);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700353 gpr_mu_destroy(&server->mu_global);
354 gpr_mu_destroy(&server->mu_call);
Craig Tilleree945e82015-05-26 16:15:34 -0700355 gpr_free(server->channel_filters);
Craig Tilleree945e82015-05-26 16:15:34 -0700356 while ((rm = server->registered_methods) != NULL) {
357 server->registered_methods = rm->next;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700358 request_matcher_destroy(&rm->request_matcher);
Craig Tilleree945e82015-05-26 16:15:34 -0700359 gpr_free(rm->method);
360 gpr_free(rm->host);
Craig Tilleree945e82015-05-26 16:15:34 -0700361 gpr_free(rm);
362 }
363 for (i = 0; i < server->cq_count; i++) {
Craig Tiller463f2372015-05-28 16:16:15 -0700364 GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
Craig Tilleree945e82015-05-26 16:15:34 -0700365 }
Craig Tiller6a006ce2015-07-13 16:25:40 -0700366 request_matcher_destroy(&server->unregistered_request_matcher);
367 gpr_stack_lockfree_destroy(server->request_freelist);
Craig Tiller3cd6a512015-09-16 16:15:47 -0700368 GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy");
Craig Tilleree945e82015-05-26 16:15:34 -0700369 gpr_free(server->cqs);
370 gpr_free(server->pollsets);
371 gpr_free(server->shutdown_tags);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700372 gpr_free(server->requested_calls);
Craig Tilleree945e82015-05-26 16:15:34 -0700373 gpr_free(server);
374}
375
376static void server_unref(grpc_server *server) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800377 if (gpr_unref(&server->internal_refcount)) {
Craig Tilleree945e82015-05-26 16:15:34 -0700378 server_delete(server);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800379 }
380}
381
382static int is_channel_orphaned(channel_data *chand) {
383 return chand->next == chand;
384}
385
386static void orphan_channel(channel_data *chand) {
387 chand->next->prev = chand->prev;
388 chand->prev->next = chand->next;
389 chand->next = chand->prev = chand;
390}
391
ctiller58393c22015-01-07 14:03:30 -0800392static void finish_destroy_channel(void *cd, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393 channel_data *chand = cd;
394 grpc_server *server = chand->server;
Craig Tiller3cd6a512015-09-16 16:15:47 -0700395 gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel);
Craig Tiller9ec2a522015-05-29 22:46:54 -0700396 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800397 server_unref(server);
398}
399
400static void destroy_channel(channel_data *chand) {
401 if (is_channel_orphaned(chand)) return;
402 GPR_ASSERT(chand->server != NULL);
403 orphan_channel(chand);
404 server_ref(chand->server);
Craig Tiller52760dd2015-05-29 23:09:26 -0700405 maybe_finish_shutdown(chand->server);
David Garcia Quintas284488b2015-05-28 16:27:39 -0700406 chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
407 chand->finish_destroy_channel_closure.cb_arg = chand;
Craig Tiller3cd6a512015-09-16 16:15:47 -0700408 gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel,
409 chand->server->workqueue);
Craig Tiller47a708e2015-09-15 16:16:06 -0700410 grpc_workqueue_push(chand->server->workqueue,
411 &chand->finish_destroy_channel_closure, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800412}
413
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700414static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
Craig Tiller729b35a2015-07-13 12:36:47 -0700415 request_matcher *request_matcher) {
Craig Tiller04cc8be2015-02-10 16:11:22 -0800416 call_data *calld = elem->call_data;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700417 int request_id;
418
Craig Tiller9835a132015-07-22 17:37:39 -0700419 if (gpr_atm_acq_load(&server->shutdown_flag)) {
420 gpr_mu_lock(&calld->mu_state);
421 calld->state = ZOMBIED;
422 gpr_mu_unlock(&calld->mu_state);
Craig Tiller33825112015-09-18 07:44:19 -0700423 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
Craig Tiller47a708e2015-09-15 16:16:06 -0700424 grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
Craig Tiller9835a132015-07-22 17:37:39 -0700425 return;
426 }
427
Craig Tiller6a006ce2015-07-13 16:25:40 -0700428 request_id = gpr_stack_lockfree_pop(request_matcher->requests);
429 if (request_id == -1) {
430 gpr_mu_lock(&server->mu_call);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700431 gpr_mu_lock(&calld->mu_state);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800432 calld->state = PENDING;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700433 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700434 if (request_matcher->pending_head == NULL) {
435 request_matcher->pending_tail = request_matcher->pending_head = calld;
436 } else {
437 request_matcher->pending_tail->pending_next = calld;
438 request_matcher->pending_tail = calld;
439 }
440 calld->pending_next = NULL;
Vijay Pai8931cdd2015-06-17 12:42:17 -0700441 gpr_mu_unlock(&server->mu_call);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800442 } else {
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700443 gpr_mu_lock(&calld->mu_state);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800444 calld->state = ACTIVATED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700445 gpr_mu_unlock(&calld->mu_state);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700446 begin_call(server, calld, &server->requested_calls[request_id]);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800447 }
448}
449
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800450static void start_new_rpc(grpc_call_element *elem) {
451 channel_data *chand = elem->channel_data;
452 call_data *calld = elem->call_data;
453 grpc_server *server = chand->server;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800454 gpr_uint32 i;
455 gpr_uint32 hash;
456 channel_registered_method *rm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800457
Craig Tiller04cc8be2015-02-10 16:11:22 -0800458 if (chand->registered_methods && calld->path && calld->host) {
Craig Tillera94beff2015-02-17 22:02:06 -0800459 /* TODO(ctiller): unify these two searches */
Craig Tiller04cc8be2015-02-10 16:11:22 -0800460 /* check for an exact match with host */
461 hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
Craig Tillerc6d6d902015-07-09 15:53:50 -0700462 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800463 rm = &chand->registered_methods[(hash + i) %
464 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800465 if (!rm) break;
466 if (rm->host != calld->host) continue;
467 if (rm->method != calld->path) continue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700468 finish_start_new_rpc(server, elem,
469 &rm->server_registered_method->request_matcher);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800470 return;
471 }
472 /* check for a wildcard method definition (no host set) */
473 hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
Craig Tiller0ef1a922015-02-11 16:23:01 -0800474 for (i = 0; i <= chand->registered_method_max_probes; i++) {
Craig Tiller3b29b562015-02-11 12:58:46 -0800475 rm = &chand->registered_methods[(hash + i) %
476 chand->registered_method_slots];
Craig Tiller04cc8be2015-02-10 16:11:22 -0800477 if (!rm) break;
478 if (rm->host != NULL) continue;
479 if (rm->method != calld->path) continue;
Craig Tiller729b35a2015-07-13 12:36:47 -0700480 finish_start_new_rpc(server, elem,
481 &rm->server_registered_method->request_matcher);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800482 return;
483 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800484 }
Craig Tiller729b35a2015-07-13 12:36:47 -0700485 finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800486}
487
Craig Tilleree945e82015-05-26 16:15:34 -0700488static int num_listeners(grpc_server *server) {
489 listener *l;
490 int n = 0;
491 for (l = server->listeners; l; l = l->next) {
492 n++;
493 }
494 return n;
495}
496
Craig Tiller97fc6a32015-07-08 15:31:35 -0700497static void done_shutdown_event(void *server, grpc_cq_completion *completion) {
498 server_unref(server);
499}
500
Craig Tillerab54f792015-07-08 08:34:20 -0700501static int num_channels(grpc_server *server) {
502 channel_data *chand;
503 int n = 0;
504 for (chand = server->root_channel_data.next;
505 chand != &server->root_channel_data; chand = chand->next) {
506 n++;
507 }
508 return n;
509}
510
Craig Tiller1191e212015-07-30 14:49:02 -0700511static void kill_pending_work_locked(grpc_server *server) {
512 registered_method *rm;
513 request_matcher_kill_requests(server, &server->unregistered_request_matcher);
514 request_matcher_zombify_all_pending_calls(
Craig Tiller47a708e2015-09-15 16:16:06 -0700515 &server->unregistered_request_matcher, server->workqueue);
Craig Tiller1191e212015-07-30 14:49:02 -0700516 for (rm = server->registered_methods; rm; rm = rm->next) {
517 request_matcher_kill_requests(server, &rm->request_matcher);
Craig Tiller47a708e2015-09-15 16:16:06 -0700518 request_matcher_zombify_all_pending_calls(&rm->request_matcher,
519 server->workqueue);
Craig Tiller1191e212015-07-30 14:49:02 -0700520 }
521}
522
Craig Tillerdc627722015-05-26 15:27:02 -0700523static void maybe_finish_shutdown(grpc_server *server) {
Craig Tillerbce999f2015-05-27 09:55:51 -0700524 size_t i;
Craig Tiller6a006ce2015-07-13 16:25:40 -0700525 if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
Craig Tillerc4a1f522015-05-29 22:32:04 -0700526 return;
527 }
Vijay Pai8931cdd2015-06-17 12:42:17 -0700528
Craig Tiller1191e212015-07-30 14:49:02 -0700529 kill_pending_work_locked(server);
530
Craig Tillerab54f792015-07-08 08:34:20 -0700531 if (server->root_channel_data.next != &server->root_channel_data ||
532 server->listeners_destroyed < num_listeners(server)) {
Craig Tiller58bbc862015-07-13 09:51:17 -0700533 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
534 server->last_shutdown_message_time),
535 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
Craig Tiller080d6c52015-07-10 10:23:10 -0700536 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -0700537 gpr_log(GPR_DEBUG,
538 "Waiting for %d channels and %d/%d listeners to be destroyed"
539 " before shutting down server",
540 num_channels(server),
541 num_listeners(server) - server->listeners_destroyed,
542 num_listeners(server));
543 }
Craig Tillerc4a1f522015-05-29 22:32:04 -0700544 return;
545 }
546 server->shutdown_published = 1;
547 for (i = 0; i < server->num_shutdown_tags; i++) {
Craig Tiller97fc6a32015-07-08 15:31:35 -0700548 server_ref(server);
Craig Tiller12cf5372015-07-09 13:48:11 -0700549 grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, 1,
550 done_shutdown_event, server,
551 &server->shutdown_tags[i].completion);
Craig Tillerdc627722015-05-26 15:27:02 -0700552 }
553}
554
Craig Tiller6902ad22015-04-16 08:01:49 -0700555static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
556 grpc_call_element *elem = user_data;
Craig Tillercce17ac2015-01-20 09:29:28 -0800557 channel_data *chand = elem->channel_data;
558 call_data *calld = elem->call_data;
Craig Tiller6902ad22015-04-16 08:01:49 -0700559 if (md->key == chand->path_key) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700560 calld->path = GRPC_MDSTR_REF(md->value);
Craig Tiller6902ad22015-04-16 08:01:49 -0700561 return NULL;
562 } else if (md->key == chand->authority_key) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700563 calld->host = GRPC_MDSTR_REF(md->value);
Craig Tiller6902ad22015-04-16 08:01:49 -0700564 return NULL;
565 }
566 return md;
567}
568
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700569static void server_on_recv(void *ptr, int success) {
570 grpc_call_element *elem = ptr;
Craig Tiller6902ad22015-04-16 08:01:49 -0700571 call_data *calld = elem->call_data;
Craig Tiller47a708e2015-09-15 16:16:06 -0700572 channel_data *chand = elem->channel_data;
Craig Tiller94329d02015-07-23 09:52:11 -0700573 gpr_timespec op_deadline;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700574
575 if (success && !calld->got_initial_metadata) {
576 size_t i;
577 size_t nops = calld->recv_ops->nops;
578 grpc_stream_op *ops = calld->recv_ops->ops;
579 for (i = 0; i < nops; i++) {
580 grpc_stream_op *op = &ops[i];
581 if (op->type != GRPC_OP_METADATA) continue;
Craig Tiller205aee12015-04-16 14:46:41 -0700582 grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
Craig Tiller94329d02015-07-23 09:52:11 -0700583 op_deadline = op->data.metadata.deadline;
584 if (0 !=
585 gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
Craig Tiller6902ad22015-04-16 08:01:49 -0700586 calld->deadline = op->data.metadata.deadline;
Craig Tillercce17ac2015-01-20 09:29:28 -0800587 }
Craig Tillerc4b56b62015-07-23 17:44:11 -0700588 if (calld->host && calld->path) {
589 calld->got_initial_metadata = 1;
590 start_new_rpc(elem);
591 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800592 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700593 }
594 }
595
596 switch (*calld->recv_state) {
Craig Tiller06aeea72015-04-23 10:54:45 -0700597 case GRPC_STREAM_OPEN:
598 break;
599 case GRPC_STREAM_SEND_CLOSED:
600 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700601 case GRPC_STREAM_RECV_CLOSED:
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700602 gpr_mu_lock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700603 if (calld->state == NOT_STARTED) {
604 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700605 gpr_mu_unlock(&calld->mu_state);
Craig Tiller33825112015-09-18 07:44:19 -0700606 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
Craig Tiller47a708e2015-09-15 16:16:06 -0700607 grpc_workqueue_push(chand->server->workqueue,
608 &calld->kill_zombie_closure, 1);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700609 } else {
610 gpr_mu_unlock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700611 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800612 break;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700613 case GRPC_STREAM_CLOSED:
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700614 gpr_mu_lock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700615 if (calld->state == NOT_STARTED) {
616 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700617 gpr_mu_unlock(&calld->mu_state);
Craig Tiller33825112015-09-18 07:44:19 -0700618 grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
Craig Tiller47a708e2015-09-15 16:16:06 -0700619 grpc_workqueue_push(chand->server->workqueue,
620 &calld->kill_zombie_closure, 1);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700621 } else if (calld->state == PENDING) {
Craig Tillerc9d03822015-05-20 16:08:45 -0700622 calld->state = ZOMBIED;
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700623 gpr_mu_unlock(&calld->mu_state);
Craig Tiller729b35a2015-07-13 12:36:47 -0700624 /* zombied call will be destroyed when it's removed from the pending
625 queue... later */
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700626 } else {
627 gpr_mu_unlock(&calld->mu_state);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700628 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800629 break;
630 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700631
Craig Tiller1e6facb2015-06-11 22:47:11 -0700632 calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700633}
634
Craig Tillerb7959a02015-06-25 08:50:54 -0700635static void server_mutate_op(grpc_call_element *elem,
636 grpc_transport_stream_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700637 call_data *calld = elem->call_data;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700638
639 if (op->recv_ops) {
640 /* substitute our callback for the higher callback */
641 calld->recv_ops = op->recv_ops;
642 calld->recv_state = op->recv_state;
643 calld->on_done_recv = op->on_done_recv;
Craig Tiller1e6facb2015-06-11 22:47:11 -0700644 op->on_done_recv = &calld->server_on_recv;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700645 }
Craig Tiller50d9db52015-04-23 10:52:14 -0700646}
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700647
Craig Tillere039f032015-06-25 12:54:23 -0700648static void server_start_transport_stream_op(grpc_call_element *elem,
649 grpc_transport_stream_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -0700650 GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
651 server_mutate_op(elem, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700652 grpc_call_next_op(elem, op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800653}
654
Craig Tillere039f032015-06-25 12:54:23 -0700655static void accept_stream(void *cd, grpc_transport *transport,
656 const void *transport_server_data) {
657 channel_data *chand = cd;
658 /* create a call */
Craig Tiller3e7c6a72015-07-31 16:17:04 -0700659 grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
660 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
Craig Tillere039f032015-06-25 12:54:23 -0700661}
662
663static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
664 channel_data *chand = cd;
665 grpc_server *server = chand->server;
666 if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
667 grpc_transport_op op;
668 memset(&op, 0, sizeof(op));
669 op.on_connectivity_state_change = &chand->channel_connectivity_changed,
670 op.connectivity_state = &chand->connectivity_state;
671 grpc_channel_next_op(grpc_channel_stack_element(
672 grpc_channel_get_channel_stack(chand->channel), 0),
673 &op);
674 } else {
675 gpr_mu_lock(&server->mu_global);
676 destroy_channel(chand);
677 gpr_mu_unlock(&server->mu_global);
678 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
679 }
680}
681
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800682static void init_call_elem(grpc_call_element *elem,
Craig Tiller06aeea72015-04-23 10:54:45 -0700683 const void *server_transport_data,
Craig Tillerb7959a02015-06-25 08:50:54 -0700684 grpc_transport_stream_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800685 call_data *calld = elem->call_data;
686 channel_data *chand = elem->channel_data;
687 memset(calld, 0, sizeof(call_data));
Craig Tiller143e7bf2015-07-13 08:41:49 -0700688 calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800689 calld->call = grpc_call_from_top_element(elem);
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700690 gpr_mu_init(&calld->mu_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800691
Craig Tiller33825112015-09-18 07:44:19 -0700692 grpc_closure_init(&calld->server_on_recv, server_on_recv, elem);
Craig Tiller1e6facb2015-06-11 22:47:11 -0700693
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800694 server_ref(chand->server);
Craig Tiller50d9db52015-04-23 10:52:14 -0700695
Craig Tiller482ef8b2015-04-23 11:38:20 -0700696 if (initial_op) server_mutate_op(elem, initial_op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800697}
698
699static void destroy_call_elem(grpc_call_element *elem) {
700 channel_data *chand = elem->channel_data;
Craig Tillerdb7db992015-01-29 11:19:01 -0800701 call_data *calld = elem->call_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800702
Craig Tiller729b35a2015-07-13 12:36:47 -0700703 GPR_ASSERT(calld->state != PENDING);
Craig Tiller092d8d12015-07-04 22:35:00 -0700704
Craig Tiller4df31a62015-01-30 09:44:31 -0800705 if (calld->host) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700706 GRPC_MDSTR_UNREF(calld->host);
Craig Tiller4df31a62015-01-30 09:44:31 -0800707 }
708 if (calld->path) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700709 GRPC_MDSTR_UNREF(calld->path);
Craig Tiller4df31a62015-01-30 09:44:31 -0800710 }
711
Craig Tiller76d2c3b2015-07-07 11:46:01 -0700712 gpr_mu_destroy(&calld->mu_state);
713
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800714 server_unref(chand->server);
715}
716
Craig Tiller079a11b2015-06-30 10:07:15 -0700717static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800718 const grpc_channel_args *args,
719 grpc_mdctx *metadata_context, int is_first,
720 int is_last) {
721 channel_data *chand = elem->channel_data;
722 GPR_ASSERT(is_first);
723 GPR_ASSERT(!is_last);
724 chand->server = NULL;
725 chand->channel = NULL;
Craig Tiller6999c092015-07-22 08:14:12 -0700726 chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0);
Craig Tillerd6c98df2015-08-18 09:33:44 -0700727 chand->authority_key =
728 grpc_mdstr_from_string(metadata_context, ":authority", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800729 chand->next = chand->prev = chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800730 chand->registered_methods = NULL;
Craig Tillere039f032015-06-25 12:54:23 -0700731 chand->connectivity_state = GRPC_CHANNEL_IDLE;
Craig Tiller33825112015-09-18 07:44:19 -0700732 grpc_closure_init(&chand->channel_connectivity_changed,
733 channel_connectivity_changed, chand);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800734}
735
736static void destroy_channel_elem(grpc_channel_element *elem) {
Craig Tillerec3257c2015-02-12 15:59:43 -0800737 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800738 channel_data *chand = elem->channel_data;
Craig Tillerec3257c2015-02-12 15:59:43 -0800739 if (chand->registered_methods) {
740 for (i = 0; i < chand->registered_method_slots; i++) {
741 if (chand->registered_methods[i].method) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700742 GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
Craig Tillerec3257c2015-02-12 15:59:43 -0800743 }
744 if (chand->registered_methods[i].host) {
Craig Tiller1a65a232015-07-06 10:22:32 -0700745 GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
Craig Tillerec3257c2015-02-12 15:59:43 -0800746 }
747 }
748 gpr_free(chand->registered_methods);
749 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800750 if (chand->server) {
Vijay Pai8931cdd2015-06-17 12:42:17 -0700751 gpr_mu_lock(&chand->server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800752 chand->next->prev = chand->prev;
753 chand->prev->next = chand->next;
754 chand->next = chand->prev = chand;
Craig Tillerb76d05b2015-05-29 17:21:56 -0700755 maybe_finish_shutdown(chand->server);
Vijay Pai8931cdd2015-06-17 12:42:17 -0700756 gpr_mu_unlock(&chand->server->mu_global);
Craig Tiller1a65a232015-07-06 10:22:32 -0700757 GRPC_MDSTR_UNREF(chand->path_key);
758 GRPC_MDSTR_UNREF(chand->authority_key);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800759 server_unref(chand->server);
760 }
761}
762
763static const grpc_channel_filter server_surface_filter = {
Craig Tillere039f032015-06-25 12:54:23 -0700764 server_start_transport_stream_op,
765 grpc_channel_next_op,
Craig Tillerb76d05b2015-05-29 17:21:56 -0700766 sizeof(call_data),
767 init_call_elem,
768 destroy_call_elem,
769 sizeof(channel_data),
770 init_channel_elem,
771 destroy_channel_elem,
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700772 grpc_call_next_get_peer,
Craig Tillerb76d05b2015-05-29 17:21:56 -0700773 "server",
Craig Tiller9f28ac22015-01-27 17:01:29 -0800774};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800775
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700776void grpc_server_register_completion_queue(grpc_server *server,
Nicolas "Pixel" Nobleebb51402015-07-23 02:41:33 +0200777 grpc_completion_queue *cq,
778 void *reserved) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800779 size_t i, n;
Nicolas "Pixel" Noble45992882015-07-29 23:52:14 +0200780 GPR_ASSERT(!reserved);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800781 for (i = 0; i < server->cq_count; i++) {
782 if (server->cqs[i] == cq) return;
783 }
Craig Tiller463f2372015-05-28 16:16:15 -0700784 GRPC_CQ_INTERNAL_REF(cq, "server");
Craig Tillerb56975c2015-06-15 10:11:16 -0700785 grpc_cq_mark_server_cq(cq);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800786 n = server->cq_count++;
Craig Tillerec3257c2015-02-12 15:59:43 -0800787 server->cqs = gpr_realloc(server->cqs,
788 server->cq_count * sizeof(grpc_completion_queue *));
Craig Tiller20bc56d2015-02-12 09:02:56 -0800789 server->cqs[n] = cq;
790}
791
David Garcia Quintase25e9282015-06-23 10:18:52 -0700792grpc_server *grpc_server_create_from_filters(
793 const grpc_channel_filter **filters, size_t filter_count,
794 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800795 size_t i;
Alistair Veitch9d48ebf2015-06-01 10:01:03 -0700796 /* TODO(census): restore this once we finalize census filter etc.
797 int census_enabled = grpc_channel_args_is_census_enabled(args); */
798 int census_enabled = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799
800 grpc_server *server = gpr_malloc(sizeof(grpc_server));
Craig Tiller60fd3612015-03-05 16:24:22 -0800801
802 GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
803
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800804 memset(server, 0, sizeof(grpc_server));
805
Vijay Pai8931cdd2015-06-17 12:42:17 -0700806 gpr_mu_init(&server->mu_global);
807 gpr_mu_init(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800808
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800809 /* decremented by grpc_server_destroy */
810 gpr_ref_init(&server->internal_refcount, 1);
811 server->root_channel_data.next = server->root_channel_data.prev =
812 &server->root_channel_data;
Craig Tiller47a708e2015-09-15 16:16:06 -0700813 server->workqueue = grpc_workqueue_create();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800814
Craig Tiller6a006ce2015-07-13 16:25:40 -0700815 /* TODO(ctiller): expose a channel_arg for this */
816 server->max_requested_calls = 32768;
817 server->request_freelist =
818 gpr_stack_lockfree_create(server->max_requested_calls);
819 for (i = 0; i < (size_t)server->max_requested_calls; i++) {
Craig Tillerf96dfc32015-09-10 14:43:18 -0700820 gpr_stack_lockfree_push(server->request_freelist, (int)i);
Craig Tiller6a006ce2015-07-13 16:25:40 -0700821 }
822 request_matcher_init(&server->unregistered_request_matcher,
823 server->max_requested_calls);
824 server->requested_calls = gpr_malloc(server->max_requested_calls *
825 sizeof(*server->requested_calls));
Craig Tiller729b35a2015-07-13 12:36:47 -0700826
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800827 /* Server filter stack is:
828
829 server_surface_filter - for making surface API calls
830 grpc_server_census_filter (optional) - for stats collection and tracing
831 {passed in filter stack}
832 grpc_connected_channel_filter - for interfacing with transports */
Craig Tiller32ca48c2015-09-10 11:47:15 -0700833 server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800834 server->channel_filters =
835 gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
836 server->channel_filters[0] = &server_surface_filter;
837 if (census_enabled) {
838 server->channel_filters[1] = &grpc_server_census_filter;
Hongyu Chenf68e4722015-08-07 18:06:42 -0700839 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800840 for (i = 0; i < filter_count; i++) {
Craig Tiller32ca48c2015-09-10 11:47:15 -0700841 server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800842 }
843
844 server->channel_args = grpc_channel_args_copy(args);
845
846 return server;
847}
848
/* NULL-tolerant string equality. Returns 1 when both arguments are NULL or
   when both are non-NULL and compare equal with strcmp; returns 0 otherwise
   (including when exactly one of them is NULL). */
static int streq(const char *a, const char *b) {
  if (a == NULL || b == NULL) {
    /* equal only if both are missing */
    return a == b;
  }
  return strcmp(a, b) == 0;
}
855
856void *grpc_server_register_method(grpc_server *server, const char *method,
Craig Tillerf9e6adf2015-05-06 11:45:59 -0700857 const char *host) {
Craig Tiller24be0f72015-02-10 14:04:22 -0800858 registered_method *m;
859 if (!method) {
Craig Tillerb76d05b2015-05-29 17:21:56 -0700860 gpr_log(GPR_ERROR,
861 "grpc_server_register_method method string cannot be NULL");
Craig Tiller24be0f72015-02-10 14:04:22 -0800862 return NULL;
863 }
864 for (m = server->registered_methods; m; m = m->next) {
865 if (streq(m->method, method) && streq(m->host, host)) {
866 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
867 host ? host : "*");
868 return NULL;
869 }
870 }
871 m = gpr_malloc(sizeof(registered_method));
872 memset(m, 0, sizeof(*m));
Craig Tiller6a006ce2015-07-13 16:25:40 -0700873 request_matcher_init(&m->request_matcher, server->max_requested_calls);
Craig Tiller24be0f72015-02-10 14:04:22 -0800874 m->method = gpr_strdup(method);
875 m->host = gpr_strdup(host);
876 m->next = server->registered_methods;
877 server->registered_methods = m;
878 return m;
879}
880
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800881void grpc_server_start(grpc_server *server) {
882 listener *l;
Craig Tiller20bc56d2015-02-12 09:02:56 -0800883 size_t i;
884
Craig Tillerec3257c2015-02-12 15:59:43 -0800885 server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800886 for (i = 0; i < server->cq_count; i++) {
887 server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
Craig Tiller47a708e2015-09-15 16:16:06 -0700888 grpc_workqueue_add_to_pollset(server->workqueue, server->pollsets[i]);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800889 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800890
891 for (l = server->listeners; l; l = l->next) {
Craig Tiller20bc56d2015-02-12 09:02:56 -0800892 l->start(server, l->arg, server->pollsets, server->cq_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800893 }
894}
895
Craig Tiller1064f8b2015-06-25 13:52:57 -0700896void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
897 grpc_channel_filter const **extra_filters,
898 size_t num_extra_filters, grpc_mdctx *mdctx,
Craig Tiller47a708e2015-09-15 16:16:06 -0700899 grpc_workqueue *workqueue,
Craig Tiller1064f8b2015-06-25 13:52:57 -0700900 const grpc_channel_args *args) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800901 size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
902 grpc_channel_filter const **filters =
903 gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
904 size_t i;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800905 size_t num_registered_methods;
906 size_t alloc;
907 registered_method *rm;
908 channel_registered_method *crm;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800909 grpc_channel *channel;
910 channel_data *chand;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800911 grpc_mdstr *host;
912 grpc_mdstr *method;
913 gpr_uint32 hash;
Craig Tillerf96dfc32015-09-10 14:43:18 -0700914 size_t slots;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800915 gpr_uint32 probes;
916 gpr_uint32 max_probes = 0;
Craig Tillere039f032015-06-25 12:54:23 -0700917 grpc_transport_op op;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800918
919 for (i = 0; i < s->channel_filter_count; i++) {
920 filters[i] = s->channel_filters[i];
921 }
922 for (; i < s->channel_filter_count + num_extra_filters; i++) {
923 filters[i] = extra_filters[i - s->channel_filter_count];
924 }
925 filters[i] = &grpc_connected_channel_filter;
926
Craig Tiller20bc56d2015-02-12 09:02:56 -0800927 for (i = 0; i < s->cq_count; i++) {
Craig Tillere039f032015-06-25 12:54:23 -0700928 memset(&op, 0, sizeof(op));
929 op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
930 grpc_transport_perform_op(transport, &op);
Craig Tiller20bc56d2015-02-12 09:02:56 -0800931 }
ctillerd79b4862014-12-17 16:36:59 -0800932
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700933 channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
Craig Tiller47a708e2015-09-15 16:16:06 -0700934 mdctx, workqueue, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800935 chand = (channel_data *)grpc_channel_stack_element(
Craig Tillerc02c1d82015-04-07 16:21:55 -0700936 grpc_channel_get_channel_stack(channel), 0)
937 ->channel_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800938 chand->server = s;
939 server_ref(s);
940 chand->channel = channel;
941
Craig Tiller04cc8be2015-02-10 16:11:22 -0800942 num_registered_methods = 0;
943 for (rm = s->registered_methods; rm; rm = rm->next) {
944 num_registered_methods++;
945 }
946 /* build a lookup table phrased in terms of mdstr's in this channels context
947 to quickly find registered methods */
948 if (num_registered_methods > 0) {
949 slots = 2 * num_registered_methods;
950 alloc = sizeof(channel_registered_method) * slots;
951 chand->registered_methods = gpr_malloc(alloc);
952 memset(chand->registered_methods, 0, alloc);
953 for (rm = s->registered_methods; rm; rm = rm->next) {
Craig Tiller6999c092015-07-22 08:14:12 -0700954 host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host, 0) : NULL;
955 method = grpc_mdstr_from_string(mdctx, rm->method, 0);
Craig Tiller04cc8be2015-02-10 16:11:22 -0800956 hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
Craig Tiller3b29b562015-02-11 12:58:46 -0800957 for (probes = 0; chand->registered_methods[(hash + probes) % slots]
Craig Tillerc02c1d82015-04-07 16:21:55 -0700958 .server_registered_method != NULL;
Craig Tiller3b29b562015-02-11 12:58:46 -0800959 probes++)
960 ;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800961 if (probes > max_probes) max_probes = probes;
962 crm = &chand->registered_methods[(hash + probes) % slots];
963 crm->server_registered_method = rm;
964 crm->host = host;
965 crm->method = method;
966 }
Craig Tillerf96dfc32015-09-10 14:43:18 -0700967 GPR_ASSERT(slots <= GPR_UINT32_MAX);
968 chand->registered_method_slots = (gpr_uint32)slots;
Craig Tiller04cc8be2015-02-10 16:11:22 -0800969 chand->registered_method_max_probes = max_probes;
970 }
971
Craig Tiller1064f8b2015-06-25 13:52:57 -0700972 grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
973 transport);
Craig Tiller7bd5ab12015-02-17 22:29:04 -0800974
Vijay Pai8931cdd2015-06-17 12:42:17 -0700975 gpr_mu_lock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800976 chand->next = &s->root_channel_data;
977 chand->prev = chand->next->prev;
978 chand->next->prev = chand->prev->next = chand;
Vijay Pai8931cdd2015-06-17 12:42:17 -0700979 gpr_mu_unlock(&s->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800980
981 gpr_free(filters);
Craig Tiller4b804102015-06-26 16:16:12 -0700982
983 GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
984 memset(&op, 0, sizeof(op));
985 op.set_accept_stream = accept_stream;
986 op.set_accept_stream_user_data = chand;
987 op.on_connectivity_state_change = &chand->channel_connectivity_changed;
988 op.connectivity_state = &chand->connectivity_state;
Craig Tillerf96dfc32015-09-10 14:43:18 -0700989 op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
Craig Tiller4b804102015-06-26 16:16:12 -0700990 grpc_transport_perform_op(transport, &op);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800991}
992
murgatroid9900a3dab2015-08-19 11:15:38 -0700993void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
994 (void) done_arg;
995 gpr_free(storage);
996}
997
Craig Tillerbce999f2015-05-27 09:55:51 -0700998void grpc_server_shutdown_and_notify(grpc_server *server,
999 grpc_completion_queue *cq, void *tag) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001000 listener *l;
Craig Tillerbce999f2015-05-27 09:55:51 -07001001 shutdown_tag *sdt;
Craig Tillerff3ae682015-06-29 17:44:04 -07001002 channel_broadcaster broadcaster;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001003
Craig Tiller7156fca2015-07-15 13:54:20 -07001004 GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag);
1005
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001006 /* lock, and gather up some stuff to do */
Vijay Pai8931cdd2015-06-17 12:42:17 -07001007 gpr_mu_lock(&server->mu_global);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001008 grpc_cq_begin_op(cq);
murgatroid9900a3dab2015-08-19 11:15:38 -07001009 if (server->shutdown_published) {
1010 grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
1011 gpr_malloc(sizeof(grpc_cq_completion)));
1012 gpr_mu_unlock(&server->mu_global);
1013 return;
1014 }
Craig Tilleree945e82015-05-26 16:15:34 -07001015 server->shutdown_tags =
1016 gpr_realloc(server->shutdown_tags,
Craig Tiller208d2122015-05-29 08:50:08 -07001017 sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
Craig Tillerbce999f2015-05-27 09:55:51 -07001018 sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1019 sdt->tag = tag;
1020 sdt->cq = cq;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001021 if (gpr_atm_acq_load(&server->shutdown_flag)) {
Vijay Pai8931cdd2015-06-17 12:42:17 -07001022 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001023 return;
1024 }
1025
Craig Tiller080d6c52015-07-10 10:23:10 -07001026 server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
Craig Tillerab54f792015-07-08 08:34:20 -07001027
Craig Tillerff3ae682015-06-29 17:44:04 -07001028 channel_broadcaster_init(server, &broadcaster);
nnoble0c475f02014-12-05 15:37:39 -08001029
Craig Tillerbd217572015-02-11 18:10:56 -08001030 /* collect all unregistered then registered calls */
Vijay Pai8931cdd2015-06-17 12:42:17 -07001031 gpr_mu_lock(&server->mu_call);
Craig Tiller1191e212015-07-30 14:49:02 -07001032 kill_pending_work_locked(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001033 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001034
Craig Tiller6a006ce2015-07-13 16:25:40 -07001035 gpr_atm_rel_store(&server->shutdown_flag, 1);
Craig Tillerdc627722015-05-26 15:27:02 -07001036 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001037 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001038
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001039 /* Shutdown listeners */
1040 for (l = server->listeners; l; l = l->next) {
1041 l->destroy(server, l->arg);
1042 }
Craig Tillerff3ae682015-06-29 17:44:04 -07001043
1044 channel_broadcaster_shutdown(&broadcaster, 1, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001045}
1046
Craig Tilleraec96aa2015-04-07 14:32:15 -07001047void grpc_server_listener_destroy_done(void *s) {
1048 grpc_server *server = s;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001049 gpr_mu_lock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001050 server->listeners_destroyed++;
Craig Tilleree945e82015-05-26 16:15:34 -07001051 maybe_finish_shutdown(server);
Vijay Pai8931cdd2015-06-17 12:42:17 -07001052 gpr_mu_unlock(&server->mu_global);
Craig Tilleraec96aa2015-04-07 14:32:15 -07001053}
1054
Craig Tillerafa2d632015-05-26 16:39:13 -07001055void grpc_server_cancel_all_calls(grpc_server *server) {
Craig Tiller092d8d12015-07-04 22:35:00 -07001056 channel_broadcaster broadcaster;
Craig Tillerafa2d632015-05-26 16:39:13 -07001057
Craig Tiller092d8d12015-07-04 22:35:00 -07001058 gpr_mu_lock(&server->mu_global);
1059 channel_broadcaster_init(server, &broadcaster);
1060 gpr_mu_unlock(&server->mu_global);
Craig Tillerafa2d632015-05-26 16:39:13 -07001061
Craig Tiller092d8d12015-07-04 22:35:00 -07001062 channel_broadcaster_shutdown(&broadcaster, 0, 1);
Craig Tillerafa2d632015-05-26 16:39:13 -07001063}
1064
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001065void grpc_server_destroy(grpc_server *server) {
Craig Tilleraec96aa2015-04-07 14:32:15 -07001066 listener *l;
Craig Tiller872af022015-04-24 15:57:52 -07001067
Vijay Pai8931cdd2015-06-17 12:42:17 -07001068 gpr_mu_lock(&server->mu_global);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001069 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
Craig Tilleree945e82015-05-26 16:15:34 -07001070 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
Craig Tilleraec96aa2015-04-07 14:32:15 -07001071
1072 while (server->listeners) {
1073 l = server->listeners;
1074 server->listeners = l->next;
1075 gpr_free(l);
1076 }
1077
Vijay Pai8931cdd2015-06-17 12:42:17 -07001078 gpr_mu_unlock(&server->mu_global);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001079
Craig Tiller33825112015-09-18 07:44:19 -07001080 grpc_workqueue_flush(server->workqueue);
Craig Tiller3cd6a512015-09-16 16:15:47 -07001081
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001082 server_unref(server);
1083}
1084
1085void grpc_server_add_listener(grpc_server *server, void *arg,
ctiller58393c22015-01-07 14:03:30 -08001086 void (*start)(grpc_server *server, void *arg,
Craig Tillerec3257c2015-02-12 15:59:43 -08001087 grpc_pollset **pollsets,
1088 size_t pollset_count),
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001089 void (*destroy)(grpc_server *server, void *arg)) {
1090 listener *l = gpr_malloc(sizeof(listener));
1091 l->arg = arg;
1092 l->start = start;
1093 l->destroy = destroy;
1094 l->next = server->listeners;
1095 server->listeners = l;
1096}
1097
Craig Tiller9f28ac22015-01-27 17:01:29 -08001098static grpc_call_error queue_call_request(grpc_server *server,
Craig Tiller24be0f72015-02-10 14:04:22 -08001099 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001100 call_data *calld = NULL;
Craig Tiller729b35a2015-07-13 12:36:47 -07001101 request_matcher *request_matcher = NULL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001102 int request_id;
1103 if (gpr_atm_acq_load(&server->shutdown_flag)) {
1104 fail_call(server, rc);
1105 return GRPC_CALL_OK;
1106 }
1107 request_id = gpr_stack_lockfree_pop(server->request_freelist);
1108 if (request_id == -1) {
1109 /* out of request ids: just fail this one */
Craig Tiller24be0f72015-02-10 14:04:22 -08001110 fail_call(server, rc);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001111 return GRPC_CALL_OK;
1112 }
Craig Tiller04cc8be2015-02-10 16:11:22 -08001113 switch (rc->type) {
Craig Tiller04cc8be2015-02-10 16:11:22 -08001114 case BATCH_CALL:
Craig Tiller729b35a2015-07-13 12:36:47 -07001115 request_matcher = &server->unregistered_request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001116 break;
1117 case REGISTERED_CALL:
Craig Tiller729b35a2015-07-13 12:36:47 -07001118 request_matcher = &rc->data.registered.registered_method->request_matcher;
Craig Tiller04cc8be2015-02-10 16:11:22 -08001119 break;
1120 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001121 server->requested_calls[request_id] = *rc;
1122 gpr_free(rc);
1123 if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
1124 /* this was the first queued request: we need to lock and start
1125 matching calls */
1126 gpr_mu_lock(&server->mu_call);
1127 while ((calld = request_matcher->pending_head) != NULL) {
1128 request_id = gpr_stack_lockfree_pop(request_matcher->requests);
1129 if (request_id == -1) break;
1130 request_matcher->pending_head = calld->pending_next;
1131 gpr_mu_unlock(&server->mu_call);
1132 gpr_mu_lock(&calld->mu_state);
1133 if (calld->state == ZOMBIED) {
1134 gpr_mu_unlock(&calld->mu_state);
Craig Tiller33825112015-09-18 07:44:19 -07001135 grpc_closure_init(
Craig Tiller6a006ce2015-07-13 16:25:40 -07001136 &calld->kill_zombie_closure, kill_zombie,
1137 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
Craig Tiller47a708e2015-09-15 16:16:06 -07001138 grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001139 } else {
1140 GPR_ASSERT(calld->state == PENDING);
1141 calld->state = ACTIVATED;
1142 gpr_mu_unlock(&calld->mu_state);
Craig Tillerb6450262015-07-14 07:12:19 -07001143 begin_call(server, calld, &server->requested_calls[request_id]);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001144 }
1145 gpr_mu_lock(&server->mu_call);
Craig Tiller729b35a2015-07-13 12:36:47 -07001146 }
Craig Tiller76d2c3b2015-07-07 11:46:01 -07001147 gpr_mu_unlock(&server->mu_call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001148 }
Craig Tiller6a006ce2015-07-13 16:25:40 -07001149 return GRPC_CALL_OK;
Craig Tillercce17ac2015-01-20 09:29:28 -08001150}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001151
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001152grpc_call_error grpc_server_request_call(
1153 grpc_server *server, grpc_call **call, grpc_call_details *details,
1154 grpc_metadata_array *initial_metadata,
1155 grpc_completion_queue *cq_bound_to_call,
1156 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001157 requested_call *rc = gpr_malloc(sizeof(*rc));
murgatroid99ad7c20c2015-05-22 14:42:29 -07001158 GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
1159 initial_metadata, cq_bound_to_call,
1160 cq_for_notification, tag);
Craig Tillerb56975c2015-06-15 10:11:16 -07001161 if (!grpc_cq_is_server_cq(cq_for_notification)) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001162 gpr_free(rc);
Craig Tillerb56975c2015-06-15 10:11:16 -07001163 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1164 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001165 grpc_cq_begin_op(cq_for_notification);
Craig Tiller9928d392015-08-18 09:40:24 -07001166 details->reserved = NULL;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001167 rc->type = BATCH_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001168 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001169 rc->tag = tag;
1170 rc->cq_bound_to_call = cq_bound_to_call;
1171 rc->cq_for_notification = cq_for_notification;
1172 rc->call = call;
1173 rc->data.batch.details = details;
1174 rc->data.batch.initial_metadata = initial_metadata;
1175 return queue_call_request(server, rc);
Craig Tiller24be0f72015-02-10 14:04:22 -08001176}
1177
1178grpc_call_error grpc_server_request_registered_call(
Craig Tillerec3257c2015-02-12 15:59:43 -08001179 grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
1180 grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001181 grpc_completion_queue *cq_bound_to_call,
1182 grpc_completion_queue *cq_for_notification, void *tag) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001183 requested_call *rc = gpr_malloc(sizeof(*rc));
Craig Tiller20bc56d2015-02-12 09:02:56 -08001184 registered_method *registered_method = rm;
Craig Tillerb56975c2015-06-15 10:11:16 -07001185 if (!grpc_cq_is_server_cq(cq_for_notification)) {
Craig Tiller97fc6a32015-07-08 15:31:35 -07001186 gpr_free(rc);
Craig Tillerb56975c2015-06-15 10:11:16 -07001187 return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1188 }
Craig Tiller97fc6a32015-07-08 15:31:35 -07001189 grpc_cq_begin_op(cq_for_notification);
1190 rc->type = REGISTERED_CALL;
Craig Tiller6a006ce2015-07-13 16:25:40 -07001191 rc->server = server;
Craig Tiller97fc6a32015-07-08 15:31:35 -07001192 rc->tag = tag;
1193 rc->cq_bound_to_call = cq_bound_to_call;
1194 rc->cq_for_notification = cq_for_notification;
1195 rc->call = call;
1196 rc->data.registered.registered_method = registered_method;
1197 rc->data.registered.deadline = deadline;
1198 rc->data.registered.initial_metadata = initial_metadata;
1199 rc->data.registered.optional_payload = optional_payload;
1200 return queue_call_request(server, rc);
Craig Tiller24be0f72015-02-10 14:04:22 -08001201}
1202
Craig Tiller64be9f72015-05-04 14:53:51 -07001203static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller24be0f72015-02-10 14:04:22 -08001204 void *tag);
Craig Tiller64be9f72015-05-04 14:53:51 -07001205static void publish_was_not_set(grpc_call *call, int success, void *tag) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001206 abort();
1207}
Craig Tiller24be0f72015-02-10 14:04:22 -08001208
Craig Tiller166e2502015-02-03 20:14:41 -08001209static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1210 gpr_slice slice = value->slice;
1211 size_t len = GPR_SLICE_LENGTH(slice);
1212
1213 if (len + 1 > *capacity) {
1214 *capacity = GPR_MAX(len + 1, *capacity * 2);
1215 *dest = gpr_realloc(*dest, *capacity);
1216 }
1217 memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1218}
1219
Craig Tiller24be0f72015-02-10 14:04:22 -08001220static void begin_call(grpc_server *server, call_data *calld,
1221 requested_call *rc) {
Yang Gaoeb8e7cd2015-02-11 11:43:40 -08001222 grpc_ioreq_completion_func publish = publish_was_not_set;
Craig Tiller24be0f72015-02-10 14:04:22 -08001223 grpc_ioreq req[2];
1224 grpc_ioreq *r = req;
1225
1226 /* called once initial metadata has been read by the call, but BEFORE
1227 the ioreq to fetch it out of the call has been executed.
1228 This means metadata related fields can be relied on in calld, but to
1229 fill in the metadata array passed by the client, we need to perform
1230 an ioreq op, that should complete immediately. */
1231
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001232 grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
1233 *rc->call = calld->call;
1234 calld->cq_new = rc->cq_for_notification;
Craig Tiller24be0f72015-02-10 14:04:22 -08001235 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001236 case BATCH_CALL:
Craig Tiller04c5d4b2015-06-26 17:21:41 -07001237 GPR_ASSERT(calld->host != NULL);
1238 GPR_ASSERT(calld->path != NULL);
Craig Tiller24be0f72015-02-10 14:04:22 -08001239 cpstr(&rc->data.batch.details->host,
1240 &rc->data.batch.details->host_capacity, calld->host);
1241 cpstr(&rc->data.batch.details->method,
1242 &rc->data.batch.details->method_capacity, calld->path);
Masood Malekghassemibf177c82015-04-27 12:14:38 -07001243 rc->data.batch.details->deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001244 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1245 r->data.recv_metadata = rc->data.batch.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001246 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001247 r++;
1248 publish = publish_registered_or_batch;
1249 break;
1250 case REGISTERED_CALL:
1251 *rc->data.registered.deadline = calld->deadline;
Craig Tiller24be0f72015-02-10 14:04:22 -08001252 r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1253 r->data.recv_metadata = rc->data.registered.initial_metadata;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001254 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001255 r++;
1256 if (rc->data.registered.optional_payload) {
1257 r->op = GRPC_IOREQ_RECV_MESSAGE;
1258 r->data.recv_message = rc->data.registered.optional_payload;
David Garcia Quintasb8f54502015-06-15 16:19:10 -07001259 r->flags = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001260 r++;
1261 }
1262 publish = publish_registered_or_batch;
1263 break;
1264 }
1265
Craig Tiller4df412b2015-04-28 07:57:54 -07001266 GRPC_CALL_INTERNAL_REF(calld->call, "server");
Craig Tiller32ca48c2015-09-10 11:47:15 -07001267 grpc_call_start_ioreq_and_call_back(calld->call, req, (size_t)(r - req),
1268 publish, rc);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001269}
1270
1271static void done_request_event(void *req, grpc_cq_completion *c) {
Craig Tiller6a006ce2015-07-13 16:25:40 -07001272 requested_call *rc = req;
1273 grpc_server *server = rc->server;
1274
1275 if (rc >= server->requested_calls &&
1276 rc < server->requested_calls + server->max_requested_calls) {
Craig Tillerf96dfc32015-09-10 14:43:18 -07001277 GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
Craig Tiller6a006ce2015-07-13 16:25:40 -07001278 gpr_stack_lockfree_push(server->request_freelist,
Craig Tillerf96dfc32015-09-10 14:43:18 -07001279 (int)(rc - server->requested_calls));
Craig Tiller6a006ce2015-07-13 16:25:40 -07001280 } else {
1281 gpr_free(req);
1282 }
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001283
1284 server_unref(server);
Craig Tiller24be0f72015-02-10 14:04:22 -08001285}
1286
1287static void fail_call(grpc_server *server, requested_call *rc) {
Craig Tillerf9e6adf2015-05-06 11:45:59 -07001288 *rc->call = NULL;
Craig Tiller24be0f72015-02-10 14:04:22 -08001289 switch (rc->type) {
Craig Tiller24be0f72015-02-10 14:04:22 -08001290 case BATCH_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001291 rc->data.batch.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001292 break;
1293 case REGISTERED_CALL:
Craig Tiller24be0f72015-02-10 14:04:22 -08001294 rc->data.registered.initial_metadata->count = 0;
Craig Tiller24be0f72015-02-10 14:04:22 -08001295 break;
1296 }
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001297 server_ref(server);
Craig Tiller12cf5372015-07-09 13:48:11 -07001298 grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc,
1299 &rc->completion);
Craig Tiller24be0f72015-02-10 14:04:22 -08001300}
1301
Craig Tiller64be9f72015-05-04 14:53:51 -07001302static void publish_registered_or_batch(grpc_call *call, int success,
Craig Tiller97fc6a32015-07-08 15:31:35 -07001303 void *prc) {
Craig Tiller8e8fd892015-02-10 17:02:08 -08001304 grpc_call_element *elem =
1305 grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
Craig Tiller97fc6a32015-07-08 15:31:35 -07001306 requested_call *rc = prc;
Craig Tiller20bc56d2015-02-12 09:02:56 -08001307 call_data *calld = elem->call_data;
Craig Tillere2b3bfa2015-07-30 10:40:22 -07001308 channel_data *chand = elem->channel_data;
1309 server_ref(chand->server);
Craig Tiller12cf5372015-07-09 13:48:11 -07001310 grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc,
1311 &rc->completion);
Craig Tillerc0122582015-07-09 13:21:36 -07001312 GRPC_CALL_INTERNAL_UNREF(call, "server", 0);
Craig Tiller24be0f72015-02-10 14:04:22 -08001313}
1314
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001315const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1316 return server->channel_args;
Craig Tiller190d3602015-02-18 09:23:38 -08001317}
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001318
1319int grpc_server_has_open_connections(grpc_server *server) {
1320 int r;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001321 gpr_mu_lock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001322 r = server->root_channel_data.next != &server->root_channel_data;
Vijay Pai8931cdd2015-06-17 12:42:17 -07001323 gpr_mu_unlock(&server->mu_global);
Craig Tillerba3c3cd2015-05-26 06:28:10 -07001324 return r;
1325}