blob: 0d30bb659b652ecdb9501a4d0ea27d027fe65804 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
38#include "src/core/lib/iomgr/ev_epoll_posix.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <poll.h>
43#include <signal.h>
44#include <string.h>
45#include <sys/epoll.h>
46#include <sys/socket.h>
47#include <unistd.h>
48
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/string_util.h>
52#include <grpc/support/tls.h>
53#include <grpc/support/useful.h>
54
55#include "src/core/lib/iomgr/ev_posix.h"
56#include "src/core/lib/iomgr/iomgr_internal.h"
57#include "src/core/lib/iomgr/wakeup_fd_posix.h"
58#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/block_annotate.h"
60
61struct polling_island;
62
63/*******************************************************************************
64 * FD declarations
65 */
66
/* Per-file-descriptor bookkeeping object.  Instances are recycled through a
   global freelist (see fd_posix.c section below) rather than freed. */
struct grpc_fd {
  int fd; /* the underlying OS file descriptor */
  /* refst format:
     bit0: 1=active/0=orphaned
     bit1-n: refcount
     meaning that mostly we ref by two to avoid altering the orphaned bit,
     and just unref by 1 when we're ready to flag the object as orphaned */
  gpr_atm refst;

  gpr_mu mu;     /* protects the fields below (shutdown/closed/closures) */
  int shutdown;  /* set once by fd_shutdown(); read under mu */
  int closed;    /* set by close_fd_locked() */
  int released;  /* caller took ownership of the raw fd via fd_orphan()'s
                    release_fd out-param; we must not close() it */

  /* Each is CLOSURE_NOT_READY, CLOSURE_READY, or a parked waiter closure
     (see notify_on_locked / set_ready_locked). */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* Mutex protecting the 'polling_island' field */
  gpr_mu pi_mu;

  /* The polling island to which this fd belongs to. An fd belongs to exactly
     one polling island */
  struct polling_island *polling_island;

  /* Intrusive link for the global fd freelist */
  struct grpc_fd *freelist_next;

  /* Scheduled when the fd is fully closed/released (see close_fd_locked) */
  grpc_closure *on_done_closure;

  grpc_iomgr_object iomgr_object;
};
97
98/* Return 1 if this fd is orphaned, 0 otherwise */
99static bool fd_is_orphaned(grpc_fd *fd);
100
101/* Reference counting for fds */
102/*#define GRPC_FD_REF_COUNT_DEBUG*/
103#ifdef GRPC_FD_REF_COUNT_DEBUG
104static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
105static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
106 int line);
107#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
108#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
109#else
110static void fd_ref(grpc_fd *fd);
111static void fd_unref(grpc_fd *fd);
112#define GRPC_FD_REF(fd, reason) fd_ref(fd)
113#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
114#endif
115
116static void fd_global_init(void);
117static void fd_global_shutdown(void);
118
119#define CLOSURE_NOT_READY ((grpc_closure *)0)
120#define CLOSURE_READY ((grpc_closure *)1)
121
122/*******************************************************************************
123 * Polling Island
124 */
typedef struct polling_island {
  gpr_mu mu;   /* protects all fields below */
  int ref_cnt; /* manipulated under mu; island is recycled at 0 (see
                  polling_island_unref_and_unlock) */

  /* Pointer to the polling_island this merged into. If this is not NULL, all
     the remaining fields in this pollset (i.e all fields except mu and ref_cnt)
     are considered invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list. Note that this is only used if the polling island is in the
     free list */
  struct polling_island *next_free;
} polling_island;
148
149/* Polling island freelist */
150static gpr_mu g_pi_freelist_mu;
151static polling_island *g_pi_freelist = NULL;
152
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700153/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
154 * TODO: sreek - Should this add a ref to the grpc_fd ? */
155/* The caller is expected to hold pi->mu lock before calling this function */
156static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
157 size_t fd_count) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700158 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700159 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700160 struct epoll_event ev;
161
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700162 for (i = 0; i < fd_count; i++) {
163 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
164 ev.data.ptr = fds[i];
165 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700166
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700167 if (err < 0 && errno != EEXIST) {
168 gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
169 fds[i]->fd, strerror(errno));
170 /* TODO: sreek - Not sure if it is a good idea to continue here. We need a
171 * better way to bubble up this error instead of doing an abort() */
172 continue;
173 }
174
175 if (pi->fd_cnt == pi->fd_capacity) {
176 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
177 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
178 }
179
180 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700181 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700182}
183
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700184/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
185 * TODO: sreek - Might have to unref the fds (assuming whether we add a ref to
186 * the fd when adding it to the epollset) */
187/* The caller is expected to hold pi->mu lock before calling this function */
188static void polling_island_clear_fds_locked(polling_island *pi) {
189 int err;
190 size_t i;
191
192 for (i = 0; i < pi->fd_cnt; i++) {
193 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
194
195 if (err < 0 && errno != ENOENT) {
196 gpr_log(GPR_ERROR,
197 "epoll_ctl delete for fds[i]: %d failed with error: %s", i,
198 pi->fds[i]->fd, strerror(errno));
199 /* TODO: sreek - Not sure if it is a good idea to continue here. We need a
200 * better way to bubble up this error instead of doing an abort() */
201 continue;
202 }
203 }
204
205 pi->fd_cnt = 0;
206}
207
208/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
209 * TODO: sreek - Might have to unref the fd (assuming whether we add a ref to
210 * the fd when adding it to the epollset) */
211/* The caller is expected to hold pi->mu lock before calling this function */
212static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd) {
213 int err;
214 size_t i;
215 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
216 if (err < 0 && errno != ENOENT) {
217 gpr_log(GPR_ERROR, "epoll_ctl delete for fd: %d failed with error; %s",
218 fd->fd, strerror(errno));
219 }
220
221 for (i = 0; i < pi->fd_cnt; i++) {
222 if (pi->fds[i] == fd) {
223 pi->fds[i] = pi->fds[--pi->fd_cnt];
224 break;
225 }
226 }
227}
228
/* Creates (or recycles from the freelist) a polling island with an initial
   reference count of 'initial_ref_cnt', optionally seeded with 'initial_fd'.
   Recycled islands keep their mutex and epoll fd across reuse. */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;
  /* Try the freelist first */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;

    pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (pi->epoll_fd < 0) {
      gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
              strerror(errno));
    }
    /* NOTE(review): failure to create the epoll fd is treated as fatal. */
    GPR_ASSERT(pi->epoll_fd >= 0);
  }

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* polling_island_add_fds_locked() expects the caller to hold a pi->mu
     * lock. However, since this is a new polling island (and no one has a
     * reference to it yet), it is okay to not acquire pi->mu here */
    polling_island_add_fds_locked(pi, &initial_fd, 1);
  }

  return pi;
}
269
/* Retires a fully-released, empty polling island by pushing it onto the
   global freelist.  The island's mutex and epoll fd remain valid and are
   reused by polling_island_create(). */
static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->ref_cnt == 0);
  GPR_ASSERT(pi->fd_cnt == 0);

  pi->merged_to = NULL;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}
281
282void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
283 pi->ref_cnt -= unref_by;
284 int ref_cnt = pi->ref_cnt;
285 GPR_ASSERT(ref_cnt >= 0);
286
287 gpr_mu_unlock(&pi->mu);
288
289 if (ref_cnt == 0) {
290 polling_island_delete(pi);
291 }
292}
293
/* Follows pi's 'merged_to' chain to the authoritative (non-merged) island,
   dropping 'unref_by' refs from each stale island abandoned along the way
   and adding 'add_ref_by' refs to the final island.  Returns the final
   island with its mu held. */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    /* Releases the stale island's lock and our refs on it */
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
308
/* Resolves both *p and *q to their authoritative islands (chasing each
   'merged_to' chain) and locks them.  Deadlock avoidance: the two mutexes
   are always acquired in ascending address order.  On return *p and *q hold
   the final islands; if both resolve to the SAME island, only one lock is
   held. */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    // pi_1 is NOT equal to pi_2
    // pi_1 MAY be locked

    /* Keep pi_1 at the lower address so locks are taken in address order. */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    // p1 < p2
    // p1 MAY BE locked
    // p2 is NOT locked

    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* pi_1 may have been merged while we were unlocked: chase the chain
         (dropping our ref on the stale island) and restart the loop. */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    // p1 is LOCKED
    // p2 is UNLOCKED
    // p1 != p2

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* Same staleness check for pi_2 */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  // Either pi_1 == pi_2 OR we got both locks!
  if (pi_1 == pi_2) {
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* An odd number of swaps means pi_1/pi_2 are reversed relative to the
       caller's *p/*q ordering: swap back before publishing. */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
381
382polling_island *polling_island_merge(polling_island *p, polling_island *q) {
383 polling_island *merged = NULL;
384
385 polling_island_pair_update_and_lock(&p, &q);
386
387 /* TODO: sreek: Think about this scenario some more. Is it possible ?. what
388 * does it mean, when would this happen */
389 if (p == q) {
390 merged = p;
391 }
392
393 // Move all the fds from polling_island p to polling_island q
394 polling_island_add_fds_locked(q, p->fds, p->fd_cnt);
395 polling_island_clear_fds_locked(p);
396
397 q->ref_cnt += p->ref_cnt;
398
399 gpr_mu_unlock(&p->mu);
400 gpr_mu_unlock(&q->mu);
401
402 return merged;
403}
404
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700405static void polling_island_global_init() {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700406 gpr_mu_init(&g_pi_freelist_mu);
407 g_pi_freelist = NULL;
408}
409
410/*******************************************************************************
411 * pollset declarations
412 */
413
/* A wakeup fd cached on a per-pollset freelist so workers can reuse them
   instead of creating a fresh one on every pollset_work() call. */
typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd *next; /* next entry in the pollset's cache */
} grpc_cached_wakeup_fd;
418
/* State for one thread currently inside pollset_work() for a pollset. */
struct grpc_pollset_worker {
  grpc_cached_wakeup_fd *wakeup_fd; /* fd used to wake this worker */
  int reevaluate_polling_on_wakeup; /* force a repoll after being kicked */
  int kicked_specifically;          /* this exact worker was targeted */
  pthread_t pt_id; /* thread id; pollset_kick_ext() sends SIGUSR1 to it */
  /* Links in the pollset's circular worker list (root_worker is sentinel) */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
427
struct grpc_pollset {
  gpr_mu mu;                       /* protects the pollset */
  grpc_pollset_worker root_worker; /* sentinel of the circular worker list */
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers; /* a kick arrived while no worker was polling */
  grpc_closure *shutdown_done; /* scheduled once shutdown completes */

  int epoll_fd;

  /* Mutex protecting the 'polling_island' field */
  gpr_mu pi_mu;

  /* The polling island to which this pollset belongs to. A pollset belongs
     to exactly one polling island.  (The original comment said "fd" here --
     apparent copy/paste from struct grpc_fd.) */
  struct polling_island *polling_island;

  /* Local cache of eventfds for workers */
  grpc_cached_wakeup_fd *local_wakeup_cache;
};
448
449/* Add an fd to a pollset */
450static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
451 struct grpc_fd *fd);
452
453static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
454 grpc_pollset_set *pollset_set, grpc_fd *fd);
455
456/* Convert a timespec to milliseconds:
457 - very small or negative poll times are clamped to zero to do a
458 non-blocking poll (which becomes spin polling)
459 - other small values are rounded up to one millisecond
460 - longer than a millisecond polls are rounded up to the next nearest
461 millisecond to avoid spinning
462 - infinite timeouts are converted to -1 */
463static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
464 gpr_timespec now);
465
466/* Allow kick to wakeup the currently polling worker */
467#define GRPC_POLLSET_CAN_KICK_SELF 1
468/* Force the wakee to repoll when awoken */
469#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
470/* As per pollset_kick, with an extended set of flags (defined above)
471 -- mostly for fd_posix's use. */
472static void pollset_kick_ext(grpc_pollset *p,
473 grpc_pollset_worker *specific_worker,
474 uint32_t flags);
475
476/* turn a pollset into a multipoller: platform specific */
477typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
478 grpc_pollset *pollset,
479 struct grpc_fd **fds,
480 size_t fd_count);
481
482/* Return 1 if the pollset has active threads in pollset_work (pollset must
483 * be locked) */
484static int pollset_has_workers(grpc_pollset *pollset);
485
486static void remove_fd_from_all_epoll_sets(int fd);
487
488/*******************************************************************************
489 * pollset_set definitions
490 */
491
/* A pollset_set aggregates pollsets, child pollset_sets and fds; 'mu'
   protects all three dynamic arrays below. */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
507
508/*******************************************************************************
509 * fd_posix.c
510 */
511
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700512/* We need to keep a freelist not because of any concerns of malloc
513 * performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700514 * but instead so that implementations with multiple threads in (for example)
515 * epoll_wait deal with the race between pollset removal and incoming poll
516 * notifications.
517 *
518 * The problem is that the poller ultimately holds a reference to this
519 * object, so it is very difficult to know when is safe to free it, at least
520 * without some expensive synchronization.
521 *
522 * If we keep the object freelisted, in the worst case losing this race just
523 * becomes a spurious read notification on a reused fd.
524 */
525/* TODO(klempner): We could use some form of polling generation count to know
526 * when these are safe to free. */
527/* TODO(klempner): Consider disabling freelisting if we don't have multiple
528 * threads in poll on the same fd */
529/* TODO(klempner): Batch these allocations to reduce fragmentation */
530static grpc_fd *fd_freelist = NULL;
531static gpr_mu fd_freelist_mu;
532
533static void freelist_fd(grpc_fd *fd) {
534 gpr_mu_lock(&fd_freelist_mu);
535 fd->freelist_next = fd_freelist;
536 fd_freelist = fd;
537 grpc_iomgr_unregister_object(&fd->iomgr_object);
538 gpr_mu_unlock(&fd_freelist_mu);
539}
540
541static grpc_fd *alloc_fd(int fd) {
542 grpc_fd *r = NULL;
543
544 gpr_mu_lock(&fd_freelist_mu);
545 if (fd_freelist != NULL) {
546 r = fd_freelist;
547 fd_freelist = fd_freelist->freelist_next;
548 }
549 gpr_mu_unlock(&fd_freelist_mu);
550
551 if (r == NULL) {
552 r = gpr_malloc(sizeof(grpc_fd));
553 gpr_mu_init(&r->mu);
554 gpr_mu_init(&r->pi_mu);
555 }
556
557 /* TODO: sreek - check with ctiller on why we need to acquire a lock here */
558 gpr_mu_lock(&r->mu);
559 gpr_atm_rel_store(&r->refst, 1);
560 r->shutdown = 0;
561 r->read_closure = CLOSURE_NOT_READY;
562 r->write_closure = CLOSURE_NOT_READY;
563 r->fd = fd;
564 r->polling_island = NULL;
565 r->freelist_next = NULL;
566 r->on_done_closure = NULL;
567 r->closed = 0;
568 r->released = 0;
569 gpr_mu_unlock(&r->mu);
570 return r;
571}
572
573static void destroy(grpc_fd *fd) {
574 gpr_mu_destroy(&fd->mu);
575 gpr_free(fd);
576}
577
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
/* NOTE: the #else below splices a different signature onto the SAME shared
   function body that follows the #endif -- take care when editing. */
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  /* NOTE(review): gpr_atm passed to %d -- format/arg mismatch in this
     debug-only path; confirm and cast if the debug flag is ever enabled. */
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The fd must still be referenced (refst > 0) when taking a new ref. */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Last reference dropped: recycle via the freelist (never freed
       directly -- see the freelist rationale above). */
    freelist_fd(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}
612
/* One-time initialization of the fd freelist lock. */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
614
615static void fd_global_shutdown(void) {
616 gpr_mu_lock(&fd_freelist_mu);
617 gpr_mu_unlock(&fd_freelist_mu);
618 while (fd_freelist != NULL) {
619 grpc_fd *fd = fd_freelist;
620 fd_freelist = fd_freelist->freelist_next;
621 destroy(fd);
622 }
623 gpr_mu_destroy(&fd_freelist_mu);
624}
625
626static grpc_fd *fd_create(int fd, const char *name) {
627 grpc_fd *r = alloc_fd(fd);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700628
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700629 char *name2;
630 gpr_asprintf(&name2, "%s fd=%d", name, fd);
631 grpc_iomgr_register_object(&r->iomgr_object, name2);
632 gpr_free(name2);
633#ifdef GRPC_FD_REF_COUNT_DEBUG
634 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
635#endif
636 return r;
637}
638
639static bool fd_is_orphaned(grpc_fd *fd) {
640 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
641}
642
/* Finalizes the fd: closes it (unless the caller took ownership via
   release_fd) and schedules on_done_closure.  Caller holds fd->mu. */
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  } else {
    /* Caller owns the raw fd: don't close it, but detach it from its
       polling island. */
    /* TODO: sreek - Check for deadlocks */

    gpr_mu_lock(&fd->pi_mu);
    /* Resolve to the authoritative island (dropping 1 ref per stale island
       on the merged_to chain) and lock it */
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);

    polling_island_remove_fd_locked(fd->polling_island, fd);
    polling_island_unref_and_unlock(fd->polling_island, 1);

    fd->polling_island = NULL;
    gpr_mu_unlock(&fd->pi_mu);


    /* TODO: sreek - This should be no longer needed */
    remove_fd_from_all_epoll_sets(fd->fd);
  }
  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
}
666
667static int fd_wrapped_fd(grpc_fd *fd) {
668 if (fd->released || fd->closed) {
669 return -1;
670 } else {
671 return fd->fd;
672 }
673}
674
/* TODO: sreek - do something here with the pollset island link */
/* Flags the fd as orphaned and tears it down.  If release_fd is non-NULL the
   caller takes ownership of the raw descriptor (it is returned via
   *release_fd and NOT closed); otherwise the socket is shut down and closed.
   on_done runs once teardown completes. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (!fd->released) {
    shutdown(fd->fd, SHUT_RDWR);
  } else {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  /* Net effect of the +1 here and -2 below is -1, which clears the 'active'
     bit (bit 0 of refst) -- see the refst format comment. */
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  close_fd_locked(exec_ctx, fd);
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}
692
/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
/* Non-debug variants: same +/-2 parity trick, no logging. */
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
709
710static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
711 grpc_closure **st, grpc_closure *closure) {
712 if (*st == CLOSURE_NOT_READY) {
713 /* not ready ==> switch to a waiting state by setting the closure */
714 *st = closure;
715 } else if (*st == CLOSURE_READY) {
716 /* already ready ==> queue the closure to run immediately */
717 *st = CLOSURE_NOT_READY;
718 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
719 } else {
720 /* upcallptr was set to a different closure. This is an error! */
721 gpr_log(GPR_ERROR,
722 "User called a notify_on function with a previous callback still "
723 "pending");
724 abort();
725 }
726}
727
728/* returns 1 if state becomes not ready */
729static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
730 grpc_closure **st) {
731 if (*st == CLOSURE_READY) {
732 /* duplicate ready ==> ignore */
733 return 0;
734 } else if (*st == CLOSURE_NOT_READY) {
735 /* not ready, and not waiting ==> flag ready */
736 *st = CLOSURE_READY;
737 return 0;
738 } else {
739 /* waiting ==> queue closure */
740 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
741 *st = CLOSURE_NOT_READY;
742 return 1;
743 }
744}
745
/* Do something here with the pollset island link (?) */
/* Marks the fd shut down and fires any parked read/write closures; because
   fd->shutdown is set first, those closures run with success=false. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown); /* shutdown may only happen once */
  fd->shutdown = 1;
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
755
/* Registers 'closure' to run when the fd becomes readable (lock-holding
   wrapper around notify_on_locked). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
762
/* Registers 'closure' to run when the fd becomes writable (lock-holding
   wrapper around notify_on_locked). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
769
770/*******************************************************************************
771 * pollset_posix.c
772 */
773
774GPR_TLS_DECL(g_current_thread_poller);
775GPR_TLS_DECL(g_current_thread_worker);
776
777/** The alarm system needs to be able to wakeup 'some poller' sometimes
778 * (specifically when a new alarm needs to be triggered earlier than the next
779 * alarm 'epoch').
780 * This wakeup_fd gives us something to alert on when such a case occurs. */
781grpc_wakeup_fd grpc_global_wakeup_fd;
782
783static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
784 worker->prev->next = worker->next;
785 worker->next->prev = worker->prev;
786}
787
788static int pollset_has_workers(grpc_pollset *p) {
789 return p->root_worker.next != &p->root_worker;
790}
791
792static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
793 if (pollset_has_workers(p)) {
794 grpc_pollset_worker *w = p->root_worker.next;
795 remove_worker(p, w);
796 return w;
797 } else {
798 return NULL;
799 }
800}
801
802static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
803 worker->next = &p->root_worker;
804 worker->prev = worker->next->prev;
805 worker->prev->next = worker->next->prev = worker;
806}
807
808static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
809 worker->prev = &p->root_worker;
810 worker->next = worker->prev->next;
811 worker->prev->next = worker->next->prev = worker;
812}
813
/* Kicks 'specific_worker' (or, when NULL, some worker of 'p') out of its
   poll.  Caller holds p->mu.  Flags: GRPC_POLLSET_CAN_KICK_SELF permits
   waking the calling thread itself; GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP
   forces the wakee to repoll (and is only legal with a specific,
   non-broadcast target). */
static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      /* Broadcast: wake every worker in the pollset. */
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
      p->kicked_without_pollers = 1;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      /* Target runs on another thread: wake its fd and signal the thread
         with SIGUSR1 (handler installed in pollset_global_init; presumably
         to interrupt a blocking epoll_wait -- TODO confirm). */
      GPR_TIMER_MARK("different_thread_worker", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      /* TODO (sreek): Refactor this into a separate file*/
      pthread_kill(specific_worker->pt_id, SIGUSR1);
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      /* Target is the calling thread: only kick when explicitly allowed. */
      GPR_TIMER_MARK("kick_yoself", 0);
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = 1;
      }
      specific_worker->kicked_specifically = 1;
      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    /* Anonymous kick: pick some worker, preferring one other than the
       calling thread (rotation via pop/push below). */
    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        /* First candidate is ourselves: rotate and try the next. */
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
      }
    } else {
      /* No workers to wake: record the kick so the next poller returns
         immediately instead of blocking. */
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
}
878
/* Plain kick: pollset_kick_ext with no extended flags. */
static void pollset_kick(grpc_pollset *p,
                         grpc_pollset_worker *specific_worker) {
  pollset_kick_ext(p, specific_worker, 0);
}
883
884/* global state management */
885
/* SIGUSR1 handler; the signal is sent by pollset_kick_ext() via
   pthread_kill() to interrupt a worker thread. */
static void sig_handler(int sig_num) {
  /* NOTE(review): gpr_log is not async-signal-safe; calling it from a
     signal handler is technically undefined behavior -- confirm whether
     this log line is intended to survive past debugging. */
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
}
889
/* One-time pollset subsystem initialization. */
static void pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  /* Process-wide side effect: installs the SIGUSR1 handler used by
     pollset_kick_ext() to interrupt worker threads. */
  signal(SIGUSR1, sig_handler);
}
896
897static void pollset_global_shutdown(void) {
898 grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
899 gpr_tls_destroy(&g_current_thread_poller);
900 gpr_tls_destroy(&g_current_thread_worker);
901}
902
/* Wakes up 'some' poller via the process-global wakeup fd (see the comment
   on grpc_global_wakeup_fd above -- used e.g. by the alarm system). */
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
904
905/* TODO: sreek. Try to Remove this forward declaration*/
906static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset);
907
908/* main interface */
909
910static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
911 gpr_mu_init(&pollset->mu);
912 *mu = &pollset->mu;
913 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
914 gpr_mu_init(&pollset->pi_mu);
915 pollset->polling_island = NULL;
916 pollset->shutting_down = 0;
917 pollset->called_shutdown = 0;
918 pollset->kicked_without_pollers = 0;
919 pollset->local_wakeup_cache = NULL;
920 pollset->kicked_without_pollers = 0;
921
922 multipoll_with_epoll_pollset_create_efd(pollset);
923}
924
925/* TODO(sreek): Maybe merge multipoll_*_destroy() with pollset_destroy()
926 * function */
927static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset);
928
929static void pollset_destroy(grpc_pollset *pollset) {
930 GPR_ASSERT(!pollset_has_workers(pollset));
931
932 multipoll_with_epoll_pollset_destroy(pollset);
933
934 while (pollset->local_wakeup_cache) {
935 grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
936 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
937 gpr_free(pollset->local_wakeup_cache);
938 pollset->local_wakeup_cache = next;
939 }
940 gpr_mu_destroy(&pollset->pi_mu);
941 gpr_mu_destroy(&pollset->mu);
942}
943
944/* TODO(sreek) - Do something with the pollset island link (??) */
945static void pollset_reset(grpc_pollset *pollset) {
946 GPR_ASSERT(pollset->shutting_down);
947 GPR_ASSERT(!pollset_has_workers(pollset));
948 pollset->shutting_down = 0;
949 pollset->called_shutdown = 0;
950 pollset->kicked_without_pollers = 0;
951}
952
953/* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */
954static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset);
955
/* Completes pollset shutdown: releases the multipoller's resources, then
   schedules the caller-supplied shutdown_done closure. */
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  multipoll_with_epoll_pollset_finish_shutdown(pollset);
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
960
/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock
 * declaration
 */
964static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
965 grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
966 gpr_timespec deadline, gpr_timespec now);
967
/* Core worker loop: polls on behalf of |pollset| until |deadline|, or until
 * kicked.  On entry pollset->mu is held; the lock is dropped while blocked in
 * poll/epoll and re-acquired before returning.  *worker_hdl is pointed at
 * this worker's stack-allocated state for the duration of the call (so kicks
 * can target it) and reset to NULL before returning. */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;

  /* pollset->mu already held */
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.reevaluate_polling_on_wakeup = 0;
  /* Reuse a wakeup fd from the pollset's local cache when possible; the fd
     is returned to the cache on exit below */
  if (pollset->local_wakeup_cache != NULL) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
    grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
  }
  worker.kicked_specifically = 0;

  /* TODO(sreek): Abstract this thread id stuff out into a separate file */
  worker.pt_id = pthread_self();
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);

      /* Drops pollset->mu; may block until deadline or a wakeup */
      multipoll_with_epoll_pollset_maybe_work_and_unlock(
          exec_ctx, pollset, &worker, deadline, now);

      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
      gpr_tls_set(&g_current_thread_poller, 0);
    } else {
      /* A kick arrived while no poller was active: consume it and skip the
         blocking poll entirely */
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
    /* Finished execution - start cleaning up.
       Note that we may arrive here from outside the enclosing while() loop.
       In that case we won't loop though as we haven't added worker to the
       worker list, which means nobody could ask us to re-evaluate polling). */
  done:
    if (!locked) {
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    /* If we're forced to re-evaluate polling (via pollset_kick with
       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
       a loop */
    if (worker.reevaluate_polling_on_wakeup) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        /* If there's queued work on the list, then set the deadline to be
           immediate so we get back out of the polling loop quickly */
        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
      }
      keep_polling = 1;
    }
  }
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }
  /* release wakeup fd to the local pool */
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      /* Other workers remain: wake one so it can observe shutdown too */
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
}
1074
1075/* TODO: (sreek) Do something with the pollset island link */
/* Begins pollset shutdown: records |closure| to run when shutdown completes,
 * wakes every active worker, and finishes immediately if no workers exist.
 * NOTE(review): the mu-protected fields touched here suggest pollset->mu is
 * held by the caller — confirm against the event-engine contract. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  /* Broadcast so all current workers notice shutting_down and drain out */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(exec_ctx, pollset);
  }
}
1088
1089static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1090 gpr_timespec now) {
1091 gpr_timespec timeout;
1092 static const int64_t max_spin_polling_us = 10;
1093 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1094 return -1;
1095 }
1096 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1097 max_spin_polling_us,
1098 GPR_TIMESPAN))) <= 0) {
1099 return 0;
1100 }
1101 timeout = gpr_time_sub(deadline, now);
1102 return gpr_time_to_millis(gpr_time_add(
1103 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1104}
1105
1106/*******************************************************************************
1107 * pollset_multipoller_with_epoll_posix.c
1108 */
1109
/* Fires (or arms) the closure slot *st for |fd| under fd->mu, signalling that
 * the awaited event has occurred. */
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
  /* only one set_ready can be active at once (but there may be a racing
     notify_on) */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, st);
  gpr_mu_unlock(&fd->mu);
}
1117
/* Marks |fd| readable by readying its read closure. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->read_closure);
}
1121
/* Marks |fd| writable by readying its write closure. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->write_closure);
}
1125
1126/* TODO (sreek): Maybe this global list is not required. Double check*/
/* Growable process-wide list of all live epoll fds, used by
 * remove_fd_from_all_epoll_sets() to purge an fd from every epoll set. */
struct epoll_fd_list {
  int *epoll_fds;  /* dynamically grown array of epoll fds */
  size_t count;    /* number of valid entries in epoll_fds */
  size_t capacity; /* allocated length of epoll_fds */
};
1132
1133static struct epoll_fd_list epoll_fd_global_list;
1134static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
1135static gpr_mu epoll_fd_list_mu;
1136
/* One-time initializer for epoll_fd_list_mu (invoked via gpr_once_init). */
static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }
1138
1139static void add_epoll_fd_to_global_list(int epoll_fd) {
1140 gpr_once_init(&init_epoll_fd_list_mu, init_mu);
1141
1142 gpr_mu_lock(&epoll_fd_list_mu);
1143 if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
1144 epoll_fd_global_list.capacity =
1145 GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
1146 epoll_fd_global_list.epoll_fds =
1147 gpr_realloc(epoll_fd_global_list.epoll_fds,
1148 epoll_fd_global_list.capacity * sizeof(int));
1149 }
1150 epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
1151 gpr_mu_unlock(&epoll_fd_list_mu);
1152}
1153
1154static void remove_epoll_fd_from_global_list(int epoll_fd) {
1155 gpr_mu_lock(&epoll_fd_list_mu);
1156 GPR_ASSERT(epoll_fd_global_list.count > 0);
1157 for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
1158 if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
1159 epoll_fd_global_list.epoll_fds[i] =
1160 epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
1161 break;
1162 }
1163 }
1164 gpr_mu_unlock(&epoll_fd_list_mu);
1165}
1166
1167static void remove_fd_from_all_epoll_sets(int fd) {
1168 int err;
1169 gpr_once_init(&init_epoll_fd_list_mu, init_mu);
1170 gpr_mu_lock(&epoll_fd_list_mu);
1171 if (epoll_fd_global_list.count == 0) {
1172 gpr_mu_unlock(&epoll_fd_list_mu);
1173 return;
1174 }
1175 for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
1176 err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
1177 if (err < 0 && errno != ENOENT) {
1178 gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
1179 strerror(errno));
1180 }
1181 }
1182 gpr_mu_unlock(&epoll_fd_list_mu);
1183}
1184
/* TODO: sreek - This function combines multipoll_with_epoll_pollset_add_fd()
 * and finally_add_fd() from ev_poll_and_epoll_posix.c */
/* Puts |fd| and |pollset| on the same polling island, creating, extending,
 * or merging islands as needed so both end up pointing at one island.
 * NOTE(review): polling_island_create/_update_and_lock/_merge are defined
 * elsewhere in this file; their locking/refcount semantics are assumed from
 * their names — confirm before relying on them. */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO sreek - Check if we need to get a pollset->mu lock here */
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  if (fd->polling_island == pollset->polling_island) {
    /* Already on the same island — or both NULL, in which case a fresh
       island is created for the fd */
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd, 2);
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
  } else { /* Non null and different */
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  /* Both sides now reference the (possibly new) common island */
  fd->polling_island = pollset->polling_island = pi_new;

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
}
1214
1215/* Creates an epoll fd and initializes the pollset */
1216/* TODO: This has to be called ONLY from pollset_init function. and hence it
1217 * does not acquire any lock */
1218static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset) {
1219 struct epoll_event ev;
1220 int err;
1221
1222 pollset->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1223 if (pollset->epoll_fd < 0) {
1224 gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
1225 abort();
1226 }
1227 add_epoll_fd_to_global_list(pollset->epoll_fd);
1228
1229 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
1230 ev.data.ptr = NULL;
1231
1232 err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD,
1233 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
1234 if (err < 0) {
1235 gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
1236 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
1237 strerror(errno));
1238 }
1239}
1240
1241/* TODO(klempner): We probably want to turn this down a bit */
1242#define GRPC_EPOLL_MAX_EVENTS 1000
1243
/* Performs one blocking poll cycle for |worker|: waits (via poll()) on the
 * worker's private wakeup fd and the pollset's epoll fd, then drains ready
 * epoll events with a non-blocking epoll_wait, dispatching read/write
 * readiness to the corresponding grpc_fds.  Drops pollset->mu before
 * blocking and returns with it NOT held (the caller re-acquires it). */
static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = pollset->epoll_fd;
  int ep_rv;
  int poll_rv;
  int timeout_ms;
  struct pollfd pfds[2];

  /* If you want to ignore epoll's ability to sanely handle parallel pollers,
   * for a more apples-to-apples performance comparison with poll, add a
   * if (pollset->counter != 0) { return 0; }
   * here.
   */

  gpr_mu_unlock(&pollset->mu);

  timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  /* poll() on the per-worker wakeup fd alongside the epoll fd so this
     specific worker can be kicked individually */
  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
  pfds[0].events = POLLIN;
  pfds[0].revents = 0;
  pfds[1].fd = epoll_fd;
  pfds[1].events = POLLIN;
  pfds[1].revents = 0;

  /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     even going into the blocking annotation if possible */
  GPR_TIMER_BEGIN("poll", 0);
  GRPC_SCHEDULING_START_BLOCKING_REGION;
  poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  GPR_TIMER_END("poll", 0);

  if (poll_rv < 0) {
    /* EINTR is benign: the outer pollset_work loop will re-poll */
    if (errno != EINTR) {
      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
    }
  } else if (poll_rv == 0) {
    /* do nothing */
  } else {
    if (pfds[0].revents) {
      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
    }
    if (pfds[1].revents) {
      do {
        /* The following epoll_wait never blocks; it has a timeout of 0 */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
        if (ep_rv < 0) {
          if (errno != EINTR) {
            gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
          }
        } else {
          int i;
          for (i = 0; i < ep_rv; ++i) {
            grpc_fd *fd = ep_ev[i].data.ptr;
            /* TODO(klempner): We might want to consider making err and pri
             * separate events */
            int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
            int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
            int write_ev = ep_ev[i].events & EPOLLOUT;
            if (fd == NULL) {
              /* NULL data.ptr identifies the global wakeup fd (registered in
                 multipoll_with_epoll_pollset_create_efd) */
              grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
            } else {
              if (read_ev || cancel) {
                fd_become_readable(exec_ctx, fd);
              }
              if (write_ev || cancel) {
                fd_become_writable(exec_ctx, fd);
              }
            }
          }
        }
        /* A completely full batch may mean more events are pending: drain */
      } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
    }
  }
}
1322
/* Intentionally a no-op: the epoll poller has no per-shutdown state to
 * release; the epoll fd itself is torn down in
 * multipoll_with_epoll_pollset_destroy. */
static void multipoll_with_epoll_pollset_finish_shutdown(
    grpc_pollset *pollset) {}
1325
1326static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
1327 close(pollset->epoll_fd);
1328 remove_epoll_fd_from_global_list(pollset->epoll_fd);
1329}
1330
1331/*******************************************************************************
1332 * pollset_set_posix.c
1333 */
1334
1335static grpc_pollset_set *pollset_set_create(void) {
1336 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1337 memset(pollset_set, 0, sizeof(*pollset_set));
1338 gpr_mu_init(&pollset_set->mu);
1339 return pollset_set;
1340}
1341
1342static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1343 size_t i;
1344 gpr_mu_destroy(&pollset_set->mu);
1345 for (i = 0; i < pollset_set->fd_count; i++) {
1346 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1347 }
1348 gpr_free(pollset_set->pollsets);
1349 gpr_free(pollset_set->pollset_sets);
1350 gpr_free(pollset_set->fds);
1351 gpr_free(pollset_set);
1352}
1353
1354static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1355 grpc_pollset_set *pollset_set,
1356 grpc_pollset *pollset) {
1357 size_t i, j;
1358 gpr_mu_lock(&pollset_set->mu);
1359 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1360 pollset_set->pollset_capacity =
1361 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1362 pollset_set->pollsets =
1363 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1364 sizeof(*pollset_set->pollsets));
1365 }
1366 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1367 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1368 if (fd_is_orphaned(pollset_set->fds[i])) {
1369 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1370 } else {
1371 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1372 pollset_set->fds[j++] = pollset_set->fds[i];
1373 }
1374 }
1375 pollset_set->fd_count = j;
1376 gpr_mu_unlock(&pollset_set->mu);
1377}
1378
1379static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1380 grpc_pollset_set *pollset_set,
1381 grpc_pollset *pollset) {
1382 size_t i;
1383 gpr_mu_lock(&pollset_set->mu);
1384 for (i = 0; i < pollset_set->pollset_count; i++) {
1385 if (pollset_set->pollsets[i] == pollset) {
1386 pollset_set->pollset_count--;
1387 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1388 pollset_set->pollsets[pollset_set->pollset_count]);
1389 break;
1390 }
1391 }
1392 gpr_mu_unlock(&pollset_set->mu);
1393}
1394
1395static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1396 grpc_pollset_set *bag,
1397 grpc_pollset_set *item) {
1398 size_t i, j;
1399 gpr_mu_lock(&bag->mu);
1400 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1401 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1402 bag->pollset_sets =
1403 gpr_realloc(bag->pollset_sets,
1404 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1405 }
1406 bag->pollset_sets[bag->pollset_set_count++] = item;
1407 for (i = 0, j = 0; i < bag->fd_count; i++) {
1408 if (fd_is_orphaned(bag->fds[i])) {
1409 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1410 } else {
1411 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1412 bag->fds[j++] = bag->fds[i];
1413 }
1414 }
1415 bag->fd_count = j;
1416 gpr_mu_unlock(&bag->mu);
1417}
1418
1419static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1420 grpc_pollset_set *bag,
1421 grpc_pollset_set *item) {
1422 size_t i;
1423 gpr_mu_lock(&bag->mu);
1424 for (i = 0; i < bag->pollset_set_count; i++) {
1425 if (bag->pollset_sets[i] == item) {
1426 bag->pollset_set_count--;
1427 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1428 bag->pollset_sets[bag->pollset_set_count]);
1429 break;
1430 }
1431 }
1432 gpr_mu_unlock(&bag->mu);
1433}
1434
1435static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1436 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1437 size_t i;
1438 gpr_mu_lock(&pollset_set->mu);
1439 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1440 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1441 pollset_set->fds = gpr_realloc(
1442 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1443 }
1444 GRPC_FD_REF(fd, "pollset_set");
1445 pollset_set->fds[pollset_set->fd_count++] = fd;
1446 for (i = 0; i < pollset_set->pollset_count; i++) {
1447 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1448 }
1449 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1450 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1451 }
1452 gpr_mu_unlock(&pollset_set->mu);
1453}
1454
1455static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1456 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1457 size_t i;
1458 gpr_mu_lock(&pollset_set->mu);
1459 for (i = 0; i < pollset_set->fd_count; i++) {
1460 if (pollset_set->fds[i] == fd) {
1461 pollset_set->fd_count--;
1462 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1463 pollset_set->fds[pollset_set->fd_count]);
1464 GRPC_FD_UNREF(fd, "pollset_set");
1465 break;
1466 }
1467 }
1468 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1469 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1470 }
1471 gpr_mu_unlock(&pollset_set->mu);
1472}
1473
1474/*******************************************************************************
1475 * event engine binding
1476 */
1477
/* Tears down the global fd and pollset state set up by
 * grpc_init_epoll_linux(). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
1482
/* Event-engine vtable wiring the static epoll-based implementations above
 * into the grpc_event_engine_vtable interface. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1514
/* Public entry point: initializes the global fd, pollset, and polling-island
 * state and returns the epoll event-engine vtable. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}
1521
1522#endif