blob: 3a3c136a5aa836a4c223929999de50e5cfa6fbf5 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc/support/port_platform.h>
35
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070036#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070037
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070038#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
40#include <assert.h>
41#include <errno.h>
42#include <poll.h>
43#include <signal.h>
44#include <string.h>
45#include <sys/epoll.h>
46#include <sys/socket.h>
47#include <unistd.h>
48
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/string_util.h>
52#include <grpc/support/tls.h>
53#include <grpc/support/useful.h>
54
55#include "src/core/lib/iomgr/ev_posix.h"
56#include "src/core/lib/iomgr/iomgr_internal.h"
57#include "src/core/lib/iomgr/wakeup_fd_posix.h"
58#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/block_annotate.h"
60
61struct polling_island;
62
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070063static int grpc_poller_kick_signum;
64
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070065/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070066 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070067 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070068struct grpc_fd {
69 int fd;
70 /* refst format:
Sree Kuchibhotla5098f912016-05-31 10:58:17 -070071 bit 0 : 1=Active / 0=Orphaned
72 bits 1-n : refcount
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070073 Ref/Unref by two to avoid altering the orphaned bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074 gpr_atm refst;
75
76 gpr_mu mu;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -070077
78 /* Indicates that the fd is shutdown and that any pending read/write closures
79 should fail */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070080 bool shutdown;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -070081
82 /* The fd is either closed or we relinquished control of it. In either cases,
83 this indicates that the 'fd' on this structure is no longer valid */
84 bool orphaned;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070085
86 grpc_closure *read_closure;
87 grpc_closure *write_closure;
88
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070089 /* The polling island to which this fd belongs to and the mutex protecting the
90 the field */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070091 gpr_mu pi_mu;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070092 struct polling_island *polling_island;
93
94 struct grpc_fd *freelist_next;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070095 grpc_closure *on_done_closure;
96
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070097 /* The pollset that last noticed that the fd is readable */
98 grpc_pollset *read_notifier_pollset;
99
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700100 grpc_iomgr_object iomgr_object;
101};
102
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700103/* Reference counting for fds */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700104#ifdef GRPC_FD_REF_COUNT_DEBUG
105static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
106static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
107 int line);
108#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
109#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
110#else
111static void fd_ref(grpc_fd *fd);
112static void fd_unref(grpc_fd *fd);
113#define GRPC_FD_REF(fd, reason) fd_ref(fd)
114#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
115#endif
116
117static void fd_global_init(void);
118static void fd_global_shutdown(void);
119
120#define CLOSURE_NOT_READY ((grpc_closure *)0)
121#define CLOSURE_READY ((grpc_closure *)1)
122
123/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700124 * Polling-island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700125 */
126typedef struct polling_island {
127 gpr_mu mu;
128 int ref_cnt;
129
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700130 /* Points to the polling_island this merged into.
131 * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
132 * are invalid and must be ignored */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700133 struct polling_island *merged_to;
134
135 /* The fd of the underlying epoll set */
136 int epoll_fd;
137
138 /* The file descriptors in the epoll set */
139 size_t fd_cnt;
140 size_t fd_capacity;
141 grpc_fd **fds;
142
143 /* Polling islands that are no longer needed are kept in a freelist so that
144 they can be reused. This field points to the next polling island in the
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700145 free list */
146 struct polling_island *next_free;
147} polling_island;
148
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700149/*******************************************************************************
150 * Pollset Declarations
151 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700152struct grpc_pollset_worker {
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700153 pthread_t pt_id; /* Thread id of this worker */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700154 struct grpc_pollset_worker *next;
155 struct grpc_pollset_worker *prev;
156};
157
158struct grpc_pollset {
159 gpr_mu mu;
160 grpc_pollset_worker root_worker;
161 bool kicked_without_pollers;
162
163 bool shutting_down; /* Is the pollset shutting down ? */
164 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
165 grpc_closure *shutdown_done; /* Called after after shutdown is complete */
166
167 /* The polling island to which this pollset belongs to and the mutex
168 protecting the field */
169 gpr_mu pi_mu;
170 struct polling_island *polling_island;
171};
172
173/*******************************************************************************
174 * Pollset-set Declarations
175 */
176struct grpc_pollset_set {
177 gpr_mu mu;
178
179 size_t pollset_count;
180 size_t pollset_capacity;
181 grpc_pollset **pollsets;
182
183 size_t pollset_set_count;
184 size_t pollset_set_capacity;
185 struct grpc_pollset_set **pollset_sets;
186
187 size_t fd_count;
188 size_t fd_capacity;
189 grpc_fd **fds;
190};
191
192/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700193 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700194 */
195
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700196/* The wakeup fd that is used to wake up all threads in a Polling island. This
197 is useful in the polling island merge operation where we need to wakeup all
198 the threads currently polling the smaller polling island (so that they can
199 start polling the new/merged polling island)
200
201 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
202 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
203static grpc_wakeup_fd polling_island_wakeup_fd;
204
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700205/* Polling island freelist */
206static gpr_mu g_pi_freelist_mu;
207static polling_island *g_pi_freelist = NULL;
208
/* Add the file descriptors in 'fds' (length 'fd_count') to the epoll set of
   the polling island 'pi' and track them in pi->fds.
   If 'add_fd_refs' is true, take a ref on each fd added (paired with the
   unrefs in the polling_island_remove_* functions).
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered for both read and write readiness */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead of
           just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      /* On EEXIST (fd already in the epoll set) or any other failure, do not
         track the fd a second time */
      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      /* Grow by ~1.5x with a minimum step of 8 slots */
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
243
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700244/* The caller is expected to hold pi->mu before calling this */
245static void polling_island_add_wakeup_fd_locked(polling_island *pi,
246 grpc_wakeup_fd *wakeup_fd) {
247 struct epoll_event ev;
248 int err;
249
250 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
251 ev.data.ptr = wakeup_fd;
252 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
253 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
254 if (err < 0) {
255 gpr_log(GPR_ERROR,
256 "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
257 ". Error: %s",
258 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
259 strerror(errno));
260 }
261}
262
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700263/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700264static void polling_island_remove_all_fds_locked(polling_island *pi,
265 bool remove_fd_refs) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700266 int err;
267 size_t i;
268
269 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700270 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700271 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700272 /* TODO: sreek - We need a better way to bubble up this error instead of
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700273 * just logging a message */
274 gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%d]: %d failed with error: %s",
275 i, pi->fds[i]->fd, strerror(errno));
276 }
277
278 if (remove_fd_refs) {
279 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700280 }
281 }
282
283 pi->fd_cnt = 0;
284}
285
/* Remove 'fd' from both the epoll set and the pi->fds tracking array of the
   polling island 'pi', dropping the ref that was taken when it was added.
   'is_fd_closed' indicates the underlying descriptor was already close()d.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, then it would have been automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    /* ENOENT is benign: the fd was already absent from the epoll set */
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error; %s",
              fd->fd, strerror(errno));
    }
  }

  /* Swap-and-pop: order of pi->fds is not significant, so replace the slot
     with the last element and shrink the count */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
310
/* Create a polling island (recycling one from the global freelist when
   possible) holding 'initial_ref_cnt' references. A fresh epoll set is always
   created, the global wakeup fd is added to it, and if 'initial_fd' is
   non-NULL it is added as well (taking a ref on it). */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  /* Even a recycled island needs a new epoll set: polling_island_delete()
     closed its previous one */
  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  /* NOTE(review): called without pi->mu held; presumably safe because no
     other thread can reference this island yet - confirm */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}
357
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700358static void polling_island_delete(polling_island *pi) {
359 GPR_ASSERT(pi->ref_cnt == 0);
360 GPR_ASSERT(pi->fd_cnt == 0);
361
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700362 close(pi->epoll_fd);
363 pi->epoll_fd = -1;
364
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700365 pi->merged_to = NULL;
366
367 gpr_mu_lock(&g_pi_freelist_mu);
368 pi->next_free = g_pi_freelist;
369 g_pi_freelist = pi;
370 gpr_mu_unlock(&g_pi_freelist_mu);
371}
372
373void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
374 pi->ref_cnt -= unref_by;
375 int ref_cnt = pi->ref_cnt;
376 GPR_ASSERT(ref_cnt >= 0);
377
378 gpr_mu_unlock(&pi->mu);
379
380 if (ref_cnt == 0) {
381 polling_island_delete(pi);
382 }
383}
384
/* Follow the 'merged_to' chain from 'pi' to the terminal (non-merged) polling
   island, dropping 'unref_by' refs on every island traversed, then add
   'add_ref_by' refs to the terminal island. Returns the terminal island with
   its lock HELD. The caller must not already hold pi->mu (it is acquired
   here). */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    /* Unlocks pi->mu and may free pi if its refcount hits zero */
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
399
/* Resolve *p and *q to their terminal polling islands (following 'merged_to'
   chains, dropping one ref per traversed node) and acquire both locks in a
   deadlock-free order (lower address first). On return *p and *q point at the
   terminal islands (possibly the same one) with their lock(s) held. */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0; /* parity decides whether to swap back at the end */

  /* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2  (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock polling_island node with
       lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further. If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2  (address of pi_1 is less than that of pi_2)
       - pi_1 MAYBE locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
         done locking this polling_island yet. Release the lock on this node and
         advance pi_1 to the next node in the list; and go to the beginning of
         the loop (we can't proceed to locking pi_2 unless we locked pi_1 first)
       */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not terminal node, we are not done locking this polling_island
       yet. Release the lock and update pi_2 to the next node in the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      /* unref_by == 2: both *p and *q converged onto this one island */
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 odd number of times, do one more swap so that
       pi_1 and pi_2 point to the same polling_island lists they started off
       with at the beginning of this function (i.e *p and *q respectively) */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
498
499polling_island *polling_island_merge(polling_island *p, polling_island *q) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700500 /* Get locks on both the polling islands */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700501 polling_island_pair_update_and_lock(&p, &q);
502
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700503 /* TODO: sreek: Think about this scenario some more */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700504 if (p == q) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700505 /* Nothing needs to be done here */
506 gpr_mu_unlock(&p->mu);
507 return p;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700508 }
509
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700510 /* Make sure that p points to the polling island with fewer fds than q */
511 if (p->fd_cnt > q->fd_cnt) {
512 GPR_SWAP(polling_island *, p, q);
513 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700514
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700515 /* "Merge" p with q i.e move all the fds from p (The one with fewer fds) to q
516 )Note that the refcounts on the fds being moved will not change here. This
517 is why the last parameter in the following two functions is 'false') */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700518 polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
519 polling_island_remove_all_fds_locked(p, false);
520
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700521 /* Wakeup all the pollers (if any) on p so that they can pickup this change */
522 polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
523
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700524 /* The merged polling island inherits all the ref counts of the island merging
525 with it */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700526 q->ref_cnt += p->ref_cnt;
527
528 gpr_mu_unlock(&p->mu);
529 gpr_mu_unlock(&q->mu);
530
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700531 return q;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700532}
533
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700534static void polling_island_global_init() {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700535 gpr_mu_init(&g_pi_freelist_mu);
536 g_pi_freelist = NULL;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700537 grpc_wakeup_fd_init(&polling_island_wakeup_fd);
538 grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700539}
540
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700541static void polling_island_global_shutdown() {
542 polling_island *next;
543 gpr_mu_lock(&g_pi_freelist_mu);
544 gpr_mu_unlock(&g_pi_freelist_mu);
545 while (g_pi_freelist != NULL) {
546 next = g_pi_freelist->next_free;
547 gpr_mu_destroy(&g_pi_freelist->mu);
548 gpr_free(g_pi_freelist->fds);
549 gpr_free(g_pi_freelist);
550 g_pi_freelist = next;
551 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700552 gpr_mu_destroy(&g_pi_freelist_mu);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700553
554 grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700555}
556
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700557/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700558 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700559 */
560
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700561/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700562 * but instead so that implementations with multiple threads in (for example)
563 * epoll_wait deal with the race between pollset removal and incoming poll
564 * notifications.
565 *
566 * The problem is that the poller ultimately holds a reference to this
567 * object, so it is very difficult to know when is safe to free it, at least
568 * without some expensive synchronization.
569 *
570 * If we keep the object freelisted, in the worst case losing this race just
571 * becomes a spurious read notification on a reused fd.
572 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700573
574/* The alarm system needs to be able to wakeup 'some poller' sometimes
575 * (specifically when a new alarm needs to be triggered earlier than the next
576 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
577 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700578
579/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
580 * sure to wake up one polling thread (which can wake up other threads if
581 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700582grpc_wakeup_fd grpc_global_wakeup_fd;
583
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700584static grpc_fd *fd_freelist = NULL;
585static gpr_mu fd_freelist_mu;
586
/* Low-level refcount manipulation on grpc_fd. Per the refst layout (see
   struct grpc_fd), bit 0 is the 'active' flag and bits 1..n hold the count,
   so callers always ref/unref by even amounts to leave the active bit alone.
   In debug builds (GRPC_FD_REF_COUNT_DEBUG) every call site is logged; note
   the function bodies below are split across the #ifdef branches and share a
   common tail. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* Must never resurrect: the previous count has to be positive */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  /* old == n means this unref took the count to zero: recycle the struct */
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
626
/* Increment refcount by two to avoid changing the orphan bit */
/* (bit 0 of refst is the active/orphaned flag; even deltas leave it intact) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
642
/* Initialize the mutex guarding the global grpc_fd freelist */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
644
645static void fd_global_shutdown(void) {
646 gpr_mu_lock(&fd_freelist_mu);
647 gpr_mu_unlock(&fd_freelist_mu);
648 while (fd_freelist != NULL) {
649 grpc_fd *fd = fd_freelist;
650 fd_freelist = fd_freelist->freelist_next;
651 gpr_mu_destroy(&fd->mu);
652 gpr_free(fd);
653 }
654 gpr_mu_destroy(&fd_freelist_mu);
655}
656
657static grpc_fd *fd_create(int fd, const char *name) {
658 grpc_fd *new_fd = NULL;
659
660 gpr_mu_lock(&fd_freelist_mu);
661 if (fd_freelist != NULL) {
662 new_fd = fd_freelist;
663 fd_freelist = fd_freelist->freelist_next;
664 }
665 gpr_mu_unlock(&fd_freelist_mu);
666
667 if (new_fd == NULL) {
668 new_fd = gpr_malloc(sizeof(grpc_fd));
669 gpr_mu_init(&new_fd->mu);
670 gpr_mu_init(&new_fd->pi_mu);
671 }
672
673 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
674 newly created fd (or an fd we got from the freelist), no one else would be
675 holding a lock to it anyway. */
676 gpr_mu_lock(&new_fd->mu);
677
678 gpr_atm_rel_store(&new_fd->refst, 1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700679 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700680 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700681 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700682 new_fd->read_closure = CLOSURE_NOT_READY;
683 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700684 new_fd->polling_island = NULL;
685 new_fd->freelist_next = NULL;
686 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700687 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700688
689 gpr_mu_unlock(&new_fd->mu);
690
691 char *fd_name;
692 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
693 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
694 gpr_free(fd_name);
695#ifdef GRPC_FD_REF_COUNT_DEBUG
696 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, fd_name);
697#endif
698 return new_fd;
699}
700
701static bool fd_is_orphaned(grpc_fd *fd) {
702 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
703}
704
705static int fd_wrapped_fd(grpc_fd *fd) {
706 int ret_fd = -1;
707 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700708 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700709 ret_fd = fd->fd;
710 }
711 gpr_mu_unlock(&fd->mu);
712
713 return ret_fd;
714}
715
/* Orphan the fd: end grpc's interest in the descriptor. If 'release_fd' is
   non-NULL the raw descriptor is handed back to the caller via *release_fd;
   otherwise it is close()d. The fd is detached from its polling island, and
   'on_done' is scheduled on the exec_ctx. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  /* (ref by the odd amount 1 here, paired with the UNREF_BY(fd, 2) at the
     bottom: net effect is -1, which clears the active bit) */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}
758
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700759static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
760 grpc_closure **st, grpc_closure *closure) {
761 if (*st == CLOSURE_NOT_READY) {
762 /* not ready ==> switch to a waiting state by setting the closure */
763 *st = closure;
764 } else if (*st == CLOSURE_READY) {
765 /* already ready ==> queue the closure to run immediately */
766 *st = CLOSURE_NOT_READY;
767 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
768 } else {
769 /* upcallptr was set to a different closure. This is an error! */
770 gpr_log(GPR_ERROR,
771 "User called a notify_on function with a previous callback still "
772 "pending");
773 abort();
774 }
775}
776
777/* returns 1 if state becomes not ready */
778static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
779 grpc_closure **st) {
780 if (*st == CLOSURE_READY) {
781 /* duplicate ready ==> ignore */
782 return 0;
783 } else if (*st == CLOSURE_NOT_READY) {
784 /* not ready, and not waiting ==> flag ready */
785 *st = CLOSURE_READY;
786 return 0;
787 } else {
788 /* waiting ==> queue closure */
789 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
790 *st = CLOSURE_NOT_READY;
791 return 1;
792 }
793}
794
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700795static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
796 grpc_fd *fd) {
797 grpc_pollset *notifier = NULL;
798
799 gpr_mu_lock(&fd->mu);
800 notifier = fd->read_notifier_pollset;
801 gpr_mu_unlock(&fd->mu);
802
803 return notifier;
804}
805
/* Shuts the fd down. Pending (and, via notify_on_locked, future) read/write
   closures will be scheduled with 'success = false'. Must be called at most
   once per fd (asserted) */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = true;

  /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
     this point, the closures would be called with 'success = false' */
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
817
/* Registers 'closure' to run when the fd becomes readable (scheduled
   immediately if the fd is already flagged readable) */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
824
/* Registers 'closure' to run when the fd becomes writable (scheduled
   immediately if the fd is already flagged writable) */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
831
832/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700833 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700834 */
835
/* Handler for the poller-kick signal. Its only purpose is to interrupt a
   blocking epoll_pwait() in pollset_work_and_unlock(); it does no real work */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#else
  (void)sig_num; /* silence -Wunused-parameter when debug logging is off */
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700841
/* Chooses the real-time signal used to kick pollers (SIGRTMIN + 2) and
   installs sig_handler for it.
   NOTE(review): signal() disposition semantics vary across platforms;
   sigaction() would make the flags explicit — confirm signal() is intended */
static void poller_kick_init() {
  grpc_poller_kick_signum = SIGRTMIN + 2;
  signal(grpc_poller_kick_signum, sig_handler);
}
846
/* Global state management */
/* One-time pollset global initialization: sets up the global wakeup fd and
   installs the poller-kick signal handler */
static void pollset_global_init(void) {
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  poller_kick_init();
}
852
/* Counterpart of pollset_global_init: tears down the global wakeup fd */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
}
856
/* Interrupts the worker's thread (presumably blocked in epoll_pwait) by
   sending it the poller-kick signal.
   NOTE(review): the pthread_kill return value is ignored — confirm a failure
   here (e.g. signalling an exited thread) is benign */
static void pollset_worker_kick(grpc_pollset_worker *worker) {
  pthread_kill(worker->pt_id, grpc_poller_kick_signum);
}
860
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked). The worker list is circular with root_worker as sentinel, so it
 * is non-empty exactly when the sentinel does not link to itself */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700866
867static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
868 worker->prev->next = worker->next;
869 worker->next->prev = worker->prev;
870}
871
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700872static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
873 if (pollset_has_workers(p)) {
874 grpc_pollset_worker *w = p->root_worker.next;
875 remove_worker(p, w);
876 return w;
877 } else {
878 return NULL;
879 }
880}
881
882static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
883 worker->next = &p->root_worker;
884 worker->prev = worker->next->prev;
885 worker->prev->next = worker->next->prev = worker;
886}
887
888static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
889 worker->prev = &p->root_worker;
890 worker->next = worker->prev->next;
891 worker->prev->next = worker->next->prev = worker;
892}
893
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700894/* p->mu must be held before calling this function */
895static void pollset_kick(grpc_pollset *p,
896 grpc_pollset_worker *specific_worker) {
897 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700898
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700899 grpc_pollset_worker *worker = specific_worker;
900 if (worker != NULL) {
901 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700902 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700903 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700904 for (worker = p->root_worker.next; worker != &p->root_worker;
905 worker = worker->next) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700906 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700907 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700908 } else {
909 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700910 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700911 GPR_TIMER_END("pollset_kick.broadcast", 0);
912 } else {
913 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700914 pollset_worker_kick(worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700915 }
916 } else {
917 GPR_TIMER_MARK("kick_anonymous", 0);
918 worker = pop_front_worker(p);
919 if (worker != NULL) {
920 GPR_TIMER_MARK("finally_kick", 0);
921 push_back_worker(p, worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700922 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700923 } else {
924 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700925 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700926 }
927 }
928
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700929 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700930}
931
/* Signals the global wakeup fd (engine-wide poller wakeup) */
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
933
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700934static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
935 gpr_mu_init(&pollset->mu);
936 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700937
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700938 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700939 pollset->kicked_without_pollers = false;
940
941 pollset->shutting_down = false;
942 pollset->finish_shutdown_called = false;
943 pollset->shutdown_done = NULL;
944
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700945 gpr_mu_init(&pollset->pi_mu);
946 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700947}
948
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700949/* Convert a timespec to milliseconds:
950 - Very small or negative poll times are clamped to zero to do a non-blocking
951 poll (which becomes spin polling)
952 - Other small values are rounded up to one millisecond
953 - Longer than a millisecond polls are rounded up to the next nearest
954 millisecond to avoid spinning
955 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700956static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
957 gpr_timespec now) {
958 gpr_timespec timeout;
959 static const int64_t max_spin_polling_us = 10;
960 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
961 return -1;
962 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700963
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700964 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
965 max_spin_polling_us,
966 GPR_TIMESPAN))) <= 0) {
967 return 0;
968 }
969 timeout = gpr_time_sub(deadline, now);
970 return gpr_time_to_millis(gpr_time_add(
971 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
972}
973
/* Flags the fd readable (scheduling any waiting read closure) and records
   'notifier' as the pollset that observed the readability event */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
982
/* Flags the fd writable, scheduling any waiting write closure */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
989
#define GRPC_EPOLL_MAX_EVENTS 1000

/* Runs one round of epoll_pwait() on the pollset's polling island and
   dispatches the resulting read/write events. Called with pollset->mu held;
   the lock is RELEASED before blocking and is NOT re-acquired here (the
   caller, pollset_work(), re-acquires it).
   - timeout_ms: epoll timeout; -1 means infinite
     (see poll_deadline_to_millis_timeout)
   - sig_mask: signal mask installed for the duration of epoll_pwait() so the
     poller-kick signal can interrupt the wait */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, int timeout_ms,
                                    sigset_t *sig_mask) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     polling island pointed by pollset->polling_island.
     Acquire the following locks:
     - pollset->mu (which we already have)
     - pollset->pi_mu
     - pollset->polling_island->mu */
  gpr_mu_lock(&pollset->pi_mu);

  pi = pollset->polling_island;
  if (pi == NULL) {
    /* No island yet: create a fresh one so we always have an epoll_fd */
    pi = polling_island_create(NULL, 1);
  }

  /* In addition to locking the polling island, add a ref so that the island
     does not get destroyed (which means the epoll_fd won't be closed) while
     we are doing an epoll_wait() on the epoll_fd */
  pi = polling_island_update_and_lock(pi, 1, 1);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->polling_island */
  pollset->polling_island = pi;

#ifdef GRPC_EPOLL_DEBUG
  if (pollset->polling_island->fd_cnt == 0) {
    gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds",
            epoll_fd);
  }
  for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) {
    gpr_log(GPR_DEBUG,
            "pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d",
            epoll_fd, pollset->polling_island->fd_cnt, i,
            pollset->polling_island->fds[i]->fd);
  }
#endif
  gpr_mu_unlock(&pollset->polling_island->mu);

  /* Release the remaining locks before blocking in epoll */
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);

  /* Keep polling while epoll returns a completely full event buffer, since
     more events may be pending */
  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

    int i;
    for (i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        /* Error/hangup wakes both directions so closures can observe it */
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island */
  /* It is important to note that at this point 'pi' may not be the same as
   * pollset->polling_island. This is because pollset->polling_island pointer
   * gets updated whenever the underlying polling island is merged with another
   * island and while we are doing epoll_wait() above, the polling island may
   * have been merged */

  /* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
   * we do not have to do this here */
  gpr_mu_lock(&pi->mu);
  polling_island_unref_and_unlock(pi, 1);

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
1093
1094/* Release the reference to pollset->polling_island and set it to NULL.
1095 pollset->mu must be held */
1096static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
1097 gpr_mu_lock(&pollset->pi_mu);
1098 if (pollset->polling_island) {
1099 pollset->polling_island =
1100 polling_island_update_and_lock(pollset->polling_island, 1, 0);
1101 polling_island_unref_and_unlock(pollset->polling_island, 1);
1102 pollset->polling_island = NULL;
1103 }
1104 gpr_mu_unlock(&pollset->pi_mu);
1105}
1106
/* Completes pollset shutdown: releases the polling island and schedules the
   shutdown_done closure. Must be called with pollset->mu held, and only after
   all workers have left pollset_work() */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
1117
/* Initiates pollset shutdown: kicks all workers out of epoll and, if no
   workers are present, completes the shutdown immediately. 'closure' is
   scheduled once shutdown finishes.
   pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1137
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  /* No worker may still be inside pollset_work() at this point */
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}
1146
/* Returns a fully shut-down pollset to its freshly-initialized state so it
   can be reused. Requires that shutdown was initiated and no workers remain */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  /* Drop any polling island still attached to this pollset */
  pollset_release_polling_island_locked(pollset);
}
1156
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  /* Stack-allocated worker: lives only for the duration of this call. The
     thread id is recorded so pollset_worker_kick() can signal this thread */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* Block the poller-kick signal for this thread, but pass epoll_pwait() a
       mask with that signal enabled: the kick can then only be delivered
       while the thread is blocked inside epoll_pwait() */
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_poller_kick_signum);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_poller_kick_signum);

    push_front_worker(pollset, &worker);

    /* Releases pollset->mu before blocking in epoll */
    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
}
1219
/* Associates 'fd' with 'pollset' by unifying their polling islands (creating
   one if neither has an island yet) */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO sreek - Double check if we need to get a pollset->mu lock here */
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   * equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   * the NULL polling_island field to point to the non-NULL polling_island
   * field (ensure that the refcount on the polling island is incremented by
   * 1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   * and different, merge both the polling islands and update the
   * polling_island fields in both fd and pollset to point to the merged
   * polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd, 2);
    }
  } else if (fd->polling_island == NULL) {
    /* NOTE(review): pi_new is the latest (possibly merged) island, yet the
       add below still uses the pollset->polling_island pointer — confirm the
       two are guaranteed identical here, otherwise the fd is registered on a
       stale island */
    pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
    polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
    gpr_mu_unlock(&pi_new->mu);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
    gpr_mu_unlock(&pi_new->mu);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  fd->polling_island = pollset->polling_island = pi_new;

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
}
1263
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001264/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001265 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001266 */
1267
1268static grpc_pollset_set *pollset_set_create(void) {
1269 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1270 memset(pollset_set, 0, sizeof(*pollset_set));
1271 gpr_mu_init(&pollset_set->mu);
1272 return pollset_set;
1273}
1274
1275static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1276 size_t i;
1277 gpr_mu_destroy(&pollset_set->mu);
1278 for (i = 0; i < pollset_set->fd_count; i++) {
1279 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1280 }
1281 gpr_free(pollset_set->pollsets);
1282 gpr_free(pollset_set->pollset_sets);
1283 gpr_free(pollset_set->fds);
1284 gpr_free(pollset_set);
1285}
1286
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001287static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1288 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1289 size_t i;
1290 gpr_mu_lock(&pollset_set->mu);
1291 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1292 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1293 pollset_set->fds = gpr_realloc(
1294 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1295 }
1296 GRPC_FD_REF(fd, "pollset_set");
1297 pollset_set->fds[pollset_set->fd_count++] = fd;
1298 for (i = 0; i < pollset_set->pollset_count; i++) {
1299 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1300 }
1301 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1302 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1303 }
1304 gpr_mu_unlock(&pollset_set->mu);
1305}
1306
1307static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1308 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1309 size_t i;
1310 gpr_mu_lock(&pollset_set->mu);
1311 for (i = 0; i < pollset_set->fd_count; i++) {
1312 if (pollset_set->fds[i] == fd) {
1313 pollset_set->fd_count--;
1314 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1315 pollset_set->fds[pollset_set->fd_count]);
1316 GRPC_FD_UNREF(fd, "pollset_set");
1317 break;
1318 }
1319 }
1320 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1321 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1322 }
1323 gpr_mu_unlock(&pollset_set->mu);
1324}
1325
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001326static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1327 grpc_pollset_set *pollset_set,
1328 grpc_pollset *pollset) {
1329 size_t i, j;
1330 gpr_mu_lock(&pollset_set->mu);
1331 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1332 pollset_set->pollset_capacity =
1333 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1334 pollset_set->pollsets =
1335 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1336 sizeof(*pollset_set->pollsets));
1337 }
1338 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1339 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1340 if (fd_is_orphaned(pollset_set->fds[i])) {
1341 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1342 } else {
1343 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1344 pollset_set->fds[j++] = pollset_set->fds[i];
1345 }
1346 }
1347 pollset_set->fd_count = j;
1348 gpr_mu_unlock(&pollset_set->mu);
1349}
1350
1351static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1352 grpc_pollset_set *pollset_set,
1353 grpc_pollset *pollset) {
1354 size_t i;
1355 gpr_mu_lock(&pollset_set->mu);
1356 for (i = 0; i < pollset_set->pollset_count; i++) {
1357 if (pollset_set->pollsets[i] == pollset) {
1358 pollset_set->pollset_count--;
1359 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1360 pollset_set->pollsets[pollset_set->pollset_count]);
1361 break;
1362 }
1363 }
1364 gpr_mu_unlock(&pollset_set->mu);
1365}
1366
1367static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1368 grpc_pollset_set *bag,
1369 grpc_pollset_set *item) {
1370 size_t i, j;
1371 gpr_mu_lock(&bag->mu);
1372 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1373 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1374 bag->pollset_sets =
1375 gpr_realloc(bag->pollset_sets,
1376 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1377 }
1378 bag->pollset_sets[bag->pollset_set_count++] = item;
1379 for (i = 0, j = 0; i < bag->fd_count; i++) {
1380 if (fd_is_orphaned(bag->fds[i])) {
1381 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1382 } else {
1383 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1384 bag->fds[j++] = bag->fds[i];
1385 }
1386 }
1387 bag->fd_count = j;
1388 gpr_mu_unlock(&bag->mu);
1389}
1390
1391static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1392 grpc_pollset_set *bag,
1393 grpc_pollset_set *item) {
1394 size_t i;
1395 gpr_mu_lock(&bag->mu);
1396 for (i = 0; i < bag->pollset_set_count; i++) {
1397 if (bag->pollset_sets[i] == item) {
1398 bag->pollset_set_count--;
1399 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1400 bag->pollset_sets[bag->pollset_set_count]);
1401 break;
1402 }
1403 }
1404 gpr_mu_unlock(&bag->mu);
1405}
1406
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001407/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001408 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001409 */
1410
/* Tears down the global state initialized by grpc_init_epoll_linux() */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1416
/* Function table through which the rest of iomgr drives this epoll-based
   event engine */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd operations */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1449
/* Entry point: initializes the engine's global state and returns its vtable */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}
1456
#else /* defined(GPR_LINUX_EPOLL) */
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL so callers can fall back to a different event engine */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }

#endif /* !defined(GPR_LINUX_EPOLL) */