blob: efd3dede50a521d6c601866274253094431a096a [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#define _GNU_SOURCE
35#include "src/core/endpoint/tcp_server.h"
36
37#include <limits.h>
38#include <fcntl.h>
39#include <netinet/in.h>
40#include <netinet/tcp.h>
41#include <stdio.h>
42#include <sys/types.h>
43#include <sys/socket.h>
44#include <unistd.h>
45#include <string.h>
46#include <errno.h>
47
48#include "src/core/endpoint/socket_utils.h"
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/sync.h>
52#include <grpc/support/time.h>
53
54#define INIT_PORT_CAP 2
55#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
56
57static gpr_once s_init_max_accept_queue_size;
58static int s_max_accept_queue_size;
59
60/* one listening port */
61typedef struct {
62 int fd;
63 grpc_em_fd *emfd;
64 grpc_tcp_server *server;
65} server_port;
66
67/* the overall server */
68struct grpc_tcp_server {
69 grpc_em *em;
70 grpc_tcp_server_cb cb;
71 void *cb_arg;
72
73 gpr_mu mu;
74 gpr_cv cv;
75
76 /* active port count: how many ports are actually still listening */
77 int active_ports;
78
79 /* all listening ports */
80 server_port *ports;
81 size_t nports;
82 size_t port_capacity;
83};
84
85grpc_tcp_server *grpc_tcp_server_create(grpc_em *em) {
86 grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
87 gpr_mu_init(&s->mu);
88 gpr_cv_init(&s->cv);
89 s->active_ports = 0;
90 s->em = em;
91 s->cb = NULL;
92 s->cb_arg = NULL;
93 s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
94 s->nports = 0;
95 s->port_capacity = INIT_PORT_CAP;
96 return s;
97}
98
99void grpc_tcp_server_destroy(grpc_tcp_server *s) {
100 size_t i;
101 gpr_mu_lock(&s->mu);
102 /* shutdown all fd's */
103 for (i = 0; i < s->nports; i++) {
104 grpc_em_fd_shutdown(s->ports[i].emfd);
105 }
106 /* wait while that happens */
107 while (s->active_ports) {
108 gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
109 }
110 gpr_mu_unlock(&s->mu);
111
112 /* delete ALL the things */
113 for (i = 0; i < s->nports; i++) {
114 server_port *sp = &s->ports[i];
115 grpc_em_fd_destroy(sp->emfd);
116 gpr_free(sp->emfd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800117 }
118 gpr_free(s->ports);
119 gpr_free(s);
120}
121
122/* get max listen queue size on linux */
123static void init_max_accept_queue_size() {
124 int n = SOMAXCONN;
125 char buf[64];
126 FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r");
127 if (fp == NULL) {
128 /* 2.4 kernel. */
129 s_max_accept_queue_size = SOMAXCONN;
130 return;
131 }
132 if (fgets(buf, sizeof buf, fp)) {
133 char *end;
134 long i = strtol(buf, &end, 10);
135 if (i > 0 && i <= INT_MAX && end && *end == 0) {
136 n = i;
137 }
138 }
139 fclose(fp);
140 s_max_accept_queue_size = n;
141
142 if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
143 gpr_log(GPR_INFO,
144 "Suspiciously small accept queue (%d) will probably lead to "
145 "connection drops",
146 s_max_accept_queue_size);
147 }
148}
149
150static int get_max_accept_queue_size() {
151 gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
152 return s_max_accept_queue_size;
153}
154
nnoble0c475f02014-12-05 15:37:39 -0800155/* Prepare a recently-created socket for listening. */
156static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800157 if (fd < 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800158 goto error;
159 }
160
161 if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
162 !grpc_set_socket_low_latency(fd, 1) ||
163 !grpc_set_socket_reuse_addr(fd, 1)) {
164 gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
165 strerror(errno));
166 goto error;
167 }
168
nnoble0c475f02014-12-05 15:37:39 -0800169 if (bind(fd, addr, addr_len) < 0) {
170 char *addr_str;
171 grpc_sockaddr_to_string(&addr_str, addr, 0);
172 gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
173 gpr_free(addr_str);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800174 goto error;
175 }
176
177 if (listen(fd, get_max_accept_queue_size()) < 0) {
178 gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
179 goto error;
180 }
181
nnoble0c475f02014-12-05 15:37:39 -0800182 return 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800183
184error:
185 if (fd >= 0) {
186 close(fd);
187 }
nnoble0c475f02014-12-05 15:37:39 -0800188 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800189}
190
191/* event manager callback when reads are ready */
192static void on_read(void *arg, grpc_em_cb_status status) {
193 server_port *sp = arg;
194
195 if (status != GRPC_CALLBACK_SUCCESS) {
196 goto error;
197 }
198
199 /* loop until accept4 returns EAGAIN, and then re-arm notification */
200 for (;;) {
201 struct sockaddr_storage addr;
202 socklen_t addrlen = sizeof(addr);
nnoble0c475f02014-12-05 15:37:39 -0800203 /* Note: If we ever decide to return this address to the user, remember to
204 strip off the ::ffff:0.0.0.0/96 prefix first. */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800205 int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1);
206 if (fd < 0) {
207 switch (errno) {
208 case EINTR:
209 continue;
210 case EAGAIN:
211 if (GRPC_EM_OK != grpc_em_fd_notify_on_read(sp->emfd, on_read, sp,
212 gpr_inf_future)) {
213 gpr_log(GPR_ERROR, "Failed to register read request with em");
214 goto error;
215 }
216 return;
217 default:
218 gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
219 goto error;
220 }
221 }
222
223 sp->server->cb(sp->server->cb_arg, grpc_tcp_create(fd, sp->server->em));
224 }
225
226 abort();
227
228error:
229 gpr_mu_lock(&sp->server->mu);
230 if (0 == --sp->server->active_ports) {
231 gpr_cv_broadcast(&sp->server->cv);
232 }
233 gpr_mu_unlock(&sp->server->mu);
234}
235
nnoble0c475f02014-12-05 15:37:39 -0800236static int add_socket_to_server(grpc_tcp_server *s, int fd,
237 const struct sockaddr *addr, int addr_len) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800238 server_port *sp;
nnoble0c475f02014-12-05 15:37:39 -0800239
240 if (!prepare_socket(fd, addr, addr_len)) {
241 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800242 }
243
244 gpr_mu_lock(&s->mu);
245 GPR_ASSERT(!s->cb && "must add ports before starting server");
246 /* append it to the list under a lock */
247 if (s->nports == s->port_capacity) {
248 s->port_capacity *= 2;
249 s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity);
250 }
251 sp = &s->ports[s->nports++];
252 sp->emfd = gpr_malloc(sizeof(grpc_em_fd));
253 sp->fd = fd;
254 sp->server = s;
255 /* initialize the em desc */
256 if (GRPC_EM_OK != grpc_em_fd_init(sp->emfd, s->em, fd)) {
257 grpc_em_fd_destroy(sp->emfd);
258 gpr_free(sp->emfd);
259 s->nports--;
260 gpr_mu_unlock(&s->mu);
nnoble0c475f02014-12-05 15:37:39 -0800261 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800262 }
263 gpr_mu_unlock(&s->mu);
264
nnoble0c475f02014-12-05 15:37:39 -0800265 return 1;
266}
267
268int grpc_tcp_server_add_port(grpc_tcp_server *s, const struct sockaddr *addr,
269 int addr_len) {
270 int ok = 0;
271 int fd;
272 grpc_dualstack_mode dsmode;
273 struct sockaddr_in6 addr6_v4mapped;
274 struct sockaddr_in wild4;
275 struct sockaddr_in6 wild6;
276 struct sockaddr_in addr4_copy;
277 int port;
278
279 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
280 addr = (const struct sockaddr *)&addr6_v4mapped;
281 addr_len = sizeof(addr6_v4mapped);
282 }
283
284 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
285 if (grpc_sockaddr_is_wildcard(addr, &port)) {
286 grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
287
288 /* Try listening on IPv6 first. */
289 addr = (struct sockaddr *)&wild6;
290 addr_len = sizeof(wild6);
291 fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
292 ok |= add_socket_to_server(s, fd, addr, addr_len);
293 if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
294 return ok;
295 }
296
297 /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
298 addr = (struct sockaddr *)&wild4;
299 addr_len = sizeof(wild4);
300 }
301
302 fd = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode);
303 if (fd < 0) {
304 gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
305 }
306 if (dsmode == GRPC_DSMODE_IPV4 &&
307 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
308 addr = (struct sockaddr *)&addr4_copy;
309 addr_len = sizeof(addr4_copy);
310 }
311 ok |= add_socket_to_server(s, fd, addr, addr_len);
312 return ok;
313}
314
315int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) {
316 return (0 <= index && index < s->nports) ? s->ports[index].fd : -1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800317}
318
319void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
320 void *cb_arg) {
321 size_t i;
322 GPR_ASSERT(cb);
323 gpr_mu_lock(&s->mu);
324 GPR_ASSERT(!s->cb);
325 GPR_ASSERT(s->active_ports == 0);
326 s->cb = cb;
327 s->cb_arg = cb_arg;
328 for (i = 0; i < s->nports; i++) {
329 grpc_em_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
330 gpr_inf_future);
331 s->active_ports++;
332 }
333 gpr_mu_unlock(&s->mu);
334}