blob: 1201c10a7e0b51a957e5fb18ea7bd2c8f22e6c34 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
38#include "src/core/lib/iomgr/ev_epoll_posix.h"
39
40#include <assert.h>
41#include <errno.h>
42#include <poll.h>
43#include <signal.h>
44#include <string.h>
45#include <sys/epoll.h>
46#include <sys/socket.h>
47#include <unistd.h>
48
49#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/string_util.h>
52#include <grpc/support/tls.h>
53#include <grpc/support/useful.h>
54
55#include "src/core/lib/iomgr/ev_posix.h"
56#include "src/core/lib/iomgr/iomgr_internal.h"
57#include "src/core/lib/iomgr/wakeup_fd_posix.h"
58#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/block_annotate.h"
60
61struct polling_island;
62
63/*******************************************************************************
64 * FD declarations
65 */
66
/* Wrapper around an OS file descriptor, tracking readiness closures and the
   polling island the fd is registered with. */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     - ref/unref by two to avoid altering the orphaned bit
     - To orphan, unref by 1 */
  gpr_atm refst;

  gpr_mu mu; /* guards the fields below (shutdown/closed/closures) */
  int shutdown;
  int closed;
  int released; /* set when the caller asked for the raw fd back (fd_orphan) */

  /* Readiness state: CLOSURE_NOT_READY, CLOSURE_READY, or a pending closure */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* Mutex protecting the 'polling_island' field */
  gpr_mu pi_mu;

  /* The polling island to which this fd belongs to.
   * An fd belongs to exactly one polling island */
  struct polling_island *polling_island;

  /* Next entry in the global fd freelist (see fd_freelist) */
  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure; /* scheduled once the fd is closed/released */

  grpc_iomgr_object iomgr_object;
};
96
/* Return 1 if this fd is orphaned, 0 otherwise */
static bool fd_is_orphaned(grpc_fd *fd);

/* Reference counting for fds */
/*#define GRPC_FD_REF_COUNT_DEBUG*/
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd::read_closure / write_closure; any other
   value is a pending user closure */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
120
/*******************************************************************************
 * Polling Island
 */

/* A shared epoll set that one or more fds/pollsets poll on. Islands can be
   merged; a merged-away island forwards to its survivor via 'merged_to'. */
typedef struct polling_island {
  gpr_mu mu;
  int ref_cnt; /* guarded by mu; island is recycled when this hits 0 */

  /* Pointer to the polling_island this merged into. If this is not NULL, all
     the remaining fields in this pollset (i.e all fields except mu and ref_cnt)
     are considered invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
146
/* Polling island freelist (guarded by g_pi_freelist_mu) */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;
150
/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
 * TODO: sreek - Should this add a ref to the grpc_fd ? */
/* Adds 'fd_count' fds to pi's epoll set and to its fd-tracking array.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count) {
  int err;
  size_t i;
  struct epoll_event ev;

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered, interested in both read and write readiness */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    /* EEXIST (fd already in the epoll set) is tolerated here.
       NOTE(review): on EEXIST the fd is still appended to pi->fds below,
       which can leave duplicates in the tracking array — confirm intended */
    if (err < 0 && errno != EEXIST) {
      gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
              fds[i]->fd, strerror(errno));
      /* TODO: sreek - Not sure if it is a good idea to continue here. We need a
       * better way to bubble up this error instead of doing an abort() */
      continue;
    }

    /* Grow the tracking array geometrically when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
  }
}
181
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700182/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
183 * TODO: sreek - Might have to unref the fds (assuming whether we add a ref to
184 * the fd when adding it to the epollset) */
185/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla5098f912016-05-31 10:58:17 -0700186static void polling_island_remove_all_fds_locked(polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700187 int err;
188 size_t i;
189
190 for (i = 0; i < pi->fd_cnt; i++) {
191 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
192
193 if (err < 0 && errno != ENOENT) {
194 gpr_log(GPR_ERROR,
195 "epoll_ctl delete for fds[i]: %d failed with error: %s", i,
196 pi->fds[i]->fd, strerror(errno));
197 /* TODO: sreek - Not sure if it is a good idea to continue here. We need a
198 * better way to bubble up this error instead of doing an abort() */
199 continue;
200 }
201 }
202
203 pi->fd_cnt = 0;
204}
205
206/* TODO: sreek - Should we hold a lock on fd or add a ref to the fd ?
207 * TODO: sreek - Might have to unref the fd (assuming whether we add a ref to
208 * the fd when adding it to the epollset) */
209/* The caller is expected to hold pi->mu lock before calling this function */
210static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd) {
211 int err;
212 size_t i;
213 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
214 if (err < 0 && errno != ENOENT) {
215 gpr_log(GPR_ERROR, "epoll_ctl delete for fd: %d failed with error; %s",
216 fd->fd, strerror(errno));
217 }
218
219 for (i = 0; i < pi->fd_cnt; i++) {
220 if (pi->fds[i] == fd) {
221 pi->fds[i] = pi->fds[--pi->fd_cnt];
222 break;
223 }
224 }
225}
226
/* Creates a polling island (recycling one from the global freelist when
   possible) holding 'initial_ref_cnt' refs. If initial_fd is non-NULL it is
   added to the new island's epoll set. */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;
  /* Try to pop a recycled island off the freelist first */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;

    pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (pi->epoll_fd < 0) {
      gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
              strerror(errno));
    }
    /* Failure to create the epoll set is fatal */
    GPR_ASSERT(pi->epoll_fd >= 0);
  }

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* polling_island_add_fds_locked() expects the caller to hold a pi->mu
     * lock. However, since this is a new polling island (and no one has a
     * reference to it yet), it is okay to not acquire pi->mu here */
    polling_island_add_fds_locked(pi, &initial_fd, 1);
  }

  return pi;
}
267
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700268static void polling_island_delete(polling_island *pi) {
269 GPR_ASSERT(pi->ref_cnt == 0);
270 GPR_ASSERT(pi->fd_cnt == 0);
271
272 pi->merged_to = NULL;
273
274 gpr_mu_lock(&g_pi_freelist_mu);
275 pi->next_free = g_pi_freelist;
276 g_pi_freelist = pi;
277 gpr_mu_unlock(&g_pi_freelist_mu);
278}
279
/* Drops 'unref_by' refs from pi and releases pi->mu. If the ref count reaches
   zero the island is recycled onto the global freelist. The count is copied
   into a local before unlocking so the decision does not touch pi after the
   mutex is released. */
void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
  pi->ref_cnt -= unref_by;
  int ref_cnt = pi->ref_cnt;
  GPR_ASSERT(ref_cnt >= 0);

  gpr_mu_unlock(&pi->mu);

  if (ref_cnt == 0) {
    polling_island_delete(pi);
  }
}
291
/* Follows pi's 'merged_to' chain to the surviving island, dropping 'unref_by'
   refs from every island passed on the way. Returns the surviving island,
   locked, with 'add_ref_by' refs added to it. */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    polling_island_unref_and_unlock(pi, unref_by); /* also releases pi->mu */
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
306
/* Resolves the merge chains of *p and *q and locks both resulting islands
   (or the single island, locked once, if they resolve to the same one).
   Locks are always taken lower-address-first to avoid deadlock; the swap
   count is tracked so *p and *q are returned in the caller's order. */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until both locks are held or both pointers collapse to one island */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    // pi_1 is NOT equal to pi_2
    // pi_1 MAY be locked

    /* Keep the lower-addressed island in pi_1 (lock-ordering discipline) */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    // p1 < p2
    // p1 MAY BE locked
    // p2 is NOT locked

    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* pi_1 was merged away under us: chase the chain and restart */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    // p1 is LOCKED
    // p2 is UNLOCKED
    // p1 != p2

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* pi_2 was merged away: chase the chain (pi_1 stays locked) */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  // Either pi_1 == pi_2 OR we got both locks!
  if (pi_1 == pi_2) {
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      /* Both chains ended on the same island: take one lock, moving the two
         callers' refs (2) along the chain */
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* Undo an odd number of swaps so outputs keep the caller's order */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
379
380polling_island *polling_island_merge(polling_island *p, polling_island *q) {
381 polling_island *merged = NULL;
382
383 polling_island_pair_update_and_lock(&p, &q);
384
385 /* TODO: sreek: Think about this scenario some more. Is it possible ?. what
386 * does it mean, when would this happen */
387 if (p == q) {
388 merged = p;
389 }
390
391 // Move all the fds from polling_island p to polling_island q
392 polling_island_add_fds_locked(q, p->fds, p->fd_cnt);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -0700393 polling_island_remove_all_fds_locked(p);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700394
395 q->ref_cnt += p->ref_cnt;
396
397 gpr_mu_unlock(&p->mu);
398 gpr_mu_unlock(&q->mu);
399
400 return merged;
401}
402
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700403static void polling_island_global_init() {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700404 gpr_mu_init(&g_pi_freelist_mu);
405 g_pi_freelist = NULL;
406}
407
408/*******************************************************************************
409 * pollset declarations
410 */
411
/* One thread parked in pollset_work; linked into the pollset's circular
   doubly-linked worker list rooted at grpc_pollset::root_worker */
struct grpc_pollset_worker {
  int kicked_specifically; /* set when this worker was kicked by name */
  pthread_t pt_id; /* thread to wake via pthread_kill(SIGUSR1) */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
418
struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker; /* sentinel of the circular worker list */
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers; /* a kick arrived while no worker was polling */
  grpc_closure *shutdown_done; /* run when shutdown completes */

  int epoll_fd;

  /* Mutex protecting the 'polling_island' field */
  gpr_mu pi_mu;

  /* The polling island to which this pollset belongs to. A pollset belongs to
     exactly one polling island */
  struct polling_island *polling_island;
};
436
/* Add an fd to a pollset */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           struct grpc_fd *fd);

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd);

/* Convert a timespec to milliseconds:
   - very small or negative poll times are clamped to zero to do a
     non-blocking poll (which becomes spin polling)
   - other small values are rounded up to one millisecond
   - longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now);

/* Allow kick to wakeup the currently polling worker */
#define GRPC_POLLSET_CAN_KICK_SELF 1
/* As per pollset_kick, with an extended set of flags (defined above)
   -- mostly for fd_posix's use. */
static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags);

/* turn a pollset into a multipoller: platform specific */
typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
                                                 grpc_pollset *pollset,
                                                 struct grpc_fd **fds,
                                                 size_t fd_count);

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *pollset);
471
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700472/*******************************************************************************
473 * pollset_set definitions
474 */
475
/* A mutable set of pollsets, child pollset_sets and fds; each group is a
   dynamically-grown array with its own count/capacity pair, all guarded by
   mu. */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
491
/*******************************************************************************
 * fd_posix.c
 */

/* We need to keep a freelist not because of any concerns of malloc
 * performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
/* TODO(klempner): We could use some form of polling generation count to know
 * when these are safe to free. */
/* TODO(klempner): Consider disabling freelisting if we don't have multiple
 * threads in poll on the same fd */
/* TODO(klempner): Batch these allocations to reduce fragmentation */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu; /* guards fd_freelist */
516
517static void freelist_fd(grpc_fd *fd) {
518 gpr_mu_lock(&fd_freelist_mu);
519 fd->freelist_next = fd_freelist;
520 fd_freelist = fd;
521 grpc_iomgr_unregister_object(&fd->iomgr_object);
522 gpr_mu_unlock(&fd_freelist_mu);
523}
524
/* Pop an fd wrapper off the freelist (or allocate a fresh one) and
   (re)initialize it for OS file descriptor 'fd'. Mutexes are only initialized
   on fresh allocations; recycled wrappers keep theirs. */
static grpc_fd *alloc_fd(int fd) {
  grpc_fd *r = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    r = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (r == NULL) {
    r = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&r->mu);
    gpr_mu_init(&r->pi_mu);
  }

  /* TODO: sreek - check with ctiller on why we need to acquire a lock here */
  gpr_mu_lock(&r->mu);
  gpr_atm_rel_store(&r->refst, 1); /* active bit set, refcount zero */
  r->shutdown = 0;
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->polling_island = NULL;
  r->freelist_next = NULL;
  r->on_done_closure = NULL;
  r->closed = 0;
  r->released = 0;
  gpr_mu_unlock(&r->mu);
  return r;
}
556
557static void destroy(grpc_fd *fd) {
558 gpr_mu_destroy(&fd->mu);
559 gpr_free(fd);
560}
561
/* Add n refs to fd. The function header is split across the #ifdef so that
   the debug build gains reason/file/line parameters and logging; both builds
   share the fetch-add body below the #endif. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The previous count must have been positive: refs cannot be revived */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
577
/* Drop n refs from fd; when the count reaches zero the wrapper is returned
   to the freelist. Header split across the #ifdef as in ref_by above. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* This was the last reference: recycle the wrapper */
    freelist_fd(fd);
  } else {
    GPR_ASSERT(old > n);
  }
}
596
/* One-time setup of the fd freelist lock */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
598
/* Drain and destroy the fd freelist at process shutdown */
static void fd_global_shutdown(void) {
  /* The lock/unlock pair acts as a barrier against any freelist_fd() call
     still in flight; the loop below then walks the list unlocked —
     presumably no new entries arrive during shutdown (TODO confirm) */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    destroy(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
609
/* Wrap OS file descriptor 'fd' in a grpc_fd and register it with the iomgr
   under the label "<name> fd=<fd>" */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *r = alloc_fd(fd);

  char *name2;
  gpr_asprintf(&name2, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2);
  gpr_free(name2); /* register_object does not take ownership of the string */
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
#endif
  return r;
}
622
623static bool fd_is_orphaned(grpc_fd *fd) {
624 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
625}
626
/* Marks the fd closed, either close()ing the OS fd or (when the caller asked
   for the raw fd back via fd_orphan) detaching it from its polling island,
   then schedules on_done_closure. Caller holds fd->mu.
   NOTE(review): the close() branch does not remove the fd from its polling
   island — presumably relying on the kernel dropping closed fds from the
   epoll set; confirm */
static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  fd->closed = 1;
  if (!fd->released) {
    close(fd->fd);
  } else {
    /* TODO: sreek - Check for deadlocks */

    gpr_mu_lock(&fd->pi_mu);
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);

    polling_island_remove_fd_locked(fd->polling_island, fd);
    polling_island_unref_and_unlock(fd->polling_island, 1);

    fd->polling_island = NULL;
    gpr_mu_unlock(&fd->pi_mu);
  }

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
}
647
648static int fd_wrapped_fd(grpc_fd *fd) {
649 if (fd->released || fd->closed) {
650 return -1;
651 } else {
652 return fd->fd;
653 }
654}
655
/* TODO: sreek - do something here with the pollset island link */
/* Orphans the fd: either shuts down the socket (later close()d) or, when
   release_fd is non-NULL, hands the raw fd back through *release_fd instead
   of closing it. on_done runs once the fd is closed/released. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != NULL;
  if (!fd->released) {
    shutdown(fd->fd, SHUT_RDWR);
  } else {
    *release_fd = fd->fd;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
  close_fd_locked(exec_ctx, fd);
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* drop the reference */
}
673
/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
/* Non-debug variants: same +/-2 discipline, no provenance tracking */
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
690
/* Installs 'closure' to be run when *st (fd->read_closure or
   fd->write_closure) becomes ready; runs it immediately if readiness was
   already recorded. Caller holds fd->mu. */
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately
       (success=false when the fd was shut down) */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}
708
/* Marks *st ready, running a waiting closure if one was installed.
   Caller holds fd->mu. Returns 1 if state becomes not ready (i.e. a waiting
   closure was dispatched), 0 otherwise. */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure (success=false when fd was shut down) */
    grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
726
/* Do something here with the pollset island link (?) */
/* Marks the fd shut down and flushes any pending read/write closures (they
   run with success=false since fd->shutdown is now set). Must only be called
   once per fd. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = 1;
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
736
737static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
738 grpc_closure *closure) {
739 gpr_mu_lock(&fd->mu);
740 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
741 gpr_mu_unlock(&fd->mu);
742}
743
744static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
745 grpc_closure *closure) {
746 gpr_mu_lock(&fd->mu);
747 notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
748 gpr_mu_unlock(&fd->mu);
749}
750
/*******************************************************************************
 * pollset_posix.c
 */

/* TLS slots recording which pollset/worker the current thread is running in,
   so kicks can detect (and optionally allow) self-kicks */
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);

/** The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch').
 * This wakeup_fd gives us something to alert on when such a case occurs. */
grpc_wakeup_fd grpc_global_wakeup_fd;
763
764static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
765 worker->prev->next = worker->next;
766 worker->next->prev = worker->prev;
767}
768
769static int pollset_has_workers(grpc_pollset *p) {
770 return p->root_worker.next != &p->root_worker;
771}
772
773static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
774 if (pollset_has_workers(p)) {
775 grpc_pollset_worker *w = p->root_worker.next;
776 remove_worker(p, w);
777 return w;
778 } else {
779 return NULL;
780 }
781}
782
783static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
784 worker->next = &p->root_worker;
785 worker->prev = worker->next->prev;
786 worker->prev->next = worker->next->prev = worker;
787}
788
789static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
790 worker->prev = &p->root_worker;
791 worker->next = worker->prev->next;
792 worker->prev->next = worker->next->prev = worker;
793}
794
/* Wake up pollset workers. Caller holds p->mu. specific_worker selects the
   target: GRPC_POLLSET_KICK_BROADCAST (all workers), a particular worker, or
   NULL (some worker, chosen from the list front). Wakeups are delivered via
   pthread_kill(SIGUSR1), which interrupts a blocked poll/epoll_wait. */
static void pollset_kick_ext(grpc_pollset *p,
                             grpc_pollset_worker *specific_worker,
                             uint32_t flags) {
  GPR_TIMER_BEGIN("pollset_kick_ext", 0);

  /* pollset->mu already held */
  if (specific_worker != NULL) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      /* Signal every parked worker */
      GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        pthread_kill(specific_worker->pt_id, SIGUSR1);
      }
      p->kicked_without_pollers = 1;
      GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
    } else if (gpr_tls_get(&g_current_thread_worker) !=
               (intptr_t)specific_worker) {
      /* Target is some other thread's worker: signal it directly */
      GPR_TIMER_MARK("different_thread_worker", 0);
      specific_worker->kicked_specifically = 1;
      /* TODO (sreek): Refactor this into a separate file*/
      pthread_kill(specific_worker->pt_id, SIGUSR1);
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      /* Target is the calling thread itself; only allowed when the caller
         opted in via GRPC_POLLSET_CAN_KICK_SELF */
      GPR_TIMER_MARK("kick_yoself", 0);
      specific_worker->kicked_specifically = 1;
      pthread_kill(specific_worker->pt_id, SIGUSR1);
    }
  } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
    /* Anonymous kick: pick a worker, avoiding the current thread unless
       self-kick is allowed; rotate skipped workers to the back */
    GPR_TIMER_MARK("kick_anonymous", 0);
    specific_worker = pop_front_worker(p);
    if (specific_worker != NULL) {
      if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
        GPR_TIMER_MARK("kick_anonymous_not_self", 0);
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            gpr_tls_get(&g_current_thread_worker) ==
                (intptr_t)specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = NULL;
        }
      }
      if (specific_worker != NULL) {
        GPR_TIMER_MARK("finally_kick", 0);
        push_back_worker(p, specific_worker);
        pthread_kill(specific_worker->pt_id, SIGUSR1);
      }
    } else {
      /* No workers to kick: remember it so the next poller returns early */
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = 1;
    }
  }

  GPR_TIMER_END("pollset_kick_ext", 0);
}
850
851static void pollset_kick(grpc_pollset *p,
852 grpc_pollset_worker *specific_worker) {
853 pollset_kick_ext(p, specific_worker, 0);
854}
855
856/* global state management */
857
/* SIGUSR1 handler used to interrupt blocked pollers.
   NOTE(review): gpr_log is printf-based and not async-signal-safe; calling it
   from a signal handler is undefined behavior per POSIX — consider an empty
   handler */
static void sig_handler(int sig_num) {
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
}
861
/* Process-wide pollset setup: TLS slots, the global wakeup fd, and the
   SIGUSR1 handler that pollset_kick_ext relies on.
   NOTE(review): signal() semantics (handler reset, syscall restart) vary
   across platforms; sigaction() would pin them down — confirm */
static void pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_poller);
  gpr_tls_init(&g_current_thread_worker);
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  signal(SIGUSR1, sig_handler);
}
868
869static void pollset_global_shutdown(void) {
870 grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
871 gpr_tls_destroy(&g_current_thread_poller);
872 gpr_tls_destroy(&g_current_thread_worker);
873}
874
/* Wake 'some poller' via the global wakeup fd (used by the alarm system) */
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
876
/* TODO: sreek. Try to Remove this forward declaration*/
static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset);
879
880/* main interface */
881
/* Initialize a pollset and hand its mutex back to the caller via *mu.
   The worker list starts as an empty circular list (root points at itself). */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;

  multipoll_with_epoll_pollset_create_efd(pollset);
}
894
/* TODO(sreek): Maybe merge multipoll_*_destroy() with pollset_destroy()
 * function */
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset);
898
/* Destroy a pollset; no workers may still be parked in pollset_work */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));

  multipoll_with_epoll_pollset_destroy(pollset);

  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}
907
908/* TODO(sreek) - Do something with the pollset island link (??) */
909static void pollset_reset(grpc_pollset *pollset) {
910 GPR_ASSERT(pollset->shutting_down);
911 GPR_ASSERT(!pollset_has_workers(pollset));
912 pollset->shutting_down = 0;
913 pollset->called_shutdown = 0;
914 pollset->kicked_without_pollers = 0;
915}
916
/* TODO (sreek): Remove multipoll_with_epoll_finish_shutdown() declaration */
static void multipoll_with_epoll_pollset_finish_shutdown(grpc_pollset *pollset);
919
/* Complete a pollset shutdown and schedule the user's shutdown_done closure */
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  multipoll_with_epoll_pollset_finish_shutdown(pollset);
  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
924
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700925/* TODO(sreek): Remove multipoll_with_epoll_*_maybe_work_and_unlock
926 * declaration
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700927 */
928static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
929 grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
930 gpr_timespec deadline, gpr_timespec now);
931
/* Run one work cycle for |pollset| on the calling thread: register this
   thread as a worker, poll via the epoll multipoller until |deadline| or a
   kick, flush resulting closures, and drive shutdown completion if needed.
   Called with pollset->mu held; returns with it held. *worker_hdl points at
   this thread's worker for the duration and is cleared before returning. */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  grpc_pollset_worker worker;
  *worker_hdl = &worker;

  /* pollset->mu already held */
  int added_worker = 0; /* is |worker| linked into pollset's worker list? */
  int locked = 1;       /* do we currently hold pollset->mu? */
  int queued_work = 0;  /* NOTE(review): written below but never read; looks
                           vestigial - confirm before removing */
  int keep_polling = 0;
  GPR_TIMER_BEGIN("pollset_work", 0);
  /* this must happen before we (potentially) drop pollset->mu */
  worker.next = worker.prev = NULL;
  worker.kicked_specifically = 0;

  /* TODO(sreek): Abstract this thread id stuff out into a separate file */
  worker.pt_id = pthread_self();
  /* If we're shutting down then we don't execute any extended work */
  if (pollset->shutting_down) {
    GPR_TIMER_MARK("pollset_work.shutting_down", 0);
    goto done;
  }
  /* Start polling, and keep doing so while we're being asked to
     re-evaluate our pollers (this allows poll() based pollers to
     ensure they don't miss wakeups) */
  keep_polling = 1;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
      }
      gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
      GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);

      /* Drops pollset->mu before blocking in epoll_wait(). */
      multipoll_with_epoll_pollset_maybe_work_and_unlock(
          exec_ctx, pollset, &worker, deadline, now);

      GPR_TIMER_END("maybe_work_and_unlock", 0);
      locked = 0;
      gpr_tls_set(&g_current_thread_poller, 0);
    } else {
      /* A kick arrived while no worker was polling; consume it and skip
         polling for this iteration. */
      GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
      pollset->kicked_without_pollers = 0;
    }
  /* Finished execution - start cleaning up.
     Note that we may arrive here from outside the enclosing while() loop.
     In that case we won't loop though as we haven't added worker to the
     worker list, which means nobody could ask us to re-evaluate polling). */
  done:
    if (!locked) {
      /* Run closures produced while unlocked, then re-acquire the mutex. */
      queued_work |= grpc_exec_ctx_flush(exec_ctx);
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
  }
  if (added_worker) {
    remove_worker(pollset, &worker);
    gpr_tls_set(&g_current_thread_worker, 0);
  }

  /* check shutdown conditions */
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      /* Other workers remain; wake one so shutdown keeps progressing. */
      pollset_kick(pollset, NULL);
    } else if (!pollset->called_shutdown) {
      /* Last worker out completes the shutdown. */
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(exec_ctx, pollset);
      grpc_exec_ctx_flush(exec_ctx);
      /* Continuing to access pollset here is safe -- it is the caller's
       * responsibility to not destroy when it has outstanding calls to
       * pollset_work.
       * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
      gpr_mu_lock(&pollset->mu);
    }
  }
  *worker_hdl = NULL;
  GPR_TIMER_END("pollset_work", 0);
}
1015
1016/* TODO: (sreek) Do something with the pollset island link */
1017static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1018 grpc_closure *closure) {
1019 GPR_ASSERT(!pollset->shutting_down);
1020 pollset->shutting_down = 1;
1021 pollset->shutdown_done = closure;
1022 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1023
1024 if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
1025 pollset->called_shutdown = 1;
1026 finish_shutdown(exec_ctx, pollset);
1027 }
1028}
1029
1030static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1031 gpr_timespec now) {
1032 gpr_timespec timeout;
1033 static const int64_t max_spin_polling_us = 10;
1034 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1035 return -1;
1036 }
1037 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1038 max_spin_polling_us,
1039 GPR_TIMESPAN))) <= 0) {
1040 return 0;
1041 }
1042 timeout = gpr_time_sub(deadline, now);
1043 return gpr_time_to_millis(gpr_time_add(
1044 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1045}
1046
1047/*******************************************************************************
1048 * pollset_multipoller_with_epoll_posix.c
1049 */
1050
1051static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
1052 /* only one set_ready can be active at once (but there may be a racing
1053 notify_on) */
1054 gpr_mu_lock(&fd->mu);
1055 set_ready_locked(exec_ctx, fd, st);
1056 gpr_mu_unlock(&fd->mu);
1057}
1058
1059static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
1060 set_ready(exec_ctx, fd, &fd->read_closure);
1061}
1062
1063static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
1064 set_ready(exec_ctx, fd, &fd->write_closure);
1065}
1066
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001067/* TODO: sreek - This function multipoll_with_epoll_pollset_add_fd() and
1068 * finally_add_fd() in ev_poll_and_epoll_posix.c */
/* Associate |fd| with |pollset| by making them share a single polling
   island, creating or merging islands as necessary. */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO sreek - Check if we need to get a pollset->mu lock here */
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  if (fd->polling_island == pollset->polling_island) {
    /* Already on the same island (possibly both NULL). */
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      /* Neither side has an island yet: create one containing |fd|.
         NOTE(review): second argument 2 is presumably an initial
         capacity/ref count - confirm against polling_island_create(). */
      pi_new = polling_island_create(fd, 2);
    }
  } else if (fd->polling_island == NULL) {
    /* Only the pollset has an island: attach the fd to it. */
    pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
  } else if (pollset->polling_island == NULL) {
    /* Only the fd has an island: attach the pollset to it. */
    pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
  } else {
    /* Both have distinct islands: merge them into one. */
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  /* Both sides now point at the common island. */
  fd->polling_island = pollset->polling_island = pi_new;

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
}
1095
1096/* Creates an epoll fd and initializes the pollset */
1097/* TODO: This has to be called ONLY from pollset_init function. and hence it
1098 * does not acquire any lock */
1099static void multipoll_with_epoll_pollset_create_efd(grpc_pollset *pollset) {
1100 struct epoll_event ev;
1101 int err;
1102
1103 pollset->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1104 if (pollset->epoll_fd < 0) {
1105 gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
1106 abort();
1107 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001108
1109 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
1110 ev.data.ptr = NULL;
1111
1112 err = epoll_ctl(pollset->epoll_fd, EPOLL_CTL_ADD,
1113 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
1114 if (err < 0) {
1115 gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
1116 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
1117 strerror(errno));
1118 }
1119}
1120
1121/* TODO(klempner): We probably want to turn this down a bit */
1122#define GRPC_EPOLL_MAX_EVENTS 1000
1123
/* Release pollset->mu and poll the pollset's epoll fd until |deadline| (or a
   wakeup). Dispatches read/write readiness to the affected fds. Returns with
   pollset->mu RELEASED; the caller (pollset_work) re-acquires it. */
static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
    grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
    gpr_timespec deadline, gpr_timespec now) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = pollset->epoll_fd;
  int ep_rv;
  int timeout_ms;

  /* If you want to ignore epoll's ability to sanely handle parallel pollers,
   * for a more apples-to-apples performance comparison with poll, add a
   * if (pollset->counter != 0) { return 0; }
   * here.
   */

  gpr_mu_unlock(&pollset->mu);

  timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  do {
    /* Blocks for up to timeout_ms (forever if -1, not at all if 0). We loop
       again only when the event buffer was completely filled, in which case
       more events may be pending. */
    ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
    if (ep_rv < 0) {
      /* EINTR is benign (signal delivery); anything else is logged. */
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
      }
    } else {
      int i;
      for (i = 0; i < ep_rv; ++i) {
        grpc_fd *fd = ep_ev[i].data.ptr;
        /* TODO(klempner): We might want to consider making err and pri
         * separate events */
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (fd == NULL) {
          /* NULL data.ptr marks the global wakeup fd (see create_efd). */
          grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
        } else {
          /* Error/hangup wakes both directions so closures observe it. */
          if (read_ev || cancel) {
            fd_become_readable(exec_ctx, fd);
          }
          if (write_ev || cancel) {
            fd_become_writable(exec_ctx, fd);
          }
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
}
1172
/* Nothing to do here: the epoll fd is closed later, in
   multipoll_with_epoll_pollset_destroy(). */
static void multipoll_with_epoll_pollset_finish_shutdown(
    grpc_pollset *pollset) {}
1175
1176static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
1177 close(pollset->epoll_fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001178}
1179
1180/*******************************************************************************
1181 * pollset_set_posix.c
1182 */
1183
1184static grpc_pollset_set *pollset_set_create(void) {
1185 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1186 memset(pollset_set, 0, sizeof(*pollset_set));
1187 gpr_mu_init(&pollset_set->mu);
1188 return pollset_set;
1189}
1190
1191static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1192 size_t i;
1193 gpr_mu_destroy(&pollset_set->mu);
1194 for (i = 0; i < pollset_set->fd_count; i++) {
1195 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1196 }
1197 gpr_free(pollset_set->pollsets);
1198 gpr_free(pollset_set->pollset_sets);
1199 gpr_free(pollset_set->fds);
1200 gpr_free(pollset_set);
1201}
1202
1203static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1204 grpc_pollset_set *pollset_set,
1205 grpc_pollset *pollset) {
1206 size_t i, j;
1207 gpr_mu_lock(&pollset_set->mu);
1208 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1209 pollset_set->pollset_capacity =
1210 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1211 pollset_set->pollsets =
1212 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1213 sizeof(*pollset_set->pollsets));
1214 }
1215 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1216 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1217 if (fd_is_orphaned(pollset_set->fds[i])) {
1218 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1219 } else {
1220 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1221 pollset_set->fds[j++] = pollset_set->fds[i];
1222 }
1223 }
1224 pollset_set->fd_count = j;
1225 gpr_mu_unlock(&pollset_set->mu);
1226}
1227
1228static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1229 grpc_pollset_set *pollset_set,
1230 grpc_pollset *pollset) {
1231 size_t i;
1232 gpr_mu_lock(&pollset_set->mu);
1233 for (i = 0; i < pollset_set->pollset_count; i++) {
1234 if (pollset_set->pollsets[i] == pollset) {
1235 pollset_set->pollset_count--;
1236 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1237 pollset_set->pollsets[pollset_set->pollset_count]);
1238 break;
1239 }
1240 }
1241 gpr_mu_unlock(&pollset_set->mu);
1242}
1243
1244static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1245 grpc_pollset_set *bag,
1246 grpc_pollset_set *item) {
1247 size_t i, j;
1248 gpr_mu_lock(&bag->mu);
1249 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1250 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1251 bag->pollset_sets =
1252 gpr_realloc(bag->pollset_sets,
1253 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1254 }
1255 bag->pollset_sets[bag->pollset_set_count++] = item;
1256 for (i = 0, j = 0; i < bag->fd_count; i++) {
1257 if (fd_is_orphaned(bag->fds[i])) {
1258 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1259 } else {
1260 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1261 bag->fds[j++] = bag->fds[i];
1262 }
1263 }
1264 bag->fd_count = j;
1265 gpr_mu_unlock(&bag->mu);
1266}
1267
1268static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1269 grpc_pollset_set *bag,
1270 grpc_pollset_set *item) {
1271 size_t i;
1272 gpr_mu_lock(&bag->mu);
1273 for (i = 0; i < bag->pollset_set_count; i++) {
1274 if (bag->pollset_sets[i] == item) {
1275 bag->pollset_set_count--;
1276 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1277 bag->pollset_sets[bag->pollset_set_count]);
1278 break;
1279 }
1280 }
1281 gpr_mu_unlock(&bag->mu);
1282}
1283
1284static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1285 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1286 size_t i;
1287 gpr_mu_lock(&pollset_set->mu);
1288 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1289 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1290 pollset_set->fds = gpr_realloc(
1291 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1292 }
1293 GRPC_FD_REF(fd, "pollset_set");
1294 pollset_set->fds[pollset_set->fd_count++] = fd;
1295 for (i = 0; i < pollset_set->pollset_count; i++) {
1296 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1297 }
1298 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1299 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1300 }
1301 gpr_mu_unlock(&pollset_set->mu);
1302}
1303
1304static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1305 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1306 size_t i;
1307 gpr_mu_lock(&pollset_set->mu);
1308 for (i = 0; i < pollset_set->fd_count; i++) {
1309 if (pollset_set->fds[i] == fd) {
1310 pollset_set->fd_count--;
1311 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1312 pollset_set->fds[pollset_set->fd_count]);
1313 GRPC_FD_UNREF(fd, "pollset_set");
1314 break;
1315 }
1316 }
1317 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1318 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1319 }
1320 gpr_mu_unlock(&pollset_set->mu);
1321}
1322
1323/*******************************************************************************
1324 * event engine binding
1325 */
1326
/* Tear down the engine-global state initialized by grpc_init_epoll_linux():
   fd bookkeeping first, then pollset globals. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
}
1331
/* The event-engine interface exported by this epoll-based implementation:
   maps the generic engine API onto the static functions defined above. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and readiness notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,

    /* pollset lifecycle and polling */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set (groups of pollsets/fds) management */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    /* global wakeup */
    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1363
/* Engine entry point: initialize global fd, pollset and polling-island
   state, then hand back the epoll event-engine vtable. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}
1370
1371#endif