/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

struct polling_island;

static int grpc_poller_kick_signum;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
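
/* A grpc_fd's read_closure and write_closure fields hold one of three values:
   CLOSURE_NOT_READY (no event has arrived and no callback is registered),
   CLOSURE_READY (an event arrived before any callback was registered), or a
   pointer to the caller-supplied closure that is waiting for the next event.
   notify_on_locked() and set_ready_locked() below implement the transitions
   between these states. */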

/*******************************************************************************
 * Polling-island Declarations
 */
/* TODO: sree: Consider making ref_cnt and merged_to gpr_atm - This would
 * significantly reduce the number of mutex acquisition calls. */
typedef struct polling_island {
  gpr_mu mu;
  int ref_cnt;

  /* Points to the polling_island this island merged into.
   * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
   * are invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
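
/* An island that has been merged into another stays behind as a forwarding
   node: its fds are moved to the surviving island and its merged_to field
   points at that island, so chains of merged islands always end in a live
   ("terminal") island. polling_island_update_and_lock() follows the merged_to
   chain (releasing the caller's references to the intermediate nodes along the
   way) and returns the terminal island locked. */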

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has finish_shutdown_locked() been called? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a polling island. This
   is useful in the polling island merge operation where we need to wake up all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e. the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;
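
/* Because polling_island_wakeup_fd is made readable once (in
   polling_island_global_init) and never consumed, adding it to an island's
   epoll set wakes up the threads currently polling that set. A woken thread
   sees the event's data.ptr == &polling_island_wakeup_fd in
   pollset_work_and_unlock(), does nothing with it, and its next call to
   pollset_work_and_unlock() picks up the correct (merged) epoll_fd. */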

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead of
           just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wakeup_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, then it would have automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->ref_cnt == 0);
  GPR_ASSERT(pi->fd_cnt == 0);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  pi->merged_to = NULL;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
  pi->ref_cnt -= unref_by;
  int ref_cnt = pi->ref_cnt;
  GPR_ASSERT(ref_cnt >= 0);

  gpr_mu_unlock(&pi->mu);

  if (ref_cnt == 0) {
    polling_island_delete(pi);
  }
}

polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}

void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until either pi_1 == pi_2 or we have acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2  (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock polling_island node with
       lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further. If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2  (address of pi_1 is less than that of pi_2)
       - pi_1 MAYBE locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
         done locking this polling_island yet. Release the lock on this node and
         advance pi_1 to the next node in the list; and go to the beginning of
         the loop (we can't proceed to locking pi_2 unless we locked pi_1 first)
         */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not terminal node, we are not done locking this polling_island
       yet. Release the lock and update pi_2 to the next node in the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 odd number of times, do one more swap so that
       pi_1 and pi_2 point to the same polling_island lists they started off
       with at the beginning of this function (i.e *p and *q respectively) */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}

polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_pair_update_and_lock(&p, &q);

  /* TODO: sreek: Think about this scenario some more */
  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e. move all the fds from p (the one with fewer fds) to q
     (Note that the refcounts on the fds being moved will not change here. This
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wake up all the pollers (if any) on p so that they can pick up this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* The merged polling island inherits all the ref counts of the island merging
     with it */
  q->ref_cnt += p->ref_cnt;

  /* Record the merge so that anyone still holding a reference to p can follow
     the merged_to chain (see polling_island_update_and_lock) to reach q */
  p->merged_to = q;

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  return q;
}

static void polling_island_global_init() {
  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;
  grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In the future we should
 * make sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
     newly created fd (or an fd we got from the freelist), no one else would be
     holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, 1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
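
/* Orphan tracking relies on parity: fd_create() stores refst = 1 and the
   normal fd_ref()/fd_unref() paths always change the count by two, so bit 0
   stays set while the fd is active. fd_orphan() below bumps the count by one
   (REF_BY(fd, 1, ...)), which both keeps the struct alive and clears bit 0 -
   exactly the condition fd_is_orphaned() tests. */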

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = true;

  /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
     this point, the closures would be called with 'success = false' */
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() {
  grpc_poller_kick_signum = SIGRTMIN + 2;
  signal(grpc_poller_kick_signum, sig_handler);
}

/* Global state management */
static void pollset_global_init(void) {
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static void pollset_worker_kick(grpc_pollset_worker *worker) {
  pthread_kill(worker->pt_id, grpc_poller_kick_signum);
}
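
/* How kicking works here: grpc_poller_kick_signum is a real-time signal
   (SIGRTMIN + 2, chosen in poller_kick_init) delivered to a specific worker
   thread via pthread_kill. pollset_work() blocks this signal for the thread
   and hands epoll_pwait() a mask with the signal unblocked, so a kick can only
   interrupt the epoll_pwait() call itself (which then returns with EINTR).
   sig_handler() does nothing useful on purpose; the signal's only job is to
   make epoll_pwait() return. */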

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

/* p->mu must be held before calling this function */
static void pollset_kick(grpc_pollset *p,
                         grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            pollset_worker_kick(worker);
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        pollset_worker_kick(worker);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      pollset_worker_kick(worker);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
}

static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
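
/* grpc_global_wakeup_fd is added to every polling island when the island is
   created (see polling_island_create), so making it readable here reaches
   threads blocked in epoll_pwait() regardless of which island they are
   polling. A woken thread consumes the wakeup in pollset_work_and_unlock(). */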

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
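
/* For example: a deadline that is already past (or within 10us of 'now') maps
   to 0, i.e. a non-blocking epoll_pwait; a deadline 500us away maps to 1ms; a
   deadline 2.3ms away maps to 3ms (2.3ms plus 999999ns truncates to 3); and an
   infinite deadline maps to -1, which epoll_pwait treats as "block forever". */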

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

/* Release the reference to pollset->polling_island and set it to NULL.
   pollset->mu must be held */
static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->pi_mu);
  if (pollset->polling_island) {
    pollset->polling_island =
        polling_island_update_and_lock(pollset->polling_island, 1, 0);
    polling_island_unref_and_unlock(pollset->polling_island, 1);
    pollset->polling_island = NULL;
  }
  gpr_mu_unlock(&pollset->pi_mu);
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island_locked(pollset);
}

#define GRPC_EPOLL_MAX_EVENTS 1000
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, int timeout_ms,
                                    sigset_t *sig_mask) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     polling island pointed to by pollset->polling_island.
     Acquire the following locks:
     - pollset->mu (which we already have)
     - pollset->pi_mu
     - pollset->polling_island->mu */
  gpr_mu_lock(&pollset->pi_mu);

  pi = pollset->polling_island;
  if (pi == NULL) {
    pi = polling_island_create(NULL, 1);
  }

  /* In addition to locking the polling island, add a ref so that the island
     does not get destroyed (which means the epoll_fd won't be closed) while
     we are doing an epoll_wait() on the epoll_fd */
  pi = polling_island_update_and_lock(pi, 1, 1);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->polling_island */
  pollset->polling_island = pi;

#ifdef GRPC_EPOLL_DEBUG
  if (pollset->polling_island->fd_cnt == 0) {
    gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds",
            epoll_fd);
  }
  for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) {
    gpr_log(GPR_DEBUG,
            "pollset_work_and_unlock: epoll_fd: %d, fd_count: %zu, fd[%zu]: %d",
            epoll_fd, pollset->polling_island->fd_cnt, i,
            pollset->polling_island->fds[i]->fd);
  }
#endif
  gpr_mu_unlock(&pollset->polling_island->mu);

  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);

  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

    int i;
    for (i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island */
  /* It is important to note that at this point 'pi' may not be the same as
   * pollset->polling_island. This is because pollset->polling_island pointer
   * gets updated whenever the underlying polling island is merged with another
   * island and while we are doing epoll_wait() above, the polling island may
   * have been merged */

  /* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
   * we do not have to do this here */
  gpr_mu_lock(&pi->mu);
  polling_island_unref_and_unlock(pi, 1);

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_poller_kick_signum);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_poller_kick_signum);

    push_front_worker(pollset, &worker);

    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
  GPR_TIMER_END("pollset_work", 0);
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO sreek - Double check if we need to get a pollset->mu lock here */
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001261 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1262 * equal, do nothing.
1263 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1264 * a new polling island (with a refcount of 2) and make the polling_island
1265 * fields in both fd and pollset to point to the new island
1266 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1267 * the NULL polling_island field to point to the non-NULL polling_island
1268 * field (ensure that the refcount on the polling island is incremented by
1269 * 1 to account for the newly added reference)
1270 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1271 * and different, merge both the polling islands and update the
1272 * polling_island fields in both fd and pollset to point to the merged
1273 * polling island.
1274 */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001275 if (fd->polling_island == pollset->polling_island) {
1276 pi_new = fd->polling_island;
1277 if (pi_new == NULL) {
1278 pi_new = polling_island_create(fd, 2);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001279 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001280 } else if (fd->polling_island == NULL) {
1281 pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001282 polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001283 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001284 } else if (pollset->polling_island == NULL) {
1285 pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001286 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001287 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001288 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001289 }
1290
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001291 fd->polling_island = pollset->polling_island = pi_new;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001292
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001293 gpr_mu_unlock(&fd->pi_mu);
1294 gpr_mu_unlock(&pollset->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001295}
1296
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001297/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001298 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001299 */
1300
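/* Creates a new, empty pollset_set. The struct is zero-initialized; the fds,
   pollsets and pollset_sets arrays are grown on demand by the add_* functions
   below. */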
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

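/* Destroys the pollset_set: drops the "pollset_set" refs held on any fds
   still in the set and frees the internal arrays. The member pollsets and
   child pollset_sets themselves are not owned by the set and are left
   untouched. */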
static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

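/* Adds 'fd' to the set: takes a "pollset_set" ref on the fd, adds it to every
   pollset currently in the set and, recursively, to every child pollset_set. */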
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

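/* Removes 'fd' from the set (if present), dropping the ref taken in
   pollset_set_add_fd, and recurses into the child pollset_sets. Member
   pollsets are not touched here; an fd leaves its polling island only when it
   is orphaned (see fd_orphan). */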
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

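/* Adds 'pollset' to the set and adds every live fd already in the set to that
   pollset. Orphaned fds encountered along the way are unref'd and compacted
   out of the fds array. */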
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

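/* Removes 'pollset' from the set (swap-with-last removal); nothing else is
   undone here. */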
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

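/* Adds 'item' as a child of 'bag' and propagates every live fd in 'bag' down
   into 'item', compacting out orphaned fds exactly as
   pollset_set_add_pollset does. */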
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

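/* Removes 'item' from the children of 'bag' (swap-with-last removal). */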
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

/*******************************************************************************
 * Event engine binding
 */

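/* Tears down the per-engine global state initialized in
   grpc_init_epoll_linux() below. */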
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

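/* The static functions above are exposed to the rest of iomgr only through
   this vtable. The posix event-engine layer (ev_posix.c) is expected to pick
   an engine at startup and forward calls such as grpc_fd_create() and
   grpc_pollset_work() through whichever table it selected. */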
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

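/* Engine entry point. A minimal usage sketch (assuming a caller along the
   lines of ev_posix.c; not code from this file):

     const grpc_event_engine_vtable *engine = grpc_init_epoll_linux();
     if (engine != NULL) {
       /* epoll is available: route fd/pollset calls through 'engine' */
     } else {
       /* fall back to a poll()-based engine */
     }
*/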
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}

#else /* defined(GPR_LINUX_EPOLL) */
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }

#endif /* !defined(GPR_LINUX_EPOLL) */