blob: 7e01ac144f2a47970f24d2d2c0d5bc835bf6bb13 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070037#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070038
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070039#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
44#include <signal.h>
45#include <string.h>
46#include <sys/epoll.h>
47#include <sys/socket.h>
48#include <unistd.h>
49
50#include <grpc/support/alloc.h>
51#include <grpc/support/log.h>
52#include <grpc/support/string_util.h>
53#include <grpc/support/tls.h>
54#include <grpc/support/useful.h>
55
56#include "src/core/lib/iomgr/ev_posix.h"
57#include "src/core/lib/iomgr/iomgr_internal.h"
58#include "src/core/lib/iomgr/wakeup_fd_posix.h"
59#include "src/core/lib/profiling/timers.h"
60#include "src/core/lib/support/block_annotate.h"
61
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070062static int grpc_wakeup_signal = -1;
63static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070064
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070065/* Implements the function defined in grpc_posix.h. This function might be
66 * called before even calling grpc_init() to set either a different signal to
67 * use. If signum == -1, then the use of signals is disabled */
68void grpc_use_signal(int signum) {
69 grpc_wakeup_signal = signum;
70 is_grpc_wakeup_signal_initialized = true;
71
72 if (grpc_wakeup_signal < 0) {
73 gpr_log(GPR_INFO,
74 "Use of signals is disabled. Epoll engine will not be used");
75 } else {
76 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
77 grpc_wakeup_signal);
78 }
79}
80
81struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070082
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070083/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070084 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070085 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070086struct grpc_fd {
87 int fd;
88 /* refst format:
Sree Kuchibhotla5098f912016-05-31 10:58:17 -070089 bit 0 : 1=Active / 0=Orphaned
90 bits 1-n : refcount
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070091 Ref/Unref by two to avoid altering the orphaned bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070092 gpr_atm refst;
93
94 gpr_mu mu;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -070095
96 /* Indicates that the fd is shutdown and that any pending read/write closures
97 should fail */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070098 bool shutdown;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -070099
100 /* The fd is either closed or we relinquished control of it. In either cases,
101 this indicates that the 'fd' on this structure is no longer valid */
102 bool orphaned;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700103
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700104 /* TODO: sreek - Move this to a lockfree implementation */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700105 grpc_closure *read_closure;
106 grpc_closure *write_closure;
107
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700108 /* The polling island to which this fd belongs to and the mutex protecting the
109 the field */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700110 gpr_mu pi_mu;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700111 struct polling_island *polling_island;
112
113 struct grpc_fd *freelist_next;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700114 grpc_closure *on_done_closure;
115
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700116 /* The pollset that last noticed that the fd is readable */
117 grpc_pollset *read_notifier_pollset;
118
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700119 grpc_iomgr_object iomgr_object;
120};
121
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700122/* Reference counting for fds */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700123#ifdef GRPC_FD_REF_COUNT_DEBUG
124static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
125static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
126 int line);
127#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
128#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
129#else
130static void fd_ref(grpc_fd *fd);
131static void fd_unref(grpc_fd *fd);
132#define GRPC_FD_REF(fd, reason) fd_ref(fd)
133#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
134#endif
135
136static void fd_global_init(void);
137static void fd_global_shutdown(void);
138
139#define CLOSURE_NOT_READY ((grpc_closure *)0)
140#define CLOSURE_READY ((grpc_closure *)1)
141
142/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700143 * Polling-island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700144 */
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700145/* TODO: sree: Consider making ref_cnt and merged_to to gpr_atm - This would
146 * significantly reduce the number of mutex acquisition calls. */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700147typedef struct polling_island {
148 gpr_mu mu;
149 int ref_cnt;
150
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700151 /* Points to the polling_island this merged into.
152 * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
153 * are invalid and must be ignored */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700154 struct polling_island *merged_to;
155
156 /* The fd of the underlying epoll set */
157 int epoll_fd;
158
159 /* The file descriptors in the epoll set */
160 size_t fd_cnt;
161 size_t fd_capacity;
162 grpc_fd **fds;
163
164 /* Polling islands that are no longer needed are kept in a freelist so that
165 they can be reused. This field points to the next polling island in the
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700166 free list */
167 struct polling_island *next_free;
168} polling_island;
169
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700170/*******************************************************************************
171 * Pollset Declarations
172 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700173struct grpc_pollset_worker {
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700174 pthread_t pt_id; /* Thread id of this worker */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700175 struct grpc_pollset_worker *next;
176 struct grpc_pollset_worker *prev;
177};
178
struct grpc_pollset {
  gpr_mu mu;
  /* Sentinel head of the doubly-linked list of workers (see
     grpc_pollset_worker::next/prev) */
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};
196
197/*******************************************************************************
198 * Pollset-set Declarations
199 */
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700200/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
201 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
202 * the current pollset_set would result in polling island merges. This would
203 * remove the need to maintain fd_count here. This will also significantly
204 * simplify the grpc_fd structure since we would no longer need to explicitly
205 * maintain the orphaned state */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700206struct grpc_pollset_set {
207 gpr_mu mu;
208
209 size_t pollset_count;
210 size_t pollset_capacity;
211 grpc_pollset **pollsets;
212
213 size_t pollset_set_count;
214 size_t pollset_set_capacity;
215 struct grpc_pollset_set **pollset_sets;
216
217 size_t fd_count;
218 size_t fd_capacity;
219 grpc_fd **fds;
220};
221
222/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700223 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700224 */
225
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700226/* The wakeup fd that is used to wake up all threads in a Polling island. This
227 is useful in the polling island merge operation where we need to wakeup all
228 the threads currently polling the smaller polling island (so that they can
229 start polling the new/merged polling island)
230
231 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
232 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
233static grpc_wakeup_fd polling_island_wakeup_fd;
234
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700235/* Polling island freelist */
236static gpr_mu g_pi_freelist_mu;
237static polling_island *g_pi_freelist = NULL;
238
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700239/* The caller is expected to hold pi->mu lock before calling this function */
240static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700241 size_t fd_count, bool add_fd_refs) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700242 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700243 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700244 struct epoll_event ev;
245
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700246 for (i = 0; i < fd_count; i++) {
247 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
248 ev.data.ptr = fds[i];
249 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700250
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700251 if (err < 0) {
252 if (errno != EEXIST) {
253 /* TODO: sreek - We need a better way to bubble up this error instead of
254 just logging a message */
255 gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
256 fds[i]->fd, strerror(errno));
257 }
258
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700259 continue;
260 }
261
262 if (pi->fd_cnt == pi->fd_capacity) {
263 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
264 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
265 }
266
267 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700268 if (add_fd_refs) {
269 GRPC_FD_REF(fds[i], "polling_island");
270 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700271 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700272}
273
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700274/* The caller is expected to hold pi->mu before calling this */
275static void polling_island_add_wakeup_fd_locked(polling_island *pi,
276 grpc_wakeup_fd *wakeup_fd) {
277 struct epoll_event ev;
278 int err;
279
280 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
281 ev.data.ptr = wakeup_fd;
282 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
283 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
284 if (err < 0) {
285 gpr_log(GPR_ERROR,
286 "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
287 ". Error: %s",
288 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
289 strerror(errno));
290 }
291}
292
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700293/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700294static void polling_island_remove_all_fds_locked(polling_island *pi,
295 bool remove_fd_refs) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700296 int err;
297 size_t i;
298
299 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700300 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700301 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700302 /* TODO: sreek - We need a better way to bubble up this error instead of
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700303 * just logging a message */
304 gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%d]: %d failed with error: %s",
305 i, pi->fds[i]->fd, strerror(errno));
306 }
307
308 if (remove_fd_refs) {
309 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700310 }
311 }
312
313 pi->fd_cnt = 0;
314}
315
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700316/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700317static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700318 bool is_fd_closed) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700319 int err;
320 size_t i;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700321
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700322 /* If fd is already closed, then it would have been automatically been removed
323 from the epoll set */
324 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700325 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
326 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700327 gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error; %s",
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700328 fd->fd, strerror(errno));
329 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700330 }
331
332 for (i = 0; i < pi->fd_cnt; i++) {
333 if (pi->fds[i] == fd) {
334 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700335 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700336 break;
337 }
338 }
339}
340
/* Creates a polling island (reusing one from the global freelist when
   possible), gives it a fresh epoll set, sets its ref count to
   'initial_ref_cnt' and, if 'initial_fd' is non-NULL, adds it to the epoll
   set taking a "polling_island" ref on it.
   NOTE: aborts via GPR_ASSERT if the epoll set cannot be created. */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  /* A new epoll set is created even for a reused island: the old one was
     closed in polling_island_delete() */
  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  /* Called without pi->mu here; safe because this island is not yet visible
     to any other thread (same reasoning as for the fd add below) */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}
387
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700388static void polling_island_delete(polling_island *pi) {
389 GPR_ASSERT(pi->ref_cnt == 0);
390 GPR_ASSERT(pi->fd_cnt == 0);
391
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700392 close(pi->epoll_fd);
393 pi->epoll_fd = -1;
394
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700395 pi->merged_to = NULL;
396
397 gpr_mu_lock(&g_pi_freelist_mu);
398 pi->next_free = g_pi_freelist;
399 g_pi_freelist = pi;
400 gpr_mu_unlock(&g_pi_freelist_mu);
401}
402
403void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
404 pi->ref_cnt -= unref_by;
405 int ref_cnt = pi->ref_cnt;
406 GPR_ASSERT(ref_cnt >= 0);
407
408 gpr_mu_unlock(&pi->mu);
409
410 if (ref_cnt == 0) {
411 polling_island_delete(pi);
412 }
413}
414
/* Walks the 'merged_to' chain starting at 'pi' until the terminal island
   (merged_to == NULL) is reached, dropping 'unref_by' refs on every
   non-terminal island passed through. Adds 'add_ref_by' refs to the terminal
   island and returns it with its mu HELD. Called with no locks held. */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    /* unref_and_unlock releases pi->mu; lock-hop onto the next node */
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
429
/* Resolves *p and *q to their terminal polling islands (following 'merged_to'
   chains) and acquires the locks on both, using a global lock order (lower
   address first) to avoid deadlock. On return, *p and *q point to the locked
   terminal islands; if both chains end at the same island, *p == *q and that
   island is locked exactly once. Called with no locks held. */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2  (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock polling_island node with
       lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further. If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2  (address of pi_1 is less than that of pi_2)
       - pi_1 MAYBE locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
         done locking this polling_island yet. Release the lock on this node and
         advance pi_1 to the next node in the list; and go to the beginning of
         the loop (we can't proceed to locking pi_2 unless we locked pi_1 first)
       */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        /* drops one ref on the non-terminal node and releases its lock */
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not terminal node, we are not done locking this polling_island
       yet. Release the lock and update pi_2 to the next node in the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 odd number of times, do one more swap so that
       pi_1 and pi_2 point to the same polling_island lists they started off
       with at the beginning of this function (i.e *p and *q respectively) */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
528
529polling_island *polling_island_merge(polling_island *p, polling_island *q) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700530 /* Get locks on both the polling islands */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700531 polling_island_pair_update_and_lock(&p, &q);
532
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700533 if (p == q) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700534 /* Nothing needs to be done here */
535 gpr_mu_unlock(&p->mu);
536 return p;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700537 }
538
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700539 /* Make sure that p points to the polling island with fewer fds than q */
540 if (p->fd_cnt > q->fd_cnt) {
541 GPR_SWAP(polling_island *, p, q);
542 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700543
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700544 /* "Merge" p with q i.e move all the fds from p (The one with fewer fds) to q
Sree Kuchibhotla0553a432016-06-09 00:42:41 -0700545 Note that the refcounts on the fds being moved will not change here. This
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700546 is why the last parameter in the following two functions is 'false') */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700547 polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
548 polling_island_remove_all_fds_locked(p, false);
549
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700550 /* Wakeup all the pollers (if any) on p so that they can pickup this change */
551 polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
552
Sree Kuchibhotla0553a432016-06-09 00:42:41 -0700553 /* - The merged polling island (i.e q) inherits all the ref counts of the
554 island merging with it (i.e p)
555 - The island p will lose a ref count */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700556 q->ref_cnt += p->ref_cnt;
Sree Kuchibhotla0553a432016-06-09 00:42:41 -0700557 p->ref_cnt--;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700558
559 gpr_mu_unlock(&p->mu);
560 gpr_mu_unlock(&q->mu);
561
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700562 return q;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700563}
564
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700565static void polling_island_global_init() {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700566 gpr_mu_init(&g_pi_freelist_mu);
567 g_pi_freelist = NULL;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700568 grpc_wakeup_fd_init(&polling_island_wakeup_fd);
569 grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700570}
571
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700572static void polling_island_global_shutdown() {
573 polling_island *next;
574 gpr_mu_lock(&g_pi_freelist_mu);
575 gpr_mu_unlock(&g_pi_freelist_mu);
576 while (g_pi_freelist != NULL) {
577 next = g_pi_freelist->next_free;
578 gpr_mu_destroy(&g_pi_freelist->mu);
579 gpr_free(g_pi_freelist->fds);
580 gpr_free(g_pi_freelist);
581 g_pi_freelist = next;
582 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700583 gpr_mu_destroy(&g_pi_freelist_mu);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700584
585 grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700586}
587
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700588/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700589 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700590 */
591
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700592/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700593 * but instead so that implementations with multiple threads in (for example)
594 * epoll_wait deal with the race between pollset removal and incoming poll
595 * notifications.
596 *
597 * The problem is that the poller ultimately holds a reference to this
598 * object, so it is very difficult to know when is safe to free it, at least
599 * without some expensive synchronization.
600 *
601 * If we keep the object freelisted, in the worst case losing this race just
602 * becomes a spurious read notification on a reused fd.
603 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700604
605/* The alarm system needs to be able to wakeup 'some poller' sometimes
606 * (specifically when a new alarm needs to be triggered earlier than the next
607 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
608 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700609
610/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
611 * sure to wake up one polling thread (which can wake up other threads if
612 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700613grpc_wakeup_fd grpc_global_wakeup_fd;
614
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700615static grpc_fd *fd_freelist = NULL;
616static gpr_mu fd_freelist_mu;
617
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700618#ifdef GRPC_FD_REF_COUNT_DEBUG
619#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
620#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
621static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
622 int line) {
623 gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
624 gpr_atm_no_barrier_load(&fd->refst),
625 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
626#else
627#define REF_BY(fd, n, reason) ref_by(fd, n)
628#define UNREF_BY(fd, n, reason) unref_by(fd, n)
629static void ref_by(grpc_fd *fd, int n) {
630#endif
631 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
632}
633
/* Subtracts 'n' from the fd's refcount; when the count reaches zero the
   grpc_fd struct is pushed onto the global freelist (never freed - see the
   freelist comment above). Debug variant logs the transition. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* This was the last reference: recycle the struct */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Underflow would indicate an unref without a matching ref */
    GPR_ASSERT(old > n);
  }
}
657
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700658/* Increment refcount by two to avoid changing the orphan bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700659#ifdef GRPC_FD_REF_COUNT_DEBUG
660static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
661 int line) {
662 ref_by(fd, 2, reason, file, line);
663}
664
665static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
666 int line) {
667 unref_by(fd, 2, reason, file, line);
668}
669#else
670static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700671static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
672#endif
673
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700674static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
675
/* Frees every grpc_fd struct sitting on the freelist and destroys the
   freelist mutex */
static void fd_global_shutdown(void) {
  /* NOTE(review): lock/unlock with no work in between presumably acts as a
     barrier for threads still touching the freelist - confirm intent */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    /* NOTE(review): only fd->mu is destroyed here; fd->pi_mu (initialized in
       fd_create()) is never destroyed - verify whether that is intentional */
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
687
688static grpc_fd *fd_create(int fd, const char *name) {
689 grpc_fd *new_fd = NULL;
690
691 gpr_mu_lock(&fd_freelist_mu);
692 if (fd_freelist != NULL) {
693 new_fd = fd_freelist;
694 fd_freelist = fd_freelist->freelist_next;
695 }
696 gpr_mu_unlock(&fd_freelist_mu);
697
698 if (new_fd == NULL) {
699 new_fd = gpr_malloc(sizeof(grpc_fd));
700 gpr_mu_init(&new_fd->mu);
701 gpr_mu_init(&new_fd->pi_mu);
702 }
703
704 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
705 newly created fd (or an fd we got from the freelist), no one else would be
706 holding a lock to it anyway. */
707 gpr_mu_lock(&new_fd->mu);
708
709 gpr_atm_rel_store(&new_fd->refst, 1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700710 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700711 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700712 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700713 new_fd->read_closure = CLOSURE_NOT_READY;
714 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700715 new_fd->polling_island = NULL;
716 new_fd->freelist_next = NULL;
717 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700718 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700719
720 gpr_mu_unlock(&new_fd->mu);
721
722 char *fd_name;
723 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
724 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
725 gpr_free(fd_name);
726#ifdef GRPC_FD_REF_COUNT_DEBUG
727 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, fd_name);
728#endif
729 return new_fd;
730}
731
732static bool fd_is_orphaned(grpc_fd *fd) {
733 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
734}
735
736static int fd_wrapped_fd(grpc_fd *fd) {
737 int ret_fd = -1;
738 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700739 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700740 ret_fd = fd->fd;
741 }
742 gpr_mu_unlock(&fd->mu);
743
744 return ret_fd;
745}
746
/* Orphan the fd: closes (or hands back via *release_fd) the underlying file
   descriptor, removes the fd from its polling island, and schedules 'on_done'.
   The grpc_fd struct itself stays alive until its ref count drops to zero. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}
789
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700790static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
791 grpc_closure **st, grpc_closure *closure) {
792 if (*st == CLOSURE_NOT_READY) {
793 /* not ready ==> switch to a waiting state by setting the closure */
794 *st = closure;
795 } else if (*st == CLOSURE_READY) {
796 /* already ready ==> queue the closure to run immediately */
797 *st = CLOSURE_NOT_READY;
798 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
799 } else {
800 /* upcallptr was set to a different closure. This is an error! */
801 gpr_log(GPR_ERROR,
802 "User called a notify_on function with a previous callback still "
803 "pending");
804 abort();
805 }
806}
807
808/* returns 1 if state becomes not ready */
809static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
810 grpc_closure **st) {
811 if (*st == CLOSURE_READY) {
812 /* duplicate ready ==> ignore */
813 return 0;
814 } else if (*st == CLOSURE_NOT_READY) {
815 /* not ready, and not waiting ==> flag ready */
816 *st = CLOSURE_READY;
817 return 0;
818 } else {
819 /* waiting ==> queue closure */
820 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
821 *st = CLOSURE_NOT_READY;
822 return 1;
823 }
824}
825
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700826static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
827 grpc_fd *fd) {
828 grpc_pollset *notifier = NULL;
829
830 gpr_mu_lock(&fd->mu);
831 notifier = fd->read_notifier_pollset;
832 gpr_mu_unlock(&fd->mu);
833
834 return notifier;
835}
836
/* Shut down the fd. Any pending (or future) read/write closures will run with
   success=false. Asserts against double shutdown. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = true;

  /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
     this point, the closures would be called with 'success = false' */
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
848
/* Register 'closure' to run when the fd becomes readable. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
855
/* Register 'closure' to run when the fd becomes writable. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
862
863/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700864 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700865 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700866GPR_TLS_DECL(g_current_thread_pollset);
867GPR_TLS_DECL(g_current_thread_worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700868
/* Handler for grpc_wakeup_signal. Intentionally (nearly) a no-op: delivering
   the signal interrupts a worker's epoll_pwait() (see pollset_worker_kick). */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700874
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -0700875static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700876
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700877/* Global state management */
/* Initialize the global wakeup fd, the TLS slots used by pollset_kick() to
   detect self-kicks, and the wakeup-signal handler. */
static void pollset_global_init(void) {
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
}
884
/* Tear down the global wakeup fd and the kick-detection TLS slots. */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
890
/* Kick a specific worker by sending it the wakeup signal, which interrupts
   its epoll_pwait() (see sig_handler / pollset_work_and_unlock). */
static void pollset_worker_kick(grpc_pollset_worker *worker) {
  pthread_kill(worker->pt_id, grpc_wakeup_signal);
}
894
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700895/* Return 1 if the pollset has active threads in pollset_work (pollset must
896 * be locked) */
897static int pollset_has_workers(grpc_pollset *p) {
898 return p->root_worker.next != &p->root_worker;
899}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700900
/* Unlink 'worker' from the pollset's circular doubly-linked worker list. */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
905
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700906static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
907 if (pollset_has_workers(p)) {
908 grpc_pollset_worker *w = p->root_worker.next;
909 remove_worker(p, w);
910 return w;
911 } else {
912 return NULL;
913 }
914}
915
916static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
917 worker->next = &p->root_worker;
918 worker->prev = worker->next->prev;
919 worker->prev->next = worker->next->prev = worker;
920}
921
922static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
923 worker->prev = &p->root_worker;
924 worker->next = worker->prev->next;
925 worker->prev->next = worker->next->prev = worker;
926}
927
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700928/* p->mu must be held before calling this function */
929static void pollset_kick(grpc_pollset *p,
930 grpc_pollset_worker *specific_worker) {
931 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700932
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700933 grpc_pollset_worker *worker = specific_worker;
934 if (worker != NULL) {
935 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700936 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700937 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700938 for (worker = p->root_worker.next; worker != &p->root_worker;
939 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700940 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
941 pollset_worker_kick(worker);
942 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700943 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700944 } else {
945 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700946 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700947 GPR_TIMER_END("pollset_kick.broadcast", 0);
948 } else {
949 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700950 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
951 pollset_worker_kick(worker);
952 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700953 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700954 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
955 /* Since worker == NULL, it means that we can kick "any" worker on this
956 pollset 'p'. If 'p' happens to be the same pollset this thread is
957 currently polling (i.e in pollset_work() function), then there is no need
958 to kick any other worker since the current thread can just absorb the
959 kick. This is the reason why we enter this case only when
960 g_current_thread_pollset is != p */
961
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700962 GPR_TIMER_MARK("kick_anonymous", 0);
963 worker = pop_front_worker(p);
964 if (worker != NULL) {
965 GPR_TIMER_MARK("finally_kick", 0);
966 push_back_worker(p, worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700967 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700968 } else {
969 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700970 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700971 }
972 }
973
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700974 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700975}
976
/* Wake up whichever thread is currently polling on the global wakeup fd. */
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
978
/* Initialize a pollset; hands back a pointer to pollset->mu via *mu so the
   caller can synchronize with the pollset. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  /* The worker list starts as an empty circular list rooted at root_worker. */
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  /* The polling island is created lazily (see pollset_work_and_unlock). */
  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}
993
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700994/* Convert a timespec to milliseconds:
995 - Very small or negative poll times are clamped to zero to do a non-blocking
996 poll (which becomes spin polling)
997 - Other small values are rounded up to one millisecond
998 - Longer than a millisecond polls are rounded up to the next nearest
999 millisecond to avoid spinning
1000 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001001static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1002 gpr_timespec now) {
1003 gpr_timespec timeout;
1004 static const int64_t max_spin_polling_us = 10;
1005 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1006 return -1;
1007 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001008
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001009 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1010 max_spin_polling_us,
1011 GPR_TIMESPAN))) <= 0) {
1012 return 0;
1013 }
1014 timeout = gpr_time_sub(deadline, now);
1015 return gpr_time_to_millis(gpr_time_add(
1016 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1017}
1018
/* Called by the poller when 'fd' becomes readable; 'notifier' is the pollset
   whose epoll set delivered the event (recorded for
   fd_get_read_notifier_pollset). */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1027
/* Called by the poller when 'fd' becomes writable. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1034
/* Release the reference to pollset->polling_island and set it to NULL.
   pollset->mu must be held */
static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->pi_mu);
  if (pollset->polling_island) {
    /* Chase island merges to the latest island before dropping the ref. */
    pollset->polling_island =
        polling_island_update_and_lock(pollset->polling_island, 1, 0);
    polling_island_unref_and_unlock(pollset->polling_island, 1);
    pollset->polling_island = NULL;
  }
  gpr_mu_unlock(&pollset->pi_mu);
}
1047
/* Complete pollset shutdown: release the polling island and schedule the
   shutdown_done closure. pollset->mu must be held and no workers may remain. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
1058
/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake up every worker so they notice the shutting_down flag. */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1078
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}
1087
/* Return a fully shut-down pollset to its initial (reusable) state.
   NOTE(review): calls a *_locked helper whose contract requires pollset->mu;
   presumably the caller holds it - confirm. */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island_locked(pollset);
}
1097
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001098#define GRPC_EPOLL_MAX_EVENTS 1000
1099static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
1100 grpc_pollset *pollset, int timeout_ms,
1101 sigset_t *sig_mask) {
1102 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001103 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001104 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001105 polling_island *pi = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001106 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1107
1108 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
1109 polling island pointed by pollset->polling_island.
1110 Acquire the following locks:
1111 - pollset->mu (which we already have)
1112 - pollset->pi_mu
1113 - pollset->polling_island->mu */
1114 gpr_mu_lock(&pollset->pi_mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001115
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001116 pi = pollset->polling_island;
1117 if (pi == NULL) {
1118 pi = polling_island_create(NULL, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001119 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001120
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001121 /* In addition to locking the polling island, add a ref so that the island
1122 does not get destroyed (which means the epoll_fd won't be closed) while
1123 we are are doing an epoll_wait() on the epoll_fd */
1124 pi = polling_island_update_and_lock(pi, 1, 1);
1125 epoll_fd = pi->epoll_fd;
1126
1127 /* Update the pollset->polling_island */
1128 pollset->polling_island = pi;
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001129
1130#ifdef GRPC_EPOLL_DEBUG
1131 if (pollset->polling_island->fd_cnt == 0) {
1132 gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds",
1133 epoll_fd);
1134 }
1135 for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) {
1136 gpr_log(GPR_DEBUG,
1137 "pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d",
1138 epoll_fd, pollset->polling_island->fd_cnt, i,
1139 pollset->polling_island->fds[i]->fd);
1140 }
1141#endif
1142 gpr_mu_unlock(&pollset->polling_island->mu);
1143
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001144 gpr_mu_unlock(&pollset->pi_mu);
1145 gpr_mu_unlock(&pollset->mu);
1146
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001147 do {
1148 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1149 sig_mask);
1150 if (ep_rv < 0) {
1151 if (errno != EINTR) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001152 gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
1153 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001154 /* We were interrupted. Save an interation by doing a zero timeout
1155 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001156 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001157 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001158 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001159
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001160 int i;
1161 for (i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001162 void *data_ptr = ep_ev[i].data.ptr;
1163 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001164 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001165 } else if (data_ptr == &polling_island_wakeup_fd) {
1166 /* This means that our polling island is merged with a different
1167 island. We do not have to do anything here since the subsequent call
1168 to the function pollset_work_and_unlock() will pick up the correct
1169 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001170 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001171 grpc_fd *fd = data_ptr;
1172 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1173 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1174 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001175 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001176 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001177 }
1178 if (write_ev || cancel) {
1179 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001180 }
1181 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001182 }
1183 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001184
1185 GPR_ASSERT(pi != NULL);
1186
1187 /* Before leaving, release the extra ref we added to the polling island */
1188 /* It is important to note that at this point 'pi' may not be the same as
1189 * pollset->polling_island. This is because pollset->polling_island pointer
1190 * gets updated whenever the underlying polling island is merged with another
1191 * island and while we are doing epoll_wait() above, the polling island may
1192 * have been merged */
1193
1194 /* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
1195 * we do not have to do this here */
1196 gpr_mu_lock(&pi->mu);
1197 polling_island_unref_and_unlock(pi, 1);
1198
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001199 GPR_TIMER_END("pollset_work_and_unlock", 0);
1200}
1201
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  /* Stack-allocated worker registered on the pollset for the duration of this
     call so pollset_kick() can signal this thread. */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;
  /* Record which pollset/worker this thread is polling so pollset_kick() can
     avoid kicking the current thread. */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* Block grpc_wakeup_signal on this thread; orig_mask (with the signal
       removed) is handed to epoll_pwait() so a kick can only interrupt the
       epoll wait itself. */
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_wakeup_signal);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_wakeup_signal);

    push_front_worker(pollset, &worker);

    /* Releases pollset->mu while polling; we re-acquire it below. */
    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
  GPR_TIMER_END("pollset_work", 0);
}
1268
/* Associate 'fd' with 'pollset' by placing both on the same polling island
   (creating or merging islands as needed). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO sreek - Double check if we need to get a pollset->mu lock here */
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   * equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   * the NULL polling_island field to point to the non-NULL polling_island
   * field (ensure that the refcount on the polling island is incremented by
   * 1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   * and different, merge both the polling islands and update the
   * polling_island fields in both fd and pollset to point to the merged
   * polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      /* Case 2: both NULL - create an island holding refs for fd + pollset. */
      pi_new = polling_island_create(fd, 2);
    }
  } else if (fd->polling_island == NULL) {
    /* Case 3a: add fd to the pollset's existing island. */
    pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
    polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
    gpr_mu_unlock(&pi_new->mu);
  } else if (pollset->polling_island == NULL) {
    /* Case 3b: point the pollset at the fd's existing island. */
    pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
    gpr_mu_unlock(&pi_new->mu);
  } else {
    /* Case 4: both non-NULL and different - merge. */
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  fd->polling_island = pollset->polling_island = pi_new;

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
}
1312
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001313/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001314 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001315 */
1316
1317static grpc_pollset_set *pollset_set_create(void) {
1318 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1319 memset(pollset_set, 0, sizeof(*pollset_set));
1320 gpr_mu_init(&pollset_set->mu);
1321 return pollset_set;
1322}
1323
/* Destroy a pollset_set: drops the refs it holds on its fds and frees the
   internal arrays. Caller must guarantee no concurrent users (the mutex is
   destroyed before the fds are unreffed). */
static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}
1335
/* Add 'fd' to the pollset_set: takes a ref on the fd, records it, and
   propagates it to every member pollset and (recursively) every member
   pollset_set. */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  /* Grow the fd array geometrically when full. */
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}
1355
/* Remove 'fd' from the pollset_set (swap-with-last removal) and recursively
   from member pollset_sets. Note: the fd is NOT removed from member pollsets
   here; NOTE(review): presumably that is handled through the fd's orphan /
   polling-island path - confirm. */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      /* Swap the match with the last element; order is not preserved. */
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}
1374
/* Add 'pollset' to the pollset_set and register every live fd in the set with
   it. Orphaned fds are pruned (and unreffed) from the fd array as a side
   effect. */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  /* Grow the pollset array geometrically when full. */
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  /* Compact the fd array in place, dropping orphaned fds. */
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}
1399
1400static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1401 grpc_pollset_set *pollset_set,
1402 grpc_pollset *pollset) {
1403 size_t i;
1404 gpr_mu_lock(&pollset_set->mu);
1405 for (i = 0; i < pollset_set->pollset_count; i++) {
1406 if (pollset_set->pollsets[i] == pollset) {
1407 pollset_set->pollset_count--;
1408 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1409 pollset_set->pollsets[pollset_set->pollset_count]);
1410 break;
1411 }
1412 }
1413 gpr_mu_unlock(&pollset_set->mu);
1414}
1415
1416static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1417 grpc_pollset_set *bag,
1418 grpc_pollset_set *item) {
1419 size_t i, j;
1420 gpr_mu_lock(&bag->mu);
1421 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1422 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1423 bag->pollset_sets =
1424 gpr_realloc(bag->pollset_sets,
1425 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1426 }
1427 bag->pollset_sets[bag->pollset_set_count++] = item;
1428 for (i = 0, j = 0; i < bag->fd_count; i++) {
1429 if (fd_is_orphaned(bag->fds[i])) {
1430 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1431 } else {
1432 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1433 bag->fds[j++] = bag->fds[i];
1434 }
1435 }
1436 bag->fd_count = j;
1437 gpr_mu_unlock(&bag->mu);
1438}
1439
1440static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1441 grpc_pollset_set *bag,
1442 grpc_pollset_set *item) {
1443 size_t i;
1444 gpr_mu_lock(&bag->mu);
1445 for (i = 0; i < bag->pollset_set_count; i++) {
1446 if (bag->pollset_sets[i] == item) {
1447 bag->pollset_set_count--;
1448 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1449 bag->pollset_sets[bag->pollset_set_count]);
1450 break;
1451 }
1452 }
1453 gpr_mu_unlock(&bag->mu);
1454}
1455
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001456/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001457 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001458 */
1459
/* Engine-wide teardown hook (vtable.shutdown_engine): releases the global
 * state created by grpc_init_epoll_linux() — fd, pollset, and polling-island
 * globals, in that order. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1465
/* Binds this epoll-based implementation to the generic event-engine
 * interface. Returned (by address) from grpc_init_epoll_linux(). */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd operations */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    /* engine-wide operations */
    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1498
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001499/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1500 * Create a dummy epoll_fd to make sure epoll support is available */
1501static bool is_epoll_available() {
1502 int fd = epoll_create1(EPOLL_CLOEXEC);
1503 if (fd < 0) {
1504 gpr_log(
1505 GPR_ERROR,
1506 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1507 fd);
1508 return false;
1509 }
1510 close(fd);
1511 return true;
1512}
1513
/* Public entry point: initializes the epoll polling engine and returns its
 * vtable, or NULL when the engine cannot be used — either because the
 * application disabled signal use (grpc_wakeup_signal < 0) or because the
 * running kernel lacks epoll support. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine*/
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  /* Pick a default wakeup signal if the application did not choose one via
   * grpc_use_signal() before engine init. */
  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  /* Engine-global state; released by shutdown_engine(). */
  fd_global_init();
  pollset_global_init();
  polling_island_global_init();
  return &vtable;
}
1533
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001534#else /* defined(GPR_LINUX_EPOLL) */
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }

/* No-op on platforms without epoll: wakeup signals are only consumed by the
 * epoll-based engine above. */
void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001540#endif /* !defined(GPR_LINUX_EPOLL) */