blob: eed2773f8a0b4ef67a97c29ee2f4efc71b806c2e [file] [log] [blame]
murgatroid999030c812016-09-16 13:25:08 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/lib/iomgr/port.h"
35
36#ifdef GRPC_UV
37
38#include <string.h>
39
40#include <grpc/support/alloc.h>
41#include <grpc/support/log.h>
42
43#include "src/core/lib/iomgr/error.h"
44#include "src/core/lib/iomgr/exec_ctx.h"
murgatroid997871f732016-09-23 13:49:05 -070045#include "src/core/lib/iomgr/sockaddr.h"
murgatroid999030c812016-09-16 13:25:08 -070046#include "src/core/lib/iomgr/sockaddr_utils.h"
47#include "src/core/lib/iomgr/tcp_server.h"
48#include "src/core/lib/iomgr/tcp_uv.h"
49
50/* one listening port */
51typedef struct grpc_tcp_listener grpc_tcp_listener;
52struct grpc_tcp_listener {
53 uv_tcp_t *handle;
54 grpc_tcp_server *server;
55 unsigned port_index;
56 int port;
57 /* linked list */
58 struct grpc_tcp_listener *next;
59};
60
61struct grpc_tcp_server {
62 gpr_refcount refs;
63
64 /* Called whenever accept() succeeds on a server port. */
65 grpc_tcp_server_cb on_accept_cb;
66 void *on_accept_cb_arg;
67
68 int open_ports;
69
70 /* linked list of server ports */
71 grpc_tcp_listener *head;
72 grpc_tcp_listener *tail;
73
74 /* List of closures passed to shutdown_starting_add(). */
75 grpc_closure_list shutdown_starting;
76
77 /* shutdown callback */
78 grpc_closure *shutdown_complete;
murgatroid9969259d42016-10-31 14:34:10 -070079
80 grpc_resource_quota *resource_quota;
murgatroid999030c812016-09-16 13:25:08 -070081};
82
murgatroid9969259d42016-10-31 14:34:10 -070083grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
84 grpc_closure *shutdown_complete,
murgatroid999030c812016-09-16 13:25:08 -070085 const grpc_channel_args *args,
86 grpc_tcp_server **server) {
87 grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
murgatroid9969259d42016-10-31 14:34:10 -070088 s->resource_quota = grpc_resource_quota_create(NULL);
89 for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
90 if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
91 if (args->args[i].type == GRPC_ARG_POINTER) {
Craig Tiller58833762016-12-06 19:36:23 -080092 grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
murgatroid9969259d42016-10-31 14:34:10 -070093 s->resource_quota =
Craig Tiller58833762016-12-06 19:36:23 -080094 grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
murgatroid9969259d42016-10-31 14:34:10 -070095 } else {
Craig Tiller58833762016-12-06 19:36:23 -080096 grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
murgatroid9969259d42016-10-31 14:34:10 -070097 gpr_free(s);
98 return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
99 " must be a pointer to a buffer pool");
100 }
101 }
102 }
murgatroid999030c812016-09-16 13:25:08 -0700103 gpr_ref_init(&s->refs, 1);
104 s->on_accept_cb = NULL;
105 s->on_accept_cb_arg = NULL;
106 s->open_ports = 0;
107 s->head = NULL;
108 s->tail = NULL;
109 s->shutdown_starting.head = NULL;
110 s->shutdown_starting.tail = NULL;
111 s->shutdown_complete = shutdown_complete;
112 *server = s;
113 return GRPC_ERROR_NONE;
114}
115
116grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
117 gpr_ref(&s->refs);
118 return s;
119}
120
121void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
122 grpc_closure *shutdown_starting) {
123 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
124 GRPC_ERROR_NONE);
125}
126
127static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
128 if (s->shutdown_complete != NULL) {
Craig Tiller91031da2016-12-28 15:44:25 -0800129 grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
murgatroid999030c812016-09-16 13:25:08 -0700130 }
131
132 while (s->head) {
133 grpc_tcp_listener *sp = s->head;
134 s->head = sp->next;
135 sp->next = NULL;
murgatroid999030c812016-09-16 13:25:08 -0700136 gpr_free(sp->handle);
137 gpr_free(sp);
138 }
Craig Tiller58833762016-12-06 19:36:23 -0800139 grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
murgatroid999030c812016-09-16 13:25:08 -0700140 gpr_free(s);
141}
142
143static void handle_close_callback(uv_handle_t *handle) {
144 grpc_tcp_listener *sp = (grpc_tcp_listener *)handle->data;
145 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
146 sp->server->open_ports--;
147 if (sp->server->open_ports == 0) {
148 finish_shutdown(&exec_ctx, sp->server);
149 }
150 grpc_exec_ctx_finish(&exec_ctx);
151}
152
153static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
154 int immediately_done = 0;
155 grpc_tcp_listener *sp;
156
157 if (s->open_ports == 0) {
158 immediately_done = 1;
159 }
murgatroid99dedb9232016-09-26 13:54:04 -0700160 for (sp = s->head; sp; sp = sp->next) {
murgatroid999030c812016-09-16 13:25:08 -0700161 uv_close((uv_handle_t *)sp->handle, handle_close_callback);
162 }
163
164 if (immediately_done) {
165 finish_shutdown(exec_ctx, s);
166 }
167}
168
169void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
170 if (gpr_unref(&s->refs)) {
171 /* Complete shutdown_starting work before destroying. */
172 grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
Craig Tilleraef521c2017-01-03 09:13:36 -0800173 grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting);
murgatroid999030c812016-09-16 13:25:08 -0700174 if (exec_ctx == NULL) {
175 grpc_exec_ctx_flush(&local_exec_ctx);
176 tcp_server_destroy(&local_exec_ctx, s);
177 grpc_exec_ctx_finish(&local_exec_ctx);
178 } else {
179 grpc_exec_ctx_finish(&local_exec_ctx);
180 tcp_server_destroy(exec_ctx, s);
181 }
182 }
183}
184
murgatroid992c287ca2016-10-07 09:55:35 -0700185static void accepted_connection_close_cb(uv_handle_t *handle) {
186 gpr_free(handle);
187}
188
murgatroid999030c812016-09-16 13:25:08 -0700189static void on_connect(uv_stream_t *server, int status) {
190 grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
murgatroid999030c812016-09-16 13:25:08 -0700191 uv_tcp_t *client;
192 grpc_endpoint *ep = NULL;
193 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
murgatroid997871f732016-09-23 13:49:05 -0700194 grpc_resolved_address peer_name;
murgatroid999030c812016-09-16 13:25:08 -0700195 char *peer_name_string;
196 int err;
197
murgatroid999030c812016-09-16 13:25:08 -0700198 if (status < 0) {
199 gpr_log(GPR_INFO, "Skipping on_accept due to error: %s",
200 uv_strerror(status));
201 return;
202 }
Mark D. Rotheed38152016-12-08 13:59:13 -0800203
murgatroid999030c812016-09-16 13:25:08 -0700204 client = gpr_malloc(sizeof(uv_tcp_t));
murgatroid999030c812016-09-16 13:25:08 -0700205 uv_tcp_init(uv_default_loop(), client);
206 // UV documentation says this is guaranteed to succeed
207 uv_accept((uv_stream_t *)server, (uv_stream_t *)client);
murgatroid992c287ca2016-10-07 09:55:35 -0700208 // If the server has not been started, we discard incoming connections
209 if (sp->server->on_accept_cb == NULL) {
210 uv_close((uv_handle_t *)client, accepted_connection_close_cb);
murgatroid999030c812016-09-16 13:25:08 -0700211 } else {
murgatroid992c287ca2016-10-07 09:55:35 -0700212 peer_name_string = NULL;
213 memset(&peer_name, 0, sizeof(grpc_resolved_address));
214 peer_name.len = sizeof(struct sockaddr_storage);
215 err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr,
216 (int *)&peer_name.len);
217 if (err == 0) {
218 peer_name_string = grpc_sockaddr_to_uri(&peer_name);
219 } else {
220 gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
221 }
murgatroid9969259d42016-10-31 14:34:10 -0700222 ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
Mark D. Roth6c07bf82016-12-09 08:38:38 -0800223 // Create acceptor.
224 grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
225 acceptor->from_server = sp->server;
226 acceptor->port_index = sp->port_index;
227 acceptor->fd_index = 0;
murgatroid992c287ca2016-10-07 09:55:35 -0700228 sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
Mark D. Rotheed38152016-12-08 13:59:13 -0800229 acceptor);
murgatroid992c287ca2016-10-07 09:55:35 -0700230 grpc_exec_ctx_finish(&exec_ctx);
murgatroid999030c812016-09-16 13:25:08 -0700231 }
murgatroid999030c812016-09-16 13:25:08 -0700232}
233
murgatroid99dedb9232016-09-26 13:54:04 -0700234static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle,
murgatroid997871f732016-09-23 13:49:05 -0700235 const grpc_resolved_address *addr,
236 unsigned port_index,
murgatroid999030c812016-09-16 13:25:08 -0700237 grpc_tcp_listener **listener) {
238 grpc_tcp_listener *sp = NULL;
239 int port = -1;
240 int status;
241 grpc_error *error;
murgatroid997871f732016-09-23 13:49:05 -0700242 grpc_resolved_address sockname_temp;
murgatroid999030c812016-09-16 13:25:08 -0700243
244 // The last argument to uv_tcp_bind is flags
murgatroid997871f732016-09-23 13:49:05 -0700245 status = uv_tcp_bind(handle, (struct sockaddr *)addr->addr, 0);
murgatroid999030c812016-09-16 13:25:08 -0700246 if (status != 0) {
247 error = GRPC_ERROR_CREATE("Failed to bind to port");
murgatroid99dedb9232016-09-26 13:54:04 -0700248 error =
249 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
murgatroid999030c812016-09-16 13:25:08 -0700250 return error;
251 }
252
murgatroid992c287ca2016-10-07 09:55:35 -0700253 status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect);
254 if (status != 0) {
255 error = GRPC_ERROR_CREATE("Failed to listen to port");
256 error =
257 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
258 return error;
259 }
260
murgatroid997871f732016-09-23 13:49:05 -0700261 sockname_temp.len = (int)sizeof(struct sockaddr_storage);
262 status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr,
263 (int *)&sockname_temp.len);
murgatroid999030c812016-09-16 13:25:08 -0700264 if (status != 0) {
265 error = GRPC_ERROR_CREATE("getsockname failed");
murgatroid99dedb9232016-09-26 13:54:04 -0700266 error =
267 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
murgatroid999030c812016-09-16 13:25:08 -0700268 return error;
269 }
270
murgatroid997871f732016-09-23 13:49:05 -0700271 port = grpc_sockaddr_get_port(&sockname_temp);
murgatroid999030c812016-09-16 13:25:08 -0700272
273 GPR_ASSERT(port >= 0);
274 GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
275 sp = gpr_malloc(sizeof(grpc_tcp_listener));
276 sp->next = NULL;
277 if (s->head == NULL) {
278 s->head = sp;
279 } else {
280 s->tail->next = sp;
281 }
282 s->tail = sp;
283 sp->server = s;
284 sp->handle = handle;
285 sp->port = port;
286 sp->port_index = port_index;
287 handle->data = sp;
288 s->open_ports++;
289 GPR_ASSERT(sp->handle);
290 *listener = sp;
291
292 return GRPC_ERROR_NONE;
293}
294
murgatroid997871f732016-09-23 13:49:05 -0700295grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
296 const grpc_resolved_address *addr,
297 int *port) {
murgatroid999030c812016-09-16 13:25:08 -0700298 // This function is mostly copied from tcp_server_windows.c
299 grpc_tcp_listener *sp = NULL;
300 uv_tcp_t *handle;
murgatroid997871f732016-09-23 13:49:05 -0700301 grpc_resolved_address addr6_v4mapped;
302 grpc_resolved_address wildcard;
303 grpc_resolved_address *allocated_addr = NULL;
304 grpc_resolved_address sockname_temp;
murgatroid999030c812016-09-16 13:25:08 -0700305 unsigned port_index = 0;
306 int status;
307 grpc_error *error = GRPC_ERROR_NONE;
308
309 if (s->tail != NULL) {
310 port_index = s->tail->port_index + 1;
311 }
312
313 /* Check if this is a wildcard port, and if so, try to keep the port the same
314 as some previously created listener. */
315 if (grpc_sockaddr_get_port(addr) == 0) {
316 for (sp = s->head; sp; sp = sp->next) {
murgatroid997871f732016-09-23 13:49:05 -0700317 sockname_temp.len = sizeof(struct sockaddr_storage);
murgatroid99dedb9232016-09-26 13:54:04 -0700318 if (0 == uv_tcp_getsockname(sp->handle,
319 (struct sockaddr *)&sockname_temp.addr,
murgatroid997871f732016-09-23 13:49:05 -0700320 (int *)&sockname_temp.len)) {
321 *port = grpc_sockaddr_get_port(&sockname_temp);
murgatroid999030c812016-09-16 13:25:08 -0700322 if (*port > 0) {
murgatroid997871f732016-09-23 13:49:05 -0700323 allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
324 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
murgatroid999030c812016-09-16 13:25:08 -0700325 grpc_sockaddr_set_port(allocated_addr, *port);
326 addr = allocated_addr;
327 break;
328 }
329 }
330 }
331 }
332
333 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
murgatroid997871f732016-09-23 13:49:05 -0700334 addr = &addr6_v4mapped;
murgatroid999030c812016-09-16 13:25:08 -0700335 }
336
337 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
338 if (grpc_sockaddr_is_wildcard(addr, port)) {
339 grpc_sockaddr_make_wildcard6(*port, &wildcard);
340
murgatroid997871f732016-09-23 13:49:05 -0700341 addr = &wildcard;
murgatroid999030c812016-09-16 13:25:08 -0700342 }
343
344 handle = gpr_malloc(sizeof(uv_tcp_t));
murgatroid999030c812016-09-16 13:25:08 -0700345 status = uv_tcp_init(uv_default_loop(), handle);
346 if (status == 0) {
murgatroid997871f732016-09-23 13:49:05 -0700347 error = add_socket_to_server(s, handle, addr, port_index, &sp);
murgatroid999030c812016-09-16 13:25:08 -0700348 } else {
349 error = GRPC_ERROR_CREATE("Failed to initialize UV tcp handle");
murgatroid99dedb9232016-09-26 13:54:04 -0700350 error =
351 grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
murgatroid999030c812016-09-16 13:25:08 -0700352 }
353
354 gpr_free(allocated_addr);
355
356 if (error != GRPC_ERROR_NONE) {
357 grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING(
358 "Failed to add port to server", &error, 1);
359 GRPC_ERROR_UNREF(error);
360 error = error_out;
361 *port = -1;
362 } else {
363 GPR_ASSERT(sp != NULL);
364 *port = sp->port;
365 }
366 return error;
367}
368
369void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
370 grpc_pollset **pollsets, size_t pollset_count,
371 grpc_tcp_server_cb on_accept_cb, void *cb_arg) {
372 grpc_tcp_listener *sp;
373 (void)pollsets;
374 (void)pollset_count;
375 GPR_ASSERT(on_accept_cb);
376 GPR_ASSERT(!server->on_accept_cb);
377 server->on_accept_cb = on_accept_cb;
378 server->on_accept_cb_arg = cb_arg;
murgatroid99dedb9232016-09-26 13:54:04 -0700379 for (sp = server->head; sp; sp = sp->next) {
380 GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) ==
381 0);
murgatroid999030c812016-09-16 13:25:08 -0700382 }
383}
384
385void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
386 grpc_tcp_server *s) {}
387
murgatroid999030c812016-09-16 13:25:08 -0700388#endif /* GRPC_UV */