blob: d2d5d2852b938a83e7ef5c8eef398175b4d7d977 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc/support/port_platform.h>
35
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070036#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070037
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070038#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
40#include <assert.h>
41#include <errno.h>
42#include <poll.h>
43#include <signal.h>
44#include <string.h>
45#include <sys/epoll.h>
46#include <sys/socket.h>
47#include <unistd.h>
48
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/string_util.h>
52#include <grpc/support/tls.h>
53#include <grpc/support/useful.h>
54
55#include "src/core/lib/iomgr/ev_posix.h"
56#include "src/core/lib/iomgr/iomgr_internal.h"
57#include "src/core/lib/iomgr/wakeup_fd_posix.h"
58#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/block_annotate.h"
60
61struct polling_island;
62
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070063static int grpc_poller_kick_signum;
64
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070065/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070066 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070067 */
/* Wrapper around an OS file descriptor, tracked by the iomgr. Instances are
   kept on a freelist (see fd_freelist) and reused rather than freed, so that
   a poller racing with fd destruction sees at worst a spurious notification
   on a reused structure */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0 : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Guards the mutable state below (shutdown/orphaned flags and the pending
     closures) - see fd_create/fd_orphan/fd_wrapped_fd */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Pending read/write closures. These also carry the sentinel values
     CLOSURE_NOT_READY ((grpc_closure *)0) and CLOSURE_READY
     ((grpc_closure *)1) */
  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to and the mutex protecting the
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  /* Next entry in fd_freelist; only meaningful once all refs are dropped */
  struct grpc_fd *freelist_next;
  /* Closure scheduled when the fd is fully orphaned (set by fd_orphan) */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
103
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700104/* Reference counting for fds */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700105#ifdef GRPC_FD_REF_COUNT_DEBUG
106static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
107static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
108 int line);
109#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
110#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
111#else
112static void fd_ref(grpc_fd *fd);
113static void fd_unref(grpc_fd *fd);
114#define GRPC_FD_REF(fd, reason) fd_ref(fd)
115#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
116#endif
117
118static void fd_global_init(void);
119static void fd_global_shutdown(void);
120
121#define CLOSURE_NOT_READY ((grpc_closure *)0)
122#define CLOSURE_READY ((grpc_closure *)1)
123
124/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700125 * Polling-island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700126 */
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700127/* TODO: sree: Consider making ref_cnt and merged_to to gpr_atm - This would
128 * significantly reduce the number of mutex acquisition calls. */
/* A set of fds polled together through a single epoll set. Islands can be
   merged; a merged-away island keeps only mu/ref_cnt valid and points at its
   successor via merged_to. Retired islands are recycled via g_pi_freelist */
typedef struct polling_island {
  gpr_mu mu; /* Guards every field below */
  /* Number of outstanding references to this island */
  int ref_cnt;

  /* Points to the polling_island this merged into.
   * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
   * are invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
151
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700152/*******************************************************************************
153 * Pollset Declarations
154 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  /* Links in the pollset's worker list, anchored at grpc_pollset.root_worker
     (presumably a doubly-linked ring - confirm in pollset_work()) */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
160
struct grpc_pollset {
  gpr_mu mu; /* Guards the worker list and the shutdown state below */
  grpc_pollset_worker root_worker; /* Anchor of the worker list */
  /* NOTE(review): presumably set when a kick arrives while no worker is
     polling, so the next worker returns immediately - confirm in
     pollset_work/pollset_kick */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after after shutdown is complete */

  /* The polling island to which this pollset belongs to and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};
178
179/*******************************************************************************
180 * Pollset-set Declarations
181 */
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700182/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
183 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
184 * the current pollset_set would result in polling island merges. This would
185 * remove the need to maintain fd_count here. This will also significantly
186 * simplify the grpc_fd structure since we would no longer need to explicitly
187 * maintain the orphaned state */
/* A set of pollsets, nested pollset_sets and fds that must poll each other's
   events. All three membership arrays grow on demand under mu */
struct grpc_pollset_set {
  gpr_mu mu; /* Guards all the arrays and counts below */

  /* Member pollsets */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Nested pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Member fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
203
204/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700205 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700206 */
207
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700208/* The wakeup fd that is used to wake up all threads in a Polling island. This
209 is useful in the polling island merge operation where we need to wakeup all
210 the threads currently polling the smaller polling island (so that they can
211 start polling the new/merged polling island)
212
213 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
214 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
215static grpc_wakeup_fd polling_island_wakeup_fd;
216
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700217/* Polling island freelist */
218static gpr_mu g_pi_freelist_mu;
219static polling_island *g_pi_freelist = NULL;
220
/* Adds fds[0..fd_count) to pi's epoll set and to the pi->fds tracking array
   (growing it as needed). An fd already present in the epoll set (EEXIST) is
   skipped silently and NOT added to pi->fds a second time. When add_fd_refs
   is true a "polling_island" ref is taken on each fd actually added.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered, interested in both readability and writability */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead of
           just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    /* Grow the tracking array geometrically (and by at least 8) when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
255
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700256/* The caller is expected to hold pi->mu before calling this */
257static void polling_island_add_wakeup_fd_locked(polling_island *pi,
258 grpc_wakeup_fd *wakeup_fd) {
259 struct epoll_event ev;
260 int err;
261
262 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
263 ev.data.ptr = wakeup_fd;
264 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
265 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
266 if (err < 0) {
267 gpr_log(GPR_ERROR,
268 "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
269 ". Error: %s",
270 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
271 strerror(errno));
272 }
273}
274
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700275/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700276static void polling_island_remove_all_fds_locked(polling_island *pi,
277 bool remove_fd_refs) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700278 int err;
279 size_t i;
280
281 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700282 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700283 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700284 /* TODO: sreek - We need a better way to bubble up this error instead of
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700285 * just logging a message */
286 gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%d]: %d failed with error: %s",
287 i, pi->fds[i]->fd, strerror(errno));
288 }
289
290 if (remove_fd_refs) {
291 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700292 }
293 }
294
295 pi->fd_cnt = 0;
296}
297
/* Removes a single 'fd' from pi: deletes it from the epoll set (skipped when
   the fd was already closed, since the kernel removes closed fds from epoll
   automatically), removes it from the pi->fds tracking array, and drops the
   "polling_island" ref that pi held on it.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, then it would have been automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error; %s",
              fd->fd, strerror(errno));
    }
  }

  /* Unordered removal: overwrite the slot with the last element and shrink */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
322
/* Creates a polling island (reusing one from g_pi_freelist when possible)
   with a fresh epoll set, 'initial_ref_cnt' refs, the global wakeup fd
   registered, and - when initial_fd is non-NULL - that fd added (taking a
   "polling_island" ref on it). Aborts via GPR_ASSERT if epoll_create1 fails */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  /* NOTE(review): a "_locked" helper is called here without holding pi->mu;
     safe for the same reason documented below - no one else can hold this
     brand-new (or just-recycled) island yet */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}
369
/* Retires a fully-drained polling island: closes its epoll fd and pushes the
   structure onto g_pi_freelist for reuse (the mutex and the fds array are
   deliberately kept alive for the next polling_island_create).
   Requires every ref dropped and every fd already removed */
static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->ref_cnt == 0);
  GPR_ASSERT(pi->fd_cnt == 0);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  pi->merged_to = NULL;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}
384
385void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
386 pi->ref_cnt -= unref_by;
387 int ref_cnt = pi->ref_cnt;
388 GPR_ASSERT(ref_cnt >= 0);
389
390 gpr_mu_unlock(&pi->mu);
391
392 if (ref_cnt == 0) {
393 polling_island_delete(pi);
394 }
395}
396
/* Follows the merged_to chain from 'pi' to the terminal polling island (the
   one with merged_to == NULL), transferring 'unref_by' refs off each
   intermediate island along the way, then adds 'add_ref_by' refs to the
   terminal island. Returns the terminal island with its mu held; the caller
   must eventually release it */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    /* Drops our refs on the stale island and releases its lock (may free it) */
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
411
/* Resolves *p and *q to their terminal polling islands and locks them,
   always acquiring the lower-addressed island's mu first so that concurrent
   callers cannot deadlock. On return either *p == *q (mu held once) or
   *p != *q (both mus held); *p/*q are updated in place to the locked terminal
   islands, preserving which chain each originally referred to */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2  (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock polling_island node with
       lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further. If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2 (address of pi_1 is less than that of pi_2)
       - pi_1 MAYBE locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
         done locking this polling_island yet. Release the lock on this node and
         advance pi_1 to the next node in the list; and go to the beginning of
         the loop (we can't proceed to locking pi_2 unless we locked pi_1 first)
         */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not terminal node, we are not done locking this polling_island
       yet. Release the lock and update pi_2 to the next node in the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 odd number of times, do one more swap so that
       pi_1 and pi_2 point to the same polling_island lists they started off
       with at the beginning of this function (i.e *p and *q respectively) */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
510
/* Merges islands p and q and returns the surviving island. The island with
   fewer fds is drained into the other, its pollers are woken (via the
   always-readable polling_island_wakeup_fd) so they can re-poll the merged
   island, and its refs are transferred. Called with neither lock held; both
   locks are taken and released internally */
polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_pair_update_and_lock(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (The one with fewer fds) to q
     Note that the refcounts on the fds being moved will not change here. This
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wakeup all the pollers (if any) on p so that they can pickup this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* - The merged polling island (i.e q) inherits all the ref counts of the
       island merging with it (i.e p)
     - The island p will lose a ref count */
  /* NOTE(review): p->merged_to is NOT set to q here, so
     polling_island_update_and_lock cannot yet follow p to q - presumably the
     caller establishes the link; verify at the call sites */
  q->ref_cnt += p->ref_cnt;
  p->ref_cnt--;

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  return q;
}
546
/* One-time initialization of the polling-island subsystem: sets up the
   freelist and creates polling_island_wakeup_fd, which is immediately made
   readable (and, per its declaration, must never be consumed) so that any
   poller on a merged-away island wakes up as soon as it polls it */
static void polling_island_global_init() {
  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;
  grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
}
553
/* Tears down the polling-island freelist and the merge wakeup fd.
   NOTE(review): the empty lock/unlock of g_pi_freelist_mu acts as a barrier
   so any thread still inside the freelist critical section drains out before
   the list is walked unlocked - assumes no new islands are created after
   this point; same pattern as fd_global_shutdown */
static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
569
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700570/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700571 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700572 */
573
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700574/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700575 * but instead so that implementations with multiple threads in (for example)
576 * epoll_wait deal with the race between pollset removal and incoming poll
577 * notifications.
578 *
579 * The problem is that the poller ultimately holds a reference to this
580 * object, so it is very difficult to know when is safe to free it, at least
581 * without some expensive synchronization.
582 *
583 * If we keep the object freelisted, in the worst case losing this race just
584 * becomes a spurious read notification on a reused fd.
585 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700586
587/* The alarm system needs to be able to wakeup 'some poller' sometimes
588 * (specifically when a new alarm needs to be triggered earlier than the next
589 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
590 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700591
592/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
593 * sure to wake up one polling thread (which can wake up other threads if
594 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700595grpc_wakeup_fd grpc_global_wakeup_fd;
596
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700597static grpc_fd *fd_freelist = NULL;
598static gpr_mu fd_freelist_mu;
599
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700600#ifdef GRPC_FD_REF_COUNT_DEBUG
601#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
602#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
603static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
604 int line) {
605 gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
606 gpr_atm_no_barrier_load(&fd->refst),
607 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
608#else
609#define REF_BY(fd, n, reason) ref_by(fd, n)
610#define UNREF_BY(fd, n, reason) unref_by(fd, n)
611static void ref_by(grpc_fd *fd, int n) {
612#endif
613 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
614}
615
616#ifdef GRPC_FD_REF_COUNT_DEBUG
617static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
618 int line) {
619 gpr_atm old;
620 gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
621 gpr_atm_no_barrier_load(&fd->refst),
622 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
623#else
624static void unref_by(grpc_fd *fd, int n) {
625 gpr_atm old;
626#endif
627 old = gpr_atm_full_fetch_add(&fd->refst, -n);
628 if (old == n) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700629 /* Add the fd to the freelist */
630 gpr_mu_lock(&fd_freelist_mu);
631 fd->freelist_next = fd_freelist;
632 fd_freelist = fd;
633 grpc_iomgr_unregister_object(&fd->iomgr_object);
634 gpr_mu_unlock(&fd_freelist_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700635 } else {
636 GPR_ASSERT(old > n);
637 }
638}
639
/* Increment refcount by two to avoid changing the orphan bit
   (bit 0 of fd->refst is the Active/Orphaned flag - see struct grpc_fd;
   stepping by 2 leaves it untouched) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
655
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700656static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
657
/* Frees every fd struct on the freelist and destroys fd_freelist_mu.
   NOTE(review): the empty lock/unlock pair is a barrier ensuring no thread is
   still inside the freelist critical section; the walk itself then runs
   unlocked - assumes no fds are created/unref'd after shutdown begins */
static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
669
670static grpc_fd *fd_create(int fd, const char *name) {
671 grpc_fd *new_fd = NULL;
672
673 gpr_mu_lock(&fd_freelist_mu);
674 if (fd_freelist != NULL) {
675 new_fd = fd_freelist;
676 fd_freelist = fd_freelist->freelist_next;
677 }
678 gpr_mu_unlock(&fd_freelist_mu);
679
680 if (new_fd == NULL) {
681 new_fd = gpr_malloc(sizeof(grpc_fd));
682 gpr_mu_init(&new_fd->mu);
683 gpr_mu_init(&new_fd->pi_mu);
684 }
685
686 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
687 newly created fd (or an fd we got from the freelist), no one else would be
688 holding a lock to it anyway. */
689 gpr_mu_lock(&new_fd->mu);
690
691 gpr_atm_rel_store(&new_fd->refst, 1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700692 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700693 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700694 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700695 new_fd->read_closure = CLOSURE_NOT_READY;
696 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700697 new_fd->polling_island = NULL;
698 new_fd->freelist_next = NULL;
699 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700700 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700701
702 gpr_mu_unlock(&new_fd->mu);
703
704 char *fd_name;
705 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
706 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
707 gpr_free(fd_name);
708#ifdef GRPC_FD_REF_COUNT_DEBUG
709 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, fd_name);
710#endif
711 return new_fd;
712}
713
714static bool fd_is_orphaned(grpc_fd *fd) {
715 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
716}
717
718static int fd_wrapped_fd(grpc_fd *fd) {
719 int ret_fd = -1;
720 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700721 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700722 ret_fd = fd->fd;
723 }
724 gpr_mu_unlock(&fd->mu);
725
726 return ret_fd;
727}
728
/* Orphans the fd: detaches it from its polling island, optionally closes (or
   hands back, via *release_fd) the underlying file descriptor, and schedules
   'on_done' once the cleanup is complete. The grpc_fd struct itself stays
   alive until its ref count drops to zero. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  /* Schedule the on_done callback; 'true' marks it successful */
  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  /* Drop both the caller's ref and the temporary ref taken above */
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}
771
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700772static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
773 grpc_closure **st, grpc_closure *closure) {
774 if (*st == CLOSURE_NOT_READY) {
775 /* not ready ==> switch to a waiting state by setting the closure */
776 *st = closure;
777 } else if (*st == CLOSURE_READY) {
778 /* already ready ==> queue the closure to run immediately */
779 *st = CLOSURE_NOT_READY;
780 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
781 } else {
782 /* upcallptr was set to a different closure. This is an error! */
783 gpr_log(GPR_ERROR,
784 "User called a notify_on function with a previous callback still "
785 "pending");
786 abort();
787 }
788}
789
790/* returns 1 if state becomes not ready */
791static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
792 grpc_closure **st) {
793 if (*st == CLOSURE_READY) {
794 /* duplicate ready ==> ignore */
795 return 0;
796 } else if (*st == CLOSURE_NOT_READY) {
797 /* not ready, and not waiting ==> flag ready */
798 *st = CLOSURE_READY;
799 return 0;
800 } else {
801 /* waiting ==> queue closure */
802 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
803 *st = CLOSURE_NOT_READY;
804 return 1;
805 }
806}
807
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700808static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
809 grpc_fd *fd) {
810 grpc_pollset *notifier = NULL;
811
812 gpr_mu_lock(&fd->mu);
813 notifier = fd->read_notifier_pollset;
814 gpr_mu_unlock(&fd->mu);
815
816 return notifier;
817}
818
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700819static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
820 gpr_mu_lock(&fd->mu);
821 GPR_ASSERT(!fd->shutdown);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700822 fd->shutdown = true;
823
824 /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
825 this point, the closures would be called with 'success = false' */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700826 set_ready_locked(exec_ctx, fd, &fd->read_closure);
827 set_ready_locked(exec_ctx, fd, &fd->write_closure);
828 gpr_mu_unlock(&fd->mu);
829}
830
831static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
832 grpc_closure *closure) {
833 gpr_mu_lock(&fd->mu);
834 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
835 gpr_mu_unlock(&fd->mu);
836}
837
838static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
839 grpc_closure *closure) {
840 gpr_mu_lock(&fd->mu);
841 notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
842 gpr_mu_unlock(&fd->mu);
843}
844
845/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700846 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700847 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700848GPR_TLS_DECL(g_current_thread_pollset);
849GPR_TLS_DECL(g_current_thread_worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700850
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700851static void sig_handler(int sig_num) {
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700852#ifdef GRPC_EPOLL_DEBUG
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700853 gpr_log(GPR_INFO, "Received signal %d", sig_num);
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700854#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700855}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700856
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700857static void poller_kick_init() {
858 grpc_poller_kick_signum = SIGRTMIN + 2;
859 signal(grpc_poller_kick_signum, sig_handler);
860}
861
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700862/* Global state management */
863static void pollset_global_init(void) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700864 grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700865 gpr_tls_init(&g_current_thread_pollset);
866 gpr_tls_init(&g_current_thread_worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700867 poller_kick_init();
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700868}
869
/* Tears down pollset global state; pairs with pollset_global_init(). */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
875
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700876static void pollset_worker_kick(grpc_pollset_worker *worker) {
877 pthread_kill(worker->pt_id, grpc_poller_kick_signum);
878}
879
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700880/* Return 1 if the pollset has active threads in pollset_work (pollset must
881 * be locked) */
882static int pollset_has_workers(grpc_pollset *p) {
883 return p->root_worker.next != &p->root_worker;
884}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700885
886static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
887 worker->prev->next = worker->next;
888 worker->next->prev = worker->prev;
889}
890
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700891static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
892 if (pollset_has_workers(p)) {
893 grpc_pollset_worker *w = p->root_worker.next;
894 remove_worker(p, w);
895 return w;
896 } else {
897 return NULL;
898 }
899}
900
901static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
902 worker->next = &p->root_worker;
903 worker->prev = worker->next->prev;
904 worker->prev->next = worker->next->prev = worker;
905}
906
907static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
908 worker->prev = &p->root_worker;
909 worker->next = worker->prev->next;
910 worker->prev->next = worker->next->prev = worker;
911}
912
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700913/* p->mu must be held before calling this function */
914static void pollset_kick(grpc_pollset *p,
915 grpc_pollset_worker *specific_worker) {
916 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700917
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700918 grpc_pollset_worker *worker = specific_worker;
919 if (worker != NULL) {
920 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700921 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700922 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700923 for (worker = p->root_worker.next; worker != &p->root_worker;
924 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700925 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
926 pollset_worker_kick(worker);
927 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700928 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700929 } else {
930 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700931 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700932 GPR_TIMER_END("pollset_kick.broadcast", 0);
933 } else {
934 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700935 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
936 pollset_worker_kick(worker);
937 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700938 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700939 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
940 /* Since worker == NULL, it means that we can kick "any" worker on this
941 pollset 'p'. If 'p' happens to be the same pollset this thread is
942 currently polling (i.e in pollset_work() function), then there is no need
943 to kick any other worker since the current thread can just absorb the
944 kick. This is the reason why we enter this case only when
945 g_current_thread_pollset is != p */
946
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700947 GPR_TIMER_MARK("kick_anonymous", 0);
948 worker = pop_front_worker(p);
949 if (worker != NULL) {
950 GPR_TIMER_MARK("finally_kick", 0);
951 push_back_worker(p, worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700952 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700953 } else {
954 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700955 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700956 }
957 }
958
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700959 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700960}
961
962static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
963
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700964static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
965 gpr_mu_init(&pollset->mu);
966 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700967
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700968 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700969 pollset->kicked_without_pollers = false;
970
971 pollset->shutting_down = false;
972 pollset->finish_shutdown_called = false;
973 pollset->shutdown_done = NULL;
974
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700975 gpr_mu_init(&pollset->pi_mu);
976 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700977}
978
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700979/* Convert a timespec to milliseconds:
980 - Very small or negative poll times are clamped to zero to do a non-blocking
981 poll (which becomes spin polling)
982 - Other small values are rounded up to one millisecond
983 - Longer than a millisecond polls are rounded up to the next nearest
984 millisecond to avoid spinning
985 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700986static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
987 gpr_timespec now) {
988 gpr_timespec timeout;
989 static const int64_t max_spin_polling_us = 10;
990 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
991 return -1;
992 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700993
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700994 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
995 max_spin_polling_us,
996 GPR_TIMESPAN))) <= 0) {
997 return 0;
998 }
999 timeout = gpr_time_sub(deadline, now);
1000 return gpr_time_to_millis(gpr_time_add(
1001 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1002}
1003
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001004static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1005 grpc_pollset *notifier) {
1006 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001007 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001008 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1009 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001010 gpr_mu_unlock(&fd->mu);
1011}
1012
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001013static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001014 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1015 gpr_mu_lock(&fd->mu);
1016 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1017 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001018}
1019
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001020/* Release the reference to pollset->polling_island and set it to NULL.
1021 pollset->mu must be held */
1022static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
1023 gpr_mu_lock(&pollset->pi_mu);
1024 if (pollset->polling_island) {
1025 pollset->polling_island =
1026 polling_island_update_and_lock(pollset->polling_island, 1, 0);
1027 polling_island_unref_and_unlock(pollset->polling_island, 1);
1028 pollset->polling_island = NULL;
1029 }
1030 gpr_mu_unlock(&pollset->pi_mu);
1031}
1032
/* Completes pollset shutdown: releases the polling island and schedules the
   shutdown_done closure. Requires pollset->mu held and no active workers. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  /* 'true' marks the shutdown closure as successful */
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
1043
/* pollset->mu lock must be held by the caller before calling this.
   Initiates shutdown: kicks all workers so they drain out; 'closure' runs
   once shutdown actually completes (possibly later, from pollset_work). */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake every worker so it notices shutting_down */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1063
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here (the polling island was already released during shutdown). */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}
1072
/* Resets a fully shut-down pollset so it can be reused. Requires that
   shutdown ran and no workers remain. */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  /* Drop any polling island left over from the previous incarnation */
  pollset_release_polling_island_locked(pollset);
}
1082
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001083#define GRPC_EPOLL_MAX_EVENTS 1000
1084static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
1085 grpc_pollset *pollset, int timeout_ms,
1086 sigset_t *sig_mask) {
1087 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001088 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001089 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001090 polling_island *pi = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001091 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1092
1093 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
1094 polling island pointed by pollset->polling_island.
1095 Acquire the following locks:
1096 - pollset->mu (which we already have)
1097 - pollset->pi_mu
1098 - pollset->polling_island->mu */
1099 gpr_mu_lock(&pollset->pi_mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001100
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001101 pi = pollset->polling_island;
1102 if (pi == NULL) {
1103 pi = polling_island_create(NULL, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001104 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001105
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001106 /* In addition to locking the polling island, add a ref so that the island
1107 does not get destroyed (which means the epoll_fd won't be closed) while
1108 we are are doing an epoll_wait() on the epoll_fd */
1109 pi = polling_island_update_and_lock(pi, 1, 1);
1110 epoll_fd = pi->epoll_fd;
1111
1112 /* Update the pollset->polling_island */
1113 pollset->polling_island = pi;
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001114
1115#ifdef GRPC_EPOLL_DEBUG
1116 if (pollset->polling_island->fd_cnt == 0) {
1117 gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds",
1118 epoll_fd);
1119 }
1120 for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) {
1121 gpr_log(GPR_DEBUG,
1122 "pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d",
1123 epoll_fd, pollset->polling_island->fd_cnt, i,
1124 pollset->polling_island->fds[i]->fd);
1125 }
1126#endif
1127 gpr_mu_unlock(&pollset->polling_island->mu);
1128
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001129 gpr_mu_unlock(&pollset->pi_mu);
1130 gpr_mu_unlock(&pollset->mu);
1131
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001132 do {
1133 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1134 sig_mask);
1135 if (ep_rv < 0) {
1136 if (errno != EINTR) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001137 gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
1138 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001139 /* We were interrupted. Save an interation by doing a zero timeout
1140 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001141 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001142 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001143 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001144
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001145 int i;
1146 for (i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001147 void *data_ptr = ep_ev[i].data.ptr;
1148 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001149 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001150 } else if (data_ptr == &polling_island_wakeup_fd) {
1151 /* This means that our polling island is merged with a different
1152 island. We do not have to do anything here since the subsequent call
1153 to the function pollset_work_and_unlock() will pick up the correct
1154 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001155 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001156 grpc_fd *fd = data_ptr;
1157 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1158 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1159 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001160 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001161 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001162 }
1163 if (write_ev || cancel) {
1164 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001165 }
1166 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001167 }
1168 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001169
1170 GPR_ASSERT(pi != NULL);
1171
1172 /* Before leaving, release the extra ref we added to the polling island */
1173 /* It is important to note that at this point 'pi' may not be the same as
1174 * pollset->polling_island. This is because pollset->polling_island pointer
1175 * gets updated whenever the underlying polling island is merged with another
1176 * island and while we are doing epoll_wait() above, the polling island may
1177 * have been merged */
1178
1179 /* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
1180 * we do not have to do this here */
1181 gpr_mu_lock(&pi->mu);
1182 polling_island_unref_and_unlock(pi, 1);
1183
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001184 GPR_TIMER_END("pollset_work_and_unlock", 0);
1185}
1186
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001187/* pollset->mu lock must be held by the caller before calling this.
1188 The function pollset_work() may temporarily release the lock (pollset->mu)
1189 during the course of its execution but it will always re-acquire the lock and
1190 ensure that it is held by the time the function returns */
1191static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1192 grpc_pollset_worker **worker_hdl, gpr_timespec now,
1193 gpr_timespec deadline) {
1194 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001195 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1196
1197 sigset_t new_mask;
1198 sigset_t orig_mask;
1199
1200 grpc_pollset_worker worker;
1201 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001202 worker.pt_id = pthread_self();
1203
1204 *worker_hdl = &worker;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001205 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1206 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001207
1208 if (pollset->kicked_without_pollers) {
1209 /* If the pollset was kicked without pollers, pretend that the current
1210 worker got the kick and skip polling. A kick indicates that there is some
1211 work that needs attention like an event on the completion queue or an
1212 alarm */
1213 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1214 pollset->kicked_without_pollers = 0;
1215 } else if (!pollset->shutting_down) {
1216 sigemptyset(&new_mask);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001217 sigaddset(&new_mask, grpc_poller_kick_signum);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001218 pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001219 sigdelset(&orig_mask, grpc_poller_kick_signum);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001220
1221 push_front_worker(pollset, &worker);
1222
1223 pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
1224 grpc_exec_ctx_flush(exec_ctx);
1225
1226 gpr_mu_lock(&pollset->mu);
1227 remove_worker(pollset, &worker);
1228 }
1229
1230 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1231 false at this point) and the pollset is shutting down, we may have to
1232 finish the shutdown process by calling finish_shutdown_locked().
1233 See pollset_shutdown() for more details.
1234
1235 Note: Continuing to access pollset here is safe; it is the caller's
1236 responsibility to not destroy a pollset when it has outstanding calls to
1237 pollset_work() */
1238 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1239 !pollset->finish_shutdown_called) {
1240 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1241 finish_shutdown_locked(exec_ctx, pollset);
1242
1243 gpr_mu_unlock(&pollset->mu);
1244 grpc_exec_ctx_flush(exec_ctx);
1245 gpr_mu_lock(&pollset->mu);
1246 }
1247
1248 *worker_hdl = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001249 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1250 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001251 GPR_TIMER_END("pollset_work", 0);
1252}
1253
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001254static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1255 grpc_fd *fd) {
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -07001256 /* TODO sreek - Double check if we need to get a pollset->mu lock here */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001257 gpr_mu_lock(&pollset->pi_mu);
1258 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001259
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001260 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001261
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001262 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1263 * equal, do nothing.
1264 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1265 * a new polling island (with a refcount of 2) and make the polling_island
1266 * fields in both fd and pollset to point to the new island
1267 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1268 * the NULL polling_island field to point to the non-NULL polling_island
1269 * field (ensure that the refcount on the polling island is incremented by
1270 * 1 to account for the newly added reference)
1271 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1272 * and different, merge both the polling islands and update the
1273 * polling_island fields in both fd and pollset to point to the merged
1274 * polling island.
1275 */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001276 if (fd->polling_island == pollset->polling_island) {
1277 pi_new = fd->polling_island;
1278 if (pi_new == NULL) {
1279 pi_new = polling_island_create(fd, 2);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001280 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001281 } else if (fd->polling_island == NULL) {
1282 pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001283 polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001284 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001285 } else if (pollset->polling_island == NULL) {
1286 pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001287 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001288 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001289 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001290 }
1291
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001292 fd->polling_island = pollset->polling_island = pi_new;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001293
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001294 gpr_mu_unlock(&fd->pi_mu);
1295 gpr_mu_unlock(&pollset->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001296}
1297
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001298/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001299 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001300 */
1301
1302static grpc_pollset_set *pollset_set_create(void) {
1303 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1304 memset(pollset_set, 0, sizeof(*pollset_set));
1305 gpr_mu_init(&pollset_set->mu);
1306 return pollset_set;
1307}
1308
/* Destroys the pollset_set: drops the set's refs on any remaining fds and
   frees all internal arrays. Caller must guarantee no concurrent users
   (note the mutex is destroyed before the arrays are walked). */
static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}
1320
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001321static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1322 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1323 size_t i;
1324 gpr_mu_lock(&pollset_set->mu);
1325 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1326 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1327 pollset_set->fds = gpr_realloc(
1328 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1329 }
1330 GRPC_FD_REF(fd, "pollset_set");
1331 pollset_set->fds[pollset_set->fd_count++] = fd;
1332 for (i = 0; i < pollset_set->pollset_count; i++) {
1333 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1334 }
1335 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1336 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1337 }
1338 gpr_mu_unlock(&pollset_set->mu);
1339}
1340
1341static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1342 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1343 size_t i;
1344 gpr_mu_lock(&pollset_set->mu);
1345 for (i = 0; i < pollset_set->fd_count; i++) {
1346 if (pollset_set->fds[i] == fd) {
1347 pollset_set->fd_count--;
1348 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1349 pollset_set->fds[pollset_set->fd_count]);
1350 GRPC_FD_UNREF(fd, "pollset_set");
1351 break;
1352 }
1353 }
1354 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1355 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1356 }
1357 gpr_mu_unlock(&pollset_set->mu);
1358}
1359
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001360static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1361 grpc_pollset_set *pollset_set,
1362 grpc_pollset *pollset) {
1363 size_t i, j;
1364 gpr_mu_lock(&pollset_set->mu);
1365 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1366 pollset_set->pollset_capacity =
1367 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1368 pollset_set->pollsets =
1369 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1370 sizeof(*pollset_set->pollsets));
1371 }
1372 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1373 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1374 if (fd_is_orphaned(pollset_set->fds[i])) {
1375 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1376 } else {
1377 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1378 pollset_set->fds[j++] = pollset_set->fds[i];
1379 }
1380 }
1381 pollset_set->fd_count = j;
1382 gpr_mu_unlock(&pollset_set->mu);
1383}
1384
1385static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1386 grpc_pollset_set *pollset_set,
1387 grpc_pollset *pollset) {
1388 size_t i;
1389 gpr_mu_lock(&pollset_set->mu);
1390 for (i = 0; i < pollset_set->pollset_count; i++) {
1391 if (pollset_set->pollsets[i] == pollset) {
1392 pollset_set->pollset_count--;
1393 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1394 pollset_set->pollsets[pollset_set->pollset_count]);
1395 break;
1396 }
1397 }
1398 gpr_mu_unlock(&pollset_set->mu);
1399}
1400
1401static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1402 grpc_pollset_set *bag,
1403 grpc_pollset_set *item) {
1404 size_t i, j;
1405 gpr_mu_lock(&bag->mu);
1406 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1407 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1408 bag->pollset_sets =
1409 gpr_realloc(bag->pollset_sets,
1410 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1411 }
1412 bag->pollset_sets[bag->pollset_set_count++] = item;
1413 for (i = 0, j = 0; i < bag->fd_count; i++) {
1414 if (fd_is_orphaned(bag->fds[i])) {
1415 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1416 } else {
1417 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1418 bag->fds[j++] = bag->fds[i];
1419 }
1420 }
1421 bag->fd_count = j;
1422 gpr_mu_unlock(&bag->mu);
1423}
1424
1425static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1426 grpc_pollset_set *bag,
1427 grpc_pollset_set *item) {
1428 size_t i;
1429 gpr_mu_lock(&bag->mu);
1430 for (i = 0; i < bag->pollset_set_count; i++) {
1431 if (bag->pollset_sets[i] == item) {
1432 bag->pollset_set_count--;
1433 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1434 bag->pollset_sets[bag->pollset_set_count]);
1435 break;
1436 }
1437 }
1438 gpr_mu_unlock(&bag->mu);
1439}
1440
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001441/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001442 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001443 */
1444
/* Tears down the engine's global state. The three calls mirror the three
 * *_global_init() calls in grpc_init_epoll_linux(); keep them paired and
 * do not reorder without checking the init side. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1450
/* The event-engine dispatch table exposed by grpc_init_epoll_linux().
 * Binds this file's static fd / pollset / pollset_set implementations to
 * the generic grpc_event_engine_vtable interface using C99 designated
 * initializers; any vtable field not listed here is zero-initialized. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1483
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001484/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1485 * Create a dummy epoll_fd to make sure epoll support is available */
1486static bool is_epoll_available() {
1487 int fd = epoll_create1(EPOLL_CLOEXEC);
1488 if (fd < 0) {
1489 gpr_log(
1490 GPR_ERROR,
1491 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1492 fd);
1493 return false;
1494 }
1495 close(fd);
1496 return true;
1497}
1498
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001499const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001500 if (!is_epoll_available()) {
1501 return NULL;
1502 }
1503
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001504 fd_global_init();
1505 pollset_global_init();
1506 polling_island_global_init();
1507 return &vtable;
1508}
1509
#else /* defined(GPR_LINUX_EPOLL) */
/* Non-epoll build: GPR_LINUX_EPOLL is undefined, so this engine is
 * unavailable. Returning NULL tells the iomgr layer to select a different
 * polling engine. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }

#endif /* !defined(GPR_LINUX_EPOLL) */