blob: 69ab665e15589303643d440864c763de3681e55d [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc/support/port_platform.h>
35
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070036#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070037
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070038#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
40#include <assert.h>
41#include <errno.h>
42#include <poll.h>
43#include <signal.h>
44#include <string.h>
45#include <sys/epoll.h>
46#include <sys/socket.h>
47#include <unistd.h>
48
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/string_util.h>
52#include <grpc/support/tls.h>
53#include <grpc/support/useful.h>
54
55#include "src/core/lib/iomgr/ev_posix.h"
56#include "src/core/lib/iomgr/iomgr_internal.h"
57#include "src/core/lib/iomgr/wakeup_fd_posix.h"
58#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/block_annotate.h"
60
61struct polling_island;
62
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070063static int grpc_poller_kick_signum;
64
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070065/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070066 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070067 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Protects all fields below except polling_island (see pi_mu) */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Either a pending closure, or one of the sentinels CLOSURE_NOT_READY /
     CLOSURE_READY */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  /* Next fd in the global fd freelist (meaningful only while freelisted) */
  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
102
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700103/* Reference counting for fds */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700104#ifdef GRPC_FD_REF_COUNT_DEBUG
105static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
106static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
107 int line);
108#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
109#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
110#else
111static void fd_ref(grpc_fd *fd);
112static void fd_unref(grpc_fd *fd);
113#define GRPC_FD_REF(fd, reason) fd_ref(fd)
114#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
115#endif
116
117static void fd_global_init(void);
118static void fd_global_shutdown(void);
119
120#define CLOSURE_NOT_READY ((grpc_closure *)0)
121#define CLOSURE_READY ((grpc_closure *)1)
122
123/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700124 * Polling-island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700125 */
typedef struct polling_island {
  gpr_mu mu;
  /* Number of outstanding references to this island (fds, pollsets, and
     traversal code all hold refs). Protected by mu. */
  int ref_cnt;

  /* Points to the polling_island this merged into.
   * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
   * are invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
148
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700149/*******************************************************************************
150 * Pollset Declarations
151 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  /* Doubly-linked list of workers attached to the owning pollset */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
157
struct grpc_pollset {
  gpr_mu mu;
  /* Sentinel head of the doubly-linked worker list */
  grpc_pollset_worker root_worker;
  /* A kick arrived while no worker was polling; next worker should return
     immediately */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to and the mutex
     protecting the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};
172
173/*******************************************************************************
174 * Pollset-set Declarations
175 */
struct grpc_pollset_set {
  gpr_mu mu;

  /* Pollsets contained in this pollset_set */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Child pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* fds added directly to this pollset_set */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
191
192/*******************************************************************************
193 * Polling-island Definitions
194 */
195
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700196/* Polling island freelist */
197static gpr_mu g_pi_freelist_mu;
198static polling_island *g_pi_freelist = NULL;
199
/* Adds 'fd_count' fds to the polling island's epoll set and to its fd list,
   optionally taking a ref on each fd added.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered: pollers are woken only on readiness transitions */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      /* EEXIST is benign: the fd is already part of this epoll set */
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      /* Skip the bookkeeping below for fds that were not actually added */
      continue;
    }

    /* Grow the fd list geometrically when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
234
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700235/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700236static void polling_island_remove_all_fds_locked(polling_island *pi,
237 bool remove_fd_refs) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700238 int err;
239 size_t i;
240
241 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700242 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700243 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700244 /* TODO: sreek - We need a better way to bubble up this error instead of
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700245 * just logging a message */
246 gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%d]: %d failed with error: %s",
247 i, pi->fds[i]->fd, strerror(errno));
248 }
249
250 if (remove_fd_refs) {
251 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700252 }
253 }
254
255 pi->fd_cnt = 0;
256}
257
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700258/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700259static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700260 bool is_fd_closed) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700261 int err;
262 size_t i;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700263
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700264 /* If fd is already closed, then it would have been automatically been removed
265 from the epoll set */
266 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700267 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
268 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700269 gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error; %s",
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700270 fd->fd, strerror(errno));
271 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700272 }
273
274 for (i = 0; i < pi->fd_cnt; i++) {
275 if (pi->fds[i] == fd) {
276 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700277 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700278 break;
279 }
280 }
281}
282
/* Creates a polling island (reusing one from the freelist when possible),
   attaches the global wakeup fd to its epoll set and, if 'initial_fd' is not
   NULL, adds that fd too (taking a ref on it). The island starts with
   'initial_ref_cnt' references. */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;
  struct epoll_event ev;
  int err;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  /* A fresh epoll set is created even for a reused island:
     polling_island_delete() closed the previous one */
  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  /* Ensure pollers on this island can be woken via the global wakeup fd */
  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = NULL;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_global_wake_up_fd (%d) to the epoll set "
            "(epoll_fd: %d) with error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}
341
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700342static void polling_island_delete(polling_island *pi) {
343 GPR_ASSERT(pi->ref_cnt == 0);
344 GPR_ASSERT(pi->fd_cnt == 0);
345
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700346 close(pi->epoll_fd);
347 pi->epoll_fd = -1;
348
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700349 pi->merged_to = NULL;
350
351 gpr_mu_lock(&g_pi_freelist_mu);
352 pi->next_free = g_pi_freelist;
353 g_pi_freelist = pi;
354 gpr_mu_unlock(&g_pi_freelist_mu);
355}
356
357void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
358 pi->ref_cnt -= unref_by;
359 int ref_cnt = pi->ref_cnt;
360 GPR_ASSERT(ref_cnt >= 0);
361
362 gpr_mu_unlock(&pi->mu);
363
364 if (ref_cnt == 0) {
365 polling_island_delete(pi);
366 }
367}
368
/* Follows the 'merged_to' chain from 'pi' to the terminal polling island
   (merged_to == NULL), dropping 'unref_by' references from each intermediate
   island along the way. Returns the terminal island LOCKED, with 'add_ref_by'
   references added to it. */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    /* Hand-over-hand: unref/unlock the current node before locking the next */
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
383
/* Resolves *p and *q to their terminal polling islands (following merged_to
   chains) and acquires their locks while honoring the global lock ordering:
   the island at the lower address is always locked first. On return *p and *q
   point to the terminal islands; if distinct, both are locked, otherwise the
   single shared island is locked once. */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2  (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock polling_island node with
       lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further.  If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2  (address of pi_1 is less than that of pi_2)
       - pi_1 MAYBE locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
         done locking this polling_island yet. Release the lock on this node
         and advance pi_1 to the next node in the list; and go to the beginning
         of the loop (we can't proceed to locking pi_2 unless we locked pi_1
         first) */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not terminal node, we are not done locking this
       polling_island yet. Release the lock and update pi_2 to the next node in
       the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 odd number of times, do one more swap so
       that pi_1 and pi_2 point to the same polling_island lists they started
       off with at the beginning of this function (i.e *p and *q respectively)
     */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
482
/* Merges polling islands 'p' and 'q': moves all fds from the smaller island
   into the larger one, transfers the smaller island's refs, and returns the
   surviving island. */
polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_pair_update_and_lock(&p, &q);

  /* TODO: sreek: Think about this scenario some more */
  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (the polling_island with
     fewer fds) to q.
     Note: Not altering the ref counts on the affected fds here because they
     would effectively remain unchanged */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* The merged polling island inherits all the ref counts of the island
     merging with it */
  q->ref_cnt += p->ref_cnt;

  /* NOTE(review): p->merged_to is never set to q here, so holders of 'p'
     cannot discover 'q' via the merged_to chain that
     polling_island_update_and_lock() walks — verify whether this is
     intentional or should be 'p->merged_to = q;' as in later revisions */

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  return q;
}
515
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700516static void polling_island_global_init() {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700517 gpr_mu_init(&g_pi_freelist_mu);
518 g_pi_freelist = NULL;
519}
520
/* Tears down the polling island freelist, destroying every freelisted island.
   Islands still in use are not reclaimed here. */
static void polling_island_global_shutdown() {
  polling_island *next;
  /* Lock/unlock once to synchronize with any straggling freelist access
     before the unguarded teardown below */
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }

  gpr_mu_destroy(&g_pi_freelist_mu);
}
535
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700536/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700537 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700538 */
539
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700540/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700541 * but instead so that implementations with multiple threads in (for example)
542 * epoll_wait deal with the race between pollset removal and incoming poll
543 * notifications.
544 *
545 * The problem is that the poller ultimately holds a reference to this
546 * object, so it is very difficult to know when is safe to free it, at least
547 * without some expensive synchronization.
548 *
549 * If we keep the object freelisted, in the worst case losing this race just
550 * becomes a spurious read notification on a reused fd.
551 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700552
553/* The alarm system needs to be able to wakeup 'some poller' sometimes
554 * (specifically when a new alarm needs to be triggered earlier than the next
555 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
556 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700557
558/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
559 * sure to wake up one polling thread (which can wake up other threads if
560 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700561grpc_wakeup_fd grpc_global_wakeup_fd;
562
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700563static grpc_fd *fd_freelist = NULL;
564static gpr_mu fd_freelist_mu;
565
/* ref_by/unref_by: raw refcount manipulation on a grpc_fd. Note the unusual
   structure: only the function HEADER differs between the debug and
   non-debug builds — each #ifdef arm opens the function and the shared body
   follows the #endif. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The refcount must already be positive: a dropped fd cannot be revived */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Last reference dropped: recycle the struct via the fd freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
605
/* Public-facing ref/unref wrappers.
   Increment refcount by two to avoid changing the orphan bit (bit 0 of refst
   is the active/orphaned flag, so all refcounting is done in units of 2) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
621
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700622static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
623
624static void fd_global_shutdown(void) {
625 gpr_mu_lock(&fd_freelist_mu);
626 gpr_mu_unlock(&fd_freelist_mu);
627 while (fd_freelist != NULL) {
628 grpc_fd *fd = fd_freelist;
629 fd_freelist = fd_freelist->freelist_next;
630 gpr_mu_destroy(&fd->mu);
631 gpr_free(fd);
632 }
633 gpr_mu_destroy(&fd_freelist_mu);
634}
635
636static grpc_fd *fd_create(int fd, const char *name) {
637 grpc_fd *new_fd = NULL;
638
639 gpr_mu_lock(&fd_freelist_mu);
640 if (fd_freelist != NULL) {
641 new_fd = fd_freelist;
642 fd_freelist = fd_freelist->freelist_next;
643 }
644 gpr_mu_unlock(&fd_freelist_mu);
645
646 if (new_fd == NULL) {
647 new_fd = gpr_malloc(sizeof(grpc_fd));
648 gpr_mu_init(&new_fd->mu);
649 gpr_mu_init(&new_fd->pi_mu);
650 }
651
652 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
653 newly created fd (or an fd we got from the freelist), no one else would be
654 holding a lock to it anyway. */
655 gpr_mu_lock(&new_fd->mu);
656
657 gpr_atm_rel_store(&new_fd->refst, 1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700658 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700659 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700660 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700661 new_fd->read_closure = CLOSURE_NOT_READY;
662 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700663 new_fd->polling_island = NULL;
664 new_fd->freelist_next = NULL;
665 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700666 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700667
668 gpr_mu_unlock(&new_fd->mu);
669
670 char *fd_name;
671 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
672 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
673 gpr_free(fd_name);
674#ifdef GRPC_FD_REF_COUNT_DEBUG
675 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, fd_name);
676#endif
677 return new_fd;
678}
679
680static bool fd_is_orphaned(grpc_fd *fd) {
681 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
682}
683
684static int fd_wrapped_fd(grpc_fd *fd) {
685 int ret_fd = -1;
686 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700687 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700688 ret_fd = fd->fd;
689 }
690 gpr_mu_unlock(&fd->mu);
691
692 return ret_fd;
693}
694
/* Orphans the fd: either closes the underlying descriptor or (if 'release_fd'
   is non-NULL) hands it back to the caller, detaches the fd from its polling
   island, and schedules 'on_done'. The grpc_fd struct itself is recycled only
   when its last reference is dropped. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}
737
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700738static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
739 grpc_closure **st, grpc_closure *closure) {
740 if (*st == CLOSURE_NOT_READY) {
741 /* not ready ==> switch to a waiting state by setting the closure */
742 *st = closure;
743 } else if (*st == CLOSURE_READY) {
744 /* already ready ==> queue the closure to run immediately */
745 *st = CLOSURE_NOT_READY;
746 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
747 } else {
748 /* upcallptr was set to a different closure. This is an error! */
749 gpr_log(GPR_ERROR,
750 "User called a notify_on function with a previous callback still "
751 "pending");
752 abort();
753 }
754}
755
756/* returns 1 if state becomes not ready */
757static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
758 grpc_closure **st) {
759 if (*st == CLOSURE_READY) {
760 /* duplicate ready ==> ignore */
761 return 0;
762 } else if (*st == CLOSURE_NOT_READY) {
763 /* not ready, and not waiting ==> flag ready */
764 *st = CLOSURE_READY;
765 return 0;
766 } else {
767 /* waiting ==> queue closure */
768 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
769 *st = CLOSURE_NOT_READY;
770 return 1;
771 }
772}
773
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700774static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
775 grpc_fd *fd) {
776 grpc_pollset *notifier = NULL;
777
778 gpr_mu_lock(&fd->mu);
779 notifier = fd->read_notifier_pollset;
780 gpr_mu_unlock(&fd->mu);
781
782 return notifier;
783}
784
/* Mark the fd as shut down; any pending or future read/write closures will
   observe 'success = false' */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown); /* shutdown may only be requested once per fd */
  fd->shutdown = true;

  /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
     this point, the closures would be called with 'success = false' */
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
796
/* Register 'closure' to run when the fd becomes readable (runs immediately if
   readiness is already pending; see notify_on_locked) */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
803
/* Register 'closure' to run when the fd becomes writable (runs immediately if
   readiness is already pending; see notify_on_locked) */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
810
811/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700812 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700813 */
814
/* Handler for the poller-kick signal. Its only job is to interrupt a blocked
   epoll_pwait(); it intentionally does nothing (the log is debug-only). */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#else
  (void)sig_num; /* silence -Wunused-parameter in non-debug builds */
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700820
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700821static void poller_kick_init() {
822 grpc_poller_kick_signum = SIGRTMIN + 2;
823 signal(grpc_poller_kick_signum, sig_handler);
824}
825
/* Global state management */
static void pollset_global_init(void) {
  /* Initialize the wakeup fd used by kick_poller() for anonymous kicks */
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  poller_kick_init();
}
831
/* Tear down the global pollset state created by pollset_global_init() */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
}
835
/* Interrupt one worker blocked in epoll_pwait() by delivering the poller-kick
   signal to its thread */
static void pollset_worker_kick(grpc_pollset_worker *worker) {
  pthread_kill(worker->pt_id, grpc_poller_kick_signum);
}
839
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700840/* Return 1 if the pollset has active threads in pollset_work (pollset must
841 * be locked) */
842static int pollset_has_workers(grpc_pollset *p) {
843 return p->root_worker.next != &p->root_worker;
844}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700845
846static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
847 worker->prev->next = worker->next;
848 worker->next->prev = worker->prev;
849}
850
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700851static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
852 if (pollset_has_workers(p)) {
853 grpc_pollset_worker *w = p->root_worker.next;
854 remove_worker(p, w);
855 return w;
856 } else {
857 return NULL;
858 }
859}
860
861static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
862 worker->next = &p->root_worker;
863 worker->prev = worker->next->prev;
864 worker->prev->next = worker->next->prev = worker;
865}
866
867static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
868 worker->prev = &p->root_worker;
869 worker->next = worker->prev->next;
870 worker->prev->next = worker->next->prev = worker;
871}
872
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700873/* p->mu must be held before calling this function */
874static void pollset_kick(grpc_pollset *p,
875 grpc_pollset_worker *specific_worker) {
876 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700877
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700878 grpc_pollset_worker *worker = specific_worker;
879 if (worker != NULL) {
880 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700881 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700882 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700883 for (worker = p->root_worker.next; worker != &p->root_worker;
884 worker = worker->next) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700885 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700886 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700887 } else {
888 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700889 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700890 GPR_TIMER_END("pollset_kick.broadcast", 0);
891 } else {
892 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700893 pollset_worker_kick(worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700894 }
895 } else {
896 GPR_TIMER_MARK("kick_anonymous", 0);
897 worker = pop_front_worker(p);
898 if (worker != NULL) {
899 GPR_TIMER_MARK("finally_kick", 0);
900 push_back_worker(p, worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700901 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700902 } else {
903 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700904 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700905 }
906 }
907
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700908 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700909}
910
911static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
912
/* Initialize a pollset and hand its lock back to the caller via *mu */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  /* The worker list is circular and doubly-linked, with root as sentinel */
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  /* No polling island until an fd is added or the first poll creates one */
  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}
927
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700928/* Convert a timespec to milliseconds:
929 - Very small or negative poll times are clamped to zero to do a non-blocking
930 poll (which becomes spin polling)
931 - Other small values are rounded up to one millisecond
932 - Longer than a millisecond polls are rounded up to the next nearest
933 millisecond to avoid spinning
934 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700935static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
936 gpr_timespec now) {
937 gpr_timespec timeout;
938 static const int64_t max_spin_polling_us = 10;
939 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
940 return -1;
941 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700942
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700943 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
944 max_spin_polling_us,
945 GPR_TIMESPAN))) <= 0) {
946 return 0;
947 }
948 timeout = gpr_time_sub(deadline, now);
949 return gpr_time_to_millis(gpr_time_add(
950 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
951}
952
/* Flag read-readiness on 'fd' and record 'notifier' as the pollset that saw
   the event (later retrievable via fd_get_read_notifier_pollset) */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
961
/* Flag write-readiness on 'fd', running any parked write closure */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
968
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700969#define GRPC_EPOLL_MAX_EVENTS 1000
970static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
971 grpc_pollset *pollset, int timeout_ms,
972 sigset_t *sig_mask) {
973 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -0700974 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700975 int ep_rv;
976 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
977
978 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
979 polling island pointed by pollset->polling_island.
980 Acquire the following locks:
981 - pollset->mu (which we already have)
982 - pollset->pi_mu
983 - pollset->polling_island->mu */
984 gpr_mu_lock(&pollset->pi_mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700985
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700986 if (pollset->polling_island == NULL) {
987 pollset->polling_island = polling_island_create(NULL, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -0700988 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700989
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700990 pollset->polling_island =
991 polling_island_update_and_lock(pollset->polling_island, 1, 0);
992 epoll_fd = pollset->polling_island->epoll_fd;
993
994#ifdef GRPC_EPOLL_DEBUG
995 if (pollset->polling_island->fd_cnt == 0) {
996 gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds",
997 epoll_fd);
998 }
999 for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) {
1000 gpr_log(GPR_DEBUG,
1001 "pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d",
1002 epoll_fd, pollset->polling_island->fd_cnt, i,
1003 pollset->polling_island->fds[i]->fd);
1004 }
1005#endif
1006 gpr_mu_unlock(&pollset->polling_island->mu);
1007
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001008 gpr_mu_unlock(&pollset->pi_mu);
1009 gpr_mu_unlock(&pollset->mu);
1010
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001011 do {
1012 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1013 sig_mask);
1014 if (ep_rv < 0) {
1015 if (errno != EINTR) {
1016 /* TODO (sreek) - Do not log an error in case of bad file descriptor
1017 * (A bad file descriptor here would just mean that the epoll set was
1018 * merged with another epoll set and that the current epoll_fd is
1019 * closed) */
1020 gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
1021 } else {
1022 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001023 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001024 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001025
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001026 int i;
1027 for (i = 0; i < ep_rv; ++i) {
1028 grpc_fd *fd = ep_ev[i].data.ptr;
1029 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1030 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1031 int write_ev = ep_ev[i].events & EPOLLOUT;
1032 if (fd == NULL) {
1033 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
1034 } else {
1035 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001036 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001037 }
1038 if (write_ev || cancel) {
1039 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001040 }
1041 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001042 }
1043 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001044 GPR_TIMER_END("pollset_work_and_unlock", 0);
1045}
1046
/* Release the reference to pollset->polling_island and set it to NULL.
   pollset->mu must be held */
static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->pi_mu);
  if (pollset->polling_island) {
    /* Follow merges to the latest island before dropping our ref */
    pollset->polling_island =
        polling_island_update_and_lock(pollset->polling_island, 1, 0);
    polling_island_unref_and_unlock(pollset->polling_island, 1);
    pollset->polling_island = NULL;
  }
  gpr_mu_unlock(&pollset->pi_mu);
}
1059
/* Complete pollset shutdown: drop the polling island and schedule the
   caller-provided shutdown_done closure. pollset->mu must be held. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
1070
/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down); /* shutdown must only be started once */
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake every worker so they notice shutting_down and drain out */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1090
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}
1099
/* Return a fully-shut-down pollset to a reusable, freshly-initialized state;
   only legal once shutdown has completed and no workers remain */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  /* Drop any lingering polling-island ref so the next use starts clean */
  pollset_release_polling_island_locked(pollset);
}
1109
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  /* The worker lives on this thread's stack; it is unlinked before return */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* Block the kick signal on this thread, then hand epoll_pwait() a mask
       with the signal unblocked so the kick can only be delivered while we
       are inside the wait (avoids the lost-wakeup race) */
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_poller_kick_signum);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_poller_kick_signum);

    push_front_worker(pollset, &worker);

    /* Releases pollset->mu while blocked in epoll */
    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
}
1172
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001173static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1174 grpc_fd *fd) {
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -07001175 /* TODO sreek - Double check if we need to get a pollset->mu lock here */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001176 gpr_mu_lock(&pollset->pi_mu);
1177 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001178
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001179 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001180
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001181 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1182 * equal, do nothing.
1183 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1184 * a new polling island (with a refcount of 2) and make the polling_island
1185 * fields in both fd and pollset to point to the new island
1186 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1187 * the NULL polling_island field to point to the non-NULL polling_island
1188 * field (ensure that the refcount on the polling island is incremented by
1189 * 1 to account for the newly added reference)
1190 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1191 * and different, merge both the polling islands and update the
1192 * polling_island fields in both fd and pollset to point to the merged
1193 * polling island.
1194 */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001195 if (fd->polling_island == pollset->polling_island) {
1196 pi_new = fd->polling_island;
1197 if (pi_new == NULL) {
1198 pi_new = polling_island_create(fd, 2);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001199 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001200 } else if (fd->polling_island == NULL) {
1201 pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001202 polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001203 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001204 } else if (pollset->polling_island == NULL) {
1205 pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001206 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001207 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001208 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001209 }
1210
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001211 fd->polling_island = pollset->polling_island = pi_new;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001212
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001213 gpr_mu_unlock(&fd->pi_mu);
1214 gpr_mu_unlock(&pollset->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001215}
1216
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001217/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001218 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001219 */
1220
1221static grpc_pollset_set *pollset_set_create(void) {
1222 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1223 memset(pollset_set, 0, sizeof(*pollset_set));
1224 gpr_mu_init(&pollset_set->mu);
1225 return pollset_set;
1226}
1227
1228static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1229 size_t i;
1230 gpr_mu_destroy(&pollset_set->mu);
1231 for (i = 0; i < pollset_set->fd_count; i++) {
1232 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1233 }
1234 gpr_free(pollset_set->pollsets);
1235 gpr_free(pollset_set->pollset_sets);
1236 gpr_free(pollset_set->fds);
1237 gpr_free(pollset_set);
1238}
1239
/* Add 'fd' to the set and propagate it to every contained pollset and
   (recursively) every contained pollset_set */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  /* Grow the fd array on demand (doubling, minimum capacity 8) */
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  /* The set holds a ref on the fd until deletion or orphan-compaction */
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}
1259
/* Remove 'fd' from the set (swap-with-last, order not preserved) and from all
   contained pollset_sets recursively */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}
1278
/* Add 'pollset' to the set and register every live fd in the set with it.
   Orphaned fds encountered along the way are dropped (lazy compaction). */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  /* Grow the pollset array on demand (doubling, minimum capacity 8) */
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  /* i scans all fds; j writes back only the still-live ones */
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}
1303
1304static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1305 grpc_pollset_set *pollset_set,
1306 grpc_pollset *pollset) {
1307 size_t i;
1308 gpr_mu_lock(&pollset_set->mu);
1309 for (i = 0; i < pollset_set->pollset_count; i++) {
1310 if (pollset_set->pollsets[i] == pollset) {
1311 pollset_set->pollset_count--;
1312 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1313 pollset_set->pollsets[pollset_set->pollset_count]);
1314 break;
1315 }
1316 }
1317 gpr_mu_unlock(&pollset_set->mu);
1318}
1319
/* Nest pollset_set 'item' inside 'bag' and propagate every live fd in 'bag'
   down into 'item'. Orphaned fds are dropped along the way (lazy
   compaction). */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  /* Grow the nested-set array on demand (doubling, minimum capacity 8) */
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  /* i scans all fds; j writes back only the still-live ones */
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}
1343
1344static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1345 grpc_pollset_set *bag,
1346 grpc_pollset_set *item) {
1347 size_t i;
1348 gpr_mu_lock(&bag->mu);
1349 for (i = 0; i < bag->pollset_set_count; i++) {
1350 if (bag->pollset_sets[i] == item) {
1351 bag->pollset_set_count--;
1352 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1353 bag->pollset_sets[bag->pollset_set_count]);
1354 break;
1355 }
1356 }
1357 gpr_mu_unlock(&bag->mu);
1358}
1359
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001360/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001361 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001362 */
1363
/* Tear down all global state set up by grpc_init_epoll_linux() */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1369
/* Event-engine vtable wiring this file's epoll-based implementation into the
   iomgr event-engine abstraction */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1402
/* Initialize all global state for this engine and expose its vtable */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}
1409
#else /* defined(GPR_LINUX_EPOLL) */
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL (NOTE(review): callers presumably treat a NULL vtable as "engine
 * unavailable" and fall back to another engine -- confirm at call sites) */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }

#endif /* !defined(GPR_LINUX_EPOLL) */