/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/support/port_platform.h>

#ifdef GPR_POSIX_SOCKET

#include "src/core/lib/iomgr/ev_epoll_posix.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

struct polling_island;

/*******************************************************************************
 * FD declarations
 */

struct grpc_fd {
  int fd;
  /* refst format:
     bit0:   1=active/0=orphaned
     bit1-n: refcount
     meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;
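  /* For example, under the encoding above: refst == 5 (binary 101) is an
     active fd with two references held, while refst == 4 (binary 100) is an
     orphaned fd whose two references have not yet been dropped. */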

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;

  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* Mutex protecting the 'polling_island' field */
  gpr_mu pi_mu;

  /* The polling island to which this fd belongs. An fd belongs to exactly
     one polling island */
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;

  grpc_closure *on_done_closure;

  grpc_iomgr_object iomgr_object;
};

/* Return true if this fd is orphaned, false otherwise */
static bool fd_is_orphaned(grpc_fd *fd);

/* Reference counting for fds */
/*#define GRPC_FD_REF_COUNT_DEBUG*/
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
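
/* A note on the state machine used below (see notify_on_locked() and
   set_ready_locked()): read_closure/write_closure each hold one of three
   values -- CLOSURE_NOT_READY, CLOSURE_READY, or a pointer to the user's
   closure waiting on the event. The two sentinels are distinguishable from
   any real closure pointer, so a single word encodes all three states. */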

/*******************************************************************************
 * Polling Island
 */
typedef struct polling_island {
  gpr_mu mu;
  int ref_cnt;

  /* Pointer to the polling_island this one merged into. If this is not NULL,
     all the remaining fields in this polling island (i.e. all fields except
     mu and ref_cnt) are considered invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list. Note that this is only used if the polling island is in the
     free list */
  struct polling_island *next_free;
} polling_island;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ? */
static void add_fd_to_polling_island_locked(polling_island *pi, grpc_fd *fd) {
  int err;
  struct epoll_event ev;

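  /* Note: the fd is registered edge-triggered (EPOLLET): epoll reports each
     readiness transition only once, so consumers are expected to keep
     reading/writing until they see EAGAIN before waiting again. */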
  ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
  ev.data.ptr = fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);

  if (err < 0 && errno != EEXIST) {
    gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
            fd->fd, strerror(errno));
    return;
  }

  if (pi->fd_cnt == pi->fd_capacity) {
    pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
    pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
  }
  pi->fds[pi->fd_cnt++] = fd;
}

static polling_island *polling_island_create(int initial_ref_cnt,
                                             grpc_fd *initial_fd) {
  polling_island *pi = NULL;
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);
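
  /* An island popped off the freelist keeps the epoll fd it was originally
     created with; a new epoll set is only created in the malloc path below. */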

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;

    pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (pi->epoll_fd < 0) {
      gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
              strerror(errno));
    }
    GPR_ASSERT(pi->epoll_fd >= 0);
  }

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* add_fd_to_polling_island_locked() expects the caller to hold a pi->mu
     * lock. However, since this is a new polling island (and no one has a
     * reference to it yet), it is okay to not acquire pi->mu here */
    add_fd_to_polling_island_locked(pi, initial_fd);
  }

  return pi;
}

static void polling_island_global_init(void) {
  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;
  polling_island_create(0, NULL); /* TODO(sreek): Delete this line */
}

/*******************************************************************************
 * pollset declarations
 */

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  pthread_t pt_id;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure *shutdown_done;

  int epoll_fd;

  /* Mutex protecting the 'polling_island' field */
  gpr_mu pi_mu;

  /* The polling island to which this pollset belongs. A pollset belongs to
     exactly one polling island */
  struct polling_island *polling_island;

  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};

/* Add an fd to a pollset */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           struct grpc_fd *fd);

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - polls longer than a millisecond are rounded up to the next millisecond
     to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now);

/* Allow kick to wake up the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* Force the wakee to repoll when awoken */
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags);

/* turn a pollset into a multipoller: platform specific */
typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
                                                 grpc_pollset *pollset,
                                                 struct grpc_fd **fds,
                                                 size_t fd_count);

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *pollset);

static void remove_fd_from_all_epoll_sets(int fd);

/*******************************************************************************
 * pollset_set definitions
 */

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
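
/* A pollset_set acts as a bag of fds, pollsets and child pollset_sets: an fd
   added to the set is propagated to every member pollset (and recursively to
   child sets), so fds and pollsets may join the set in either order and
   still end up polled together. See pollset_set_add_fd() and
   pollset_set_add_pollset() below. */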

/*******************************************************************************
 * fd_posix.c
 */

/* We need to keep a freelist not because of any concerns of malloc
 * performance but instead so that implementations with multiple threads in
 * (for example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

static void freelist_fd(grpc_fd *fd) {
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  grpc_iomgr_unregister_object(&fd->iomgr_object);
  gpr_mu_unlock(&fd_freelist_mu);
}

static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->mu);
    gpr_mu_init(&r->pi_mu);
  }

  /* TODO: sreek - check with ctiller on why we need to acquire a lock here */
  gpr_mu_lock(&r->mu);
  gpr_atm_rel_store(&r->refst, 1);
  r->shutdown = 0;
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->polling_island = NULL;
  r->freelist_next = NULL;
  r->on_done_closure = NULL;
  r->closed = 0;
  r->released = 0;
  gpr_mu_unlock(&r->mu);
  return r;
}

static void destroy(grpc_fd *fd) {
  gpr_mu_destroy(&fd->mu);
  gpr_free(fd);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
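  /* 'old' is the value before subtracting: if old == n the count has just
     dropped to zero, and the active bit (bit0) was already cleared, so the
     fd can be returned to the freelist. */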
  if (old == n) {
    freelist_fd(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    destroy(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *r = alloc_fd(fd);
  char *name2;
  gpr_asprintf(&name2, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2);
  gpr_free(name2);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
  return r;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  } else {
    remove_fd_from_all_epoll_sets(fd->fd);
  }
  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
}

static int fd_wrapped_fd(grpc_fd *fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

/* TODO: sreek - do something here with the pollset island link */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (!fd->released) {
    shutdown(fd->fd, SHUT_RDWR);
  } else {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  close_fd_locked(exec_ctx, fd);
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}

/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

503
504static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
505 grpc_closure **st, grpc_closure *closure) {
506 if (*st == CLOSURE_NOT_READY) {
507 /* not ready ==> switch to a waiting state by setting the closure */
508 *st = closure;
509 } else if (*st == CLOSURE_READY) {
510 /* already ready ==> queue the closure to run immediately */
511 *st = CLOSURE_NOT_READY;
512 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
513 } else {
514 /* upcallptr was set to a different closure. This is an error! */
515 gpr_log(GPR_ERROR,
516 "User called a notify_on function with a previous callback still "
517 "pending");
518 abort();
519 }
520}
521
522/* returns 1 if state becomes not ready */
523static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
524 grpc_closure **st) {
525 if (*st == CLOSURE_READY) {
526 /* duplicate ready ==> ignore */
527 return 0;
528 } else if (*st == CLOSURE_NOT_READY) {
529 /* not ready, and not waiting ==> flag ready */
530 *st = CLOSURE_READY;
531 return 0;
532 } else {
533 /* waiting ==> queue closure */
534 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
535 *st = CLOSURE_NOT_READY;
536 return 1;
537 }
538}
539
540/* Do something here with the pollset island link (?) */
541static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
542 gpr_mu_lock(&fd->mu);
543 GPR_ASSERT(!fd->shutdown);
544 fd->shutdown = 1;
545 set_ready_locked(exec_ctx, fd, &fd->read_closure);
546 set_ready_locked(exec_ctx, fd, &fd->write_closure);
547 gpr_mu_unlock(&fd->mu);
548}
549
550static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
551 grpc_closure *closure) {
552 gpr_mu_lock(&fd->mu);
553 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
554 gpr_mu_unlock(&fd->mu);
555}
556
557static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
558 grpc_closure *closure) {
559 gpr_mu_lock(&fd->mu);
560 notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
561 gpr_mu_unlock(&fd->mu);
562}
563
564/*******************************************************************************
565 * pollset_posix.c
566 */
567
568GPR_TLS_DECL(g_current_thread_poller);
569GPR_TLS_DECL(g_current_thread_worker);
570
/** The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch').
 * This wakeup_fd gives us something to alert on when such a case occurs. */
grpc_wakeup_fd grpc_global_wakeup_fd;

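/* Workers are kept in a per-pollset circular doubly-linked list;
   pollset->root_worker is a sentinel node, so the list is empty exactly when
   root_worker.next == root_worker.prev == &root_worker (see pollset_init()
   and pollset_has_workers()). */
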
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
      p->kicked_without_pollers = 1;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      /* TODO (sreek): Refactor this into a separate file */
      pthread_kill(specific_worker->pt_id, SIGUSR1);
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
}

static void pollset_kick(grpc_pollset *p,
                         grpc_pollset_worker *specific_worker) {
  pollset_kick_ext(p, specific_worker, 0);
}

/* global state management */

static void sig_handler(int sig_num) {
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
}

static void pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
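  /* pollset_kick_ext() above relies on pthread_kill(..., SIGUSR1) to
     interrupt a worker blocked in poll()/epoll_wait(); installing this
     handler makes the signal cause EINTR in the blocked call rather than
     terminating the process (SIGUSR1's default disposition). */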
  signal(SIGUSR1, sig_handler);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_poller);
  gpr_tls_destroy(&g_current_thread_worker);
}

static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }

/* TODO: sreek - Try to remove this forward declaration */
static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset);

/* main interface */

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->local_wakeup_cache = NULL;

  multipoll_with_epoll_pollset_create_efd(pollset);
}

/* TODO(sreek): Maybe merge multipoll_*_destroy() with pollset_destroy()
 * function */
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset);

static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));

  multipoll_with_epoll_pollset_destroy(pollset);

  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

/* TODO(sreek) - Do something with the pollset island link (??) */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
}

/* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */
static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset);

static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  multipoll_with_epoll_pollset_finish_shutdown(pollset);
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock declaration
 */
static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now);

static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
  }
  worker.kicked_specifically = 0;

  /* TODO(sreek): Abstract this thread id stuff out into a separate file */
  worker.pt_id = pthread_self();
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);

      multipoll_with_epoll_pollset_maybe_work_and_unlock(
          exec_ctx, pollset, &worker, deadline, now);

      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
      gpr_tls_set(&g_current_thread_poller, 0);
    } else {
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop.
       In that case we won't loop, since we haven't added the worker to the
       worker list, which means nobody could ask us to re-evaluate polling. */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
  }
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
}

/* TODO: (sreek) Do something with the pollset island link */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}

static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
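
/* Worked example of the conversion above: a deadline 2.3ms in the future
   returns 3 (GPR_NS_PER_MS - 1 is added before truncating, i.e. round up);
   a deadline within max_spin_polling_us (10us) of 'now', or already past,
   returns 0 (non-blocking poll); an infinite deadline returns -1 (block
   indefinitely). */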

/*******************************************************************************
 * pollset_multipoller_with_epoll_posix.c
 */

static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, st);
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->read_closure);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->write_closure);
}

/* TODO (sreek): Maybe this global list is not required. Double check */
struct epoll_fd_list {
  int *epoll_fds;
  size_t count;
  size_t capacity;
};

static struct epoll_fd_list epoll_fd_global_list;
static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
static gpr_mu epoll_fd_list_mu;

static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }

static void add_epoll_fd_to_global_list(int epoll_fd) {
  gpr_once_init(&init_epoll_fd_list_mu, init_mu);

  gpr_mu_lock(&epoll_fd_list_mu);
  if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
    epoll_fd_global_list.capacity =
        GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
    epoll_fd_global_list.epoll_fds =
        gpr_realloc(epoll_fd_global_list.epoll_fds,
                    epoll_fd_global_list.capacity * sizeof(int));
  }
  epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
  gpr_mu_unlock(&epoll_fd_list_mu);
}

static void remove_epoll_fd_from_global_list(int epoll_fd) {
  gpr_mu_lock(&epoll_fd_list_mu);
  GPR_ASSERT(epoll_fd_global_list.count > 0);
  for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
    if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
      epoll_fd_global_list.epoll_fds[i] =
          epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
      break;
    }
  }
  gpr_mu_unlock(&epoll_fd_list_mu);
}

static void remove_fd_from_all_epoll_sets(int fd) {
  int err;
  gpr_once_init(&init_epoll_fd_list_mu, init_mu);
  gpr_mu_lock(&epoll_fd_list_mu);
  if (epoll_fd_global_list.count == 0) {
    gpr_mu_unlock(&epoll_fd_list_mu);
    return;
  }
  for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
    err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
              strerror(errno));
    }
  }
  gpr_mu_unlock(&epoll_fd_list_mu);
}

/* TODO: sreek - This function combines multipoll_with_epoll_pollset_add_fd()
 * and finally_add_fd() from ev_poll_and_epoll_posix.c */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO: sreek - Check if we need to get a pollset->mu lock here */

  struct epoll_event ev;
  int err;

  /* Hold a ref to the fd to keep it from being closed during the add. This
     may result in a spurious wakeup being assigned to this pollset whilst
     adding, but that should be benign. */
  /* TODO: (sreek): Understand how a spurious wakeup might be assigned to this
   * pollset... and how holding a reference will prevent the fd from being
   * closed (and perhaps more importantly, see how an fd can be closed while
   * being added to the epoll set) */
  GRPC_FD_REF(fd, "add fd");

  gpr_mu_lock(&fd->mu);
  if (fd->shutdown) {
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "add fd");
    return;
  }
  gpr_mu_unlock(&fd->mu);

  ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
  ev.data.ptr = fd;
  err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
  if (err < 0) {
    /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
    if (errno != EEXIST) {
      gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
              strerror(errno));
    }
  }

  /* The fd might have been orphaned while we were adding it to the epoll set.
     Close the fd in such a case (which will also take care of removing it
     from the epoll set) */
  gpr_mu_lock(&fd->mu);
  if (fd_is_orphaned(fd) && !fd->closed) {
    close_fd_locked(exec_ctx, fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "add fd");
}

/* Creates an epoll fd and initializes the pollset */
/* TODO: This has to be called ONLY from the pollset_init function, and hence
 * it does not acquire any locks */
static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset) {
  struct epoll_event ev;
  int err;

  pollset->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (pollset->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
    abort();
  }
  add_epoll_fd_to_global_list(pollset->epoll_fd);

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = NULL;

  err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
            strerror(errno));
  }
}

/* TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000

static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = pollset->epoll_fd;
  int ep_rv;
  int poll_rv;
  int timeout_ms;
  struct pollfd pfds[2];

  /* If you want to ignore epoll's ability to sanely handle parallel pollers,
   * for a more apples-to-apples performance comparison with poll, add a
   * if (pollset->counter != 0) { return 0; }
   * here.
   */

  gpr_mu_unlock(&pollset->mu);

  timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfds[0].events = POLLIN;
  pfds[0].revents = 0;
  pfds[1].fd = epoll_fd;
  pfds[1].events = POLLIN;
  pfds[1].revents = 0;
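
  /* Two-stage wait: pfds[0] watches this worker's wakeup fd so kicks can
     interrupt the wait, while pfds[1] polls the epoll fd itself, which
     becomes readable whenever any fd registered in the epoll set has
     events. */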

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  GPR_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GPR_TIMER_END("poll", 0);

  if (poll_rv < 0) {
    if (errno != EINTR) {
      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
    }
  } else if (poll_rv == 0) {
    /* do nothing */
  } else {
    if (pfds[0].revents) {
      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
    }
    if (pfds[1].revents) {
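      /* Drain the epoll set in batches; a full batch
         (ep_rv == GRPC_EPOLL_MAX_EVENTS) may mean more events are still
         pending, hence the loop condition below. */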
      do {
        /* The following epoll_wait never blocks; it has a timeout of 0 */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
        if (ep_rv < 0) {
          if (errno != EINTR) {
            gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
          }
        } else {
          int i;
          for (i = 0; i < ep_rv; ++i) {
            grpc_fd *fd = ep_ev[i].data.ptr;
            /* TODO(klempner): We might want to consider making err and pri
             * separate events */
            int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
            int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
            int write_ev = ep_ev[i].events & EPOLLOUT;
            if (fd == NULL) {
              grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
            } else {
              if (read_ev || cancel) {
                fd_become_readable(exec_ctx, fd);
              }
              if (write_ev || cancel) {
                fd_become_writable(exec_ctx, fd);
              }
            }
          }
        }
      } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
    }
  }
}

static void multipoll_with_epoll_pollset_finish_shutdown(
    grpc_pollset *pollset) {}

static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
  close(pollset->epoll_fd);
  remove_epoll_fd_from_global_list(pollset->epoll_fd);
}

/*******************************************************************************
 * pollset_set_posix.c
 */

static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

/*******************************************************************************
 * event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}

#endif