blob: 2f386ce045bfd490340be940a8303cd60d0e2e49 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#define _GNU_SOURCE
35#include "src/core/endpoint/tcp_server.h"
36
37#include <limits.h>
38#include <fcntl.h>
39#include <netinet/in.h>
40#include <netinet/tcp.h>
41#include <stdio.h>
42#include <sys/types.h>
43#include <sys/socket.h>
44#include <unistd.h>
45#include <string.h>
46#include <errno.h>
47
48#include "src/core/endpoint/socket_utils.h"
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/sync.h>
52#include <grpc/support/time.h>
53
54#define INIT_PORT_CAP 2
55#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
56
57static gpr_once s_init_max_accept_queue_size;
58static int s_max_accept_queue_size;
59
60/* one listening port */
61typedef struct {
62 int fd;
63 grpc_em_fd *emfd;
64 grpc_tcp_server *server;
65} server_port;
66
67/* the overall server */
68struct grpc_tcp_server {
69 grpc_em *em;
70 grpc_tcp_server_cb cb;
71 void *cb_arg;
72
73 gpr_mu mu;
74 gpr_cv cv;
75
76 /* active port count: how many ports are actually still listening */
77 int active_ports;
78
79 /* all listening ports */
80 server_port *ports;
81 size_t nports;
82 size_t port_capacity;
83};
84
85grpc_tcp_server *grpc_tcp_server_create(grpc_em *em) {
86 grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
87 gpr_mu_init(&s->mu);
88 gpr_cv_init(&s->cv);
89 s->active_ports = 0;
90 s->em = em;
91 s->cb = NULL;
92 s->cb_arg = NULL;
93 s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
94 s->nports = 0;
95 s->port_capacity = INIT_PORT_CAP;
96 return s;
97}
98
99void grpc_tcp_server_destroy(grpc_tcp_server *s) {
100 size_t i;
101 gpr_mu_lock(&s->mu);
102 /* shutdown all fd's */
103 for (i = 0; i < s->nports; i++) {
104 grpc_em_fd_shutdown(s->ports[i].emfd);
105 }
106 /* wait while that happens */
107 while (s->active_ports) {
108 gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
109 }
110 gpr_mu_unlock(&s->mu);
111
112 /* delete ALL the things */
113 for (i = 0; i < s->nports; i++) {
114 server_port *sp = &s->ports[i];
115 grpc_em_fd_destroy(sp->emfd);
116 gpr_free(sp->emfd);
117 close(sp->fd);
118 }
119 gpr_free(s->ports);
120 gpr_free(s);
121}
122
123/* get max listen queue size on linux */
124static void init_max_accept_queue_size() {
125 int n = SOMAXCONN;
126 char buf[64];
127 FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r");
128 if (fp == NULL) {
129 /* 2.4 kernel. */
130 s_max_accept_queue_size = SOMAXCONN;
131 return;
132 }
133 if (fgets(buf, sizeof buf, fp)) {
134 char *end;
135 long i = strtol(buf, &end, 10);
136 if (i > 0 && i <= INT_MAX && end && *end == 0) {
137 n = i;
138 }
139 }
140 fclose(fp);
141 s_max_accept_queue_size = n;
142
143 if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
144 gpr_log(GPR_INFO,
145 "Suspiciously small accept queue (%d) will probably lead to "
146 "connection drops",
147 s_max_accept_queue_size);
148 }
149}
150
151static int get_max_accept_queue_size() {
152 gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
153 return s_max_accept_queue_size;
154}
155
156/* create a socket to listen with */
157static int create_listening_socket(struct sockaddr *port, int len) {
158 int fd = socket(port->sa_family, SOCK_STREAM, 0);
159 if (fd < 0) {
160 gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
161 goto error;
162 }
163
164 if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) ||
165 !grpc_set_socket_low_latency(fd, 1) ||
166 !grpc_set_socket_reuse_addr(fd, 1)) {
167 gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
168 strerror(errno));
169 goto error;
170 }
171
172 if (bind(fd, port, len) < 0) {
173 gpr_log(GPR_ERROR, "bind: %s", strerror(errno));
174 goto error;
175 }
176
177 if (listen(fd, get_max_accept_queue_size()) < 0) {
178 gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
179 goto error;
180 }
181
182 return fd;
183
184error:
185 if (fd >= 0) {
186 close(fd);
187 }
188 return -1;
189}
190
191/* event manager callback when reads are ready */
192static void on_read(void *arg, grpc_em_cb_status status) {
193 server_port *sp = arg;
194
195 if (status != GRPC_CALLBACK_SUCCESS) {
196 goto error;
197 }
198
199 /* loop until accept4 returns EAGAIN, and then re-arm notification */
200 for (;;) {
201 struct sockaddr_storage addr;
202 socklen_t addrlen = sizeof(addr);
203 int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1);
204 if (fd < 0) {
205 switch (errno) {
206 case EINTR:
207 continue;
208 case EAGAIN:
209 if (GRPC_EM_OK != grpc_em_fd_notify_on_read(sp->emfd, on_read, sp,
210 gpr_inf_future)) {
211 gpr_log(GPR_ERROR, "Failed to register read request with em");
212 goto error;
213 }
214 return;
215 default:
216 gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
217 goto error;
218 }
219 }
220
221 sp->server->cb(sp->server->cb_arg, grpc_tcp_create(fd, sp->server->em));
222 }
223
224 abort();
225
226error:
227 gpr_mu_lock(&sp->server->mu);
228 if (0 == --sp->server->active_ports) {
229 gpr_cv_broadcast(&sp->server->cv);
230 }
231 gpr_mu_unlock(&sp->server->mu);
232}
233
234int grpc_tcp_server_add_port(grpc_tcp_server *s, struct sockaddr *port,
235 int len) {
236 server_port *sp;
237 /* create a socket */
238 int fd = create_listening_socket(port, len);
239 if (fd < 0) {
240 return -1;
241 }
242
243 gpr_mu_lock(&s->mu);
244 GPR_ASSERT(!s->cb && "must add ports before starting server");
245 /* append it to the list under a lock */
246 if (s->nports == s->port_capacity) {
247 s->port_capacity *= 2;
248 s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity);
249 }
250 sp = &s->ports[s->nports++];
251 sp->emfd = gpr_malloc(sizeof(grpc_em_fd));
252 sp->fd = fd;
253 sp->server = s;
254 /* initialize the em desc */
255 if (GRPC_EM_OK != grpc_em_fd_init(sp->emfd, s->em, fd)) {
256 grpc_em_fd_destroy(sp->emfd);
257 gpr_free(sp->emfd);
258 s->nports--;
259 gpr_mu_unlock(&s->mu);
260 return -1;
261 }
262 gpr_mu_unlock(&s->mu);
263
264 return fd;
265}
266
267void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
268 void *cb_arg) {
269 size_t i;
270 GPR_ASSERT(cb);
271 gpr_mu_lock(&s->mu);
272 GPR_ASSERT(!s->cb);
273 GPR_ASSERT(s->active_ports == 0);
274 s->cb = cb;
275 s->cb_arg = cb_arg;
276 for (i = 0; i < s->nports; i++) {
277 grpc_em_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
278 gpr_inf_future);
279 s->active_ports++;
280 }
281 gpr_mu_unlock(&s->mu);
282}