blob: 006c2a8ee7fb03991e14a6d4dad0cbe5ad587956 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070037#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070038
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070039#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
44#include <signal.h>
45#include <string.h>
46#include <sys/epoll.h>
47#include <sys/socket.h>
48#include <unistd.h>
49
50#include <grpc/support/alloc.h>
51#include <grpc/support/log.h>
52#include <grpc/support/string_util.h>
53#include <grpc/support/tls.h>
54#include <grpc/support/useful.h>
55
56#include "src/core/lib/iomgr/ev_posix.h"
57#include "src/core/lib/iomgr/iomgr_internal.h"
58#include "src/core/lib/iomgr/wakeup_fd_posix.h"
59#include "src/core/lib/profiling/timers.h"
60#include "src/core/lib/support/block_annotate.h"
61
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070062static int grpc_wakeup_signal = -1;
63static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070064
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070065/* Implements the function defined in grpc_posix.h. This function might be
66 * called before even calling grpc_init() to set either a different signal to
67 * use. If signum == -1, then the use of signals is disabled */
68void grpc_use_signal(int signum) {
69 grpc_wakeup_signal = signum;
70 is_grpc_wakeup_signal_initialized = true;
71
72 if (grpc_wakeup_signal < 0) {
73 gpr_log(GPR_INFO,
74 "Use of signals is disabled. Epoll engine will not be used");
75 } else {
76 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
77 grpc_wakeup_signal);
78 }
79}
80
struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0 : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Serializes access to the mutable state below (shutdown flag, closures).
     NOTE(review): exact set of fields guarded — confirm against fd users. */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Pending notification closures, or the CLOSURE_NOT_READY / CLOSURE_READY
     sentinels (see their definitions below).
     TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to and the mutex protecting the
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  /* Next fd in the global fd freelist (see fd_freelist below); set when the
     last ref is dropped in unref_by() */
  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
121
/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd.read_closure / write_closure: any other
   value is a real pending closure. */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
141
/*******************************************************************************
 * Polling-island Declarations
 */
/* TODO: sree: Consider making ref_cnt and merged_to to gpr_atm - This would
 * significantly reduce the number of mutex acquisition calls. */
typedef struct polling_island {
  /* Guards the fields of this struct; released/retaken along merged_to chains
     by polling_island_update_and_lock() and friends. */
  gpr_mu mu;
  /* Number of outstanding refs; when it drops to 0 in
     polling_island_unref_and_unlock() the island is freelisted. */
  int ref_cnt;

  /* Points to the polling_island this merged into.
   * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
   * are invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
169
/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  /* Workers form a doubly-linked list anchored at grpc_pollset.root_worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};
196
/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  /* Guards the three dynamic arrays below */
  gpr_mu mu;

  /* Member pollsets (count/capacity pair for a gpr_realloc-grown array) */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Nested pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Member fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
221
/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist; g_pi_freelist_mu guards g_pi_freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700249
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700250/* The caller is expected to hold pi->mu lock before calling this function */
251static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700252 size_t fd_count, bool add_fd_refs) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700253 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700254 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700255 struct epoll_event ev;
256
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700257#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700258 /* See the definition of g_epoll_sync for more context */
259 gpr_atm_rel_store(&g_epoll_sync, 0);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700260#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700261
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700262 for (i = 0; i < fd_count; i++) {
263 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
264 ev.data.ptr = fds[i];
265 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700266
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700267 if (err < 0) {
268 if (errno != EEXIST) {
269 /* TODO: sreek - We need a better way to bubble up this error instead of
270 just logging a message */
271 gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
272 fds[i]->fd, strerror(errno));
273 }
274
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700275 continue;
276 }
277
278 if (pi->fd_cnt == pi->fd_capacity) {
279 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
280 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
281 }
282
283 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700284 if (add_fd_refs) {
285 GRPC_FD_REF(fds[i], "polling_island");
286 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700287 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700288}
289
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700290/* The caller is expected to hold pi->mu before calling this */
291static void polling_island_add_wakeup_fd_locked(polling_island *pi,
292 grpc_wakeup_fd *wakeup_fd) {
293 struct epoll_event ev;
294 int err;
295
296 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
297 ev.data.ptr = wakeup_fd;
298 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
299 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
300 if (err < 0) {
301 gpr_log(GPR_ERROR,
302 "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
303 ". Error: %s",
304 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
305 strerror(errno));
306 }
307}
308
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700309/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700310static void polling_island_remove_all_fds_locked(polling_island *pi,
311 bool remove_fd_refs) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700312 int err;
313 size_t i;
314
315 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700316 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700317 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700318 /* TODO: sreek - We need a better way to bubble up this error instead of
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700319 * just logging a message */
320 gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%d]: %d failed with error: %s",
321 i, pi->fds[i]->fd, strerror(errno));
322 }
323
324 if (remove_fd_refs) {
325 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700326 }
327 }
328
329 pi->fd_cnt = 0;
330}
331
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700332/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700333static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700334 bool is_fd_closed) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700335 int err;
336 size_t i;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700337
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700338 /* If fd is already closed, then it would have been automatically been removed
339 from the epoll set */
340 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700341 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
342 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700343 gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error; %s",
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700344 fd->fd, strerror(errno));
345 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700346 }
347
348 for (i = 0; i < pi->fd_cnt; i++) {
349 if (pi->fds[i] == fd) {
350 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700351 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700352 break;
353 }
354 }
355}
356
/* Creates a polling island with 'initial_ref_cnt' refs, reusing a struct from
   the freelist when possible. A fresh epoll fd is created even for reused
   islands, because polling_island_delete() closed the previous one. If
   'initial_fd' is non-NULL it is added to the new epoll set (taking an fd
   ref). Aborts (GPR_ASSERT) if the epoll set cannot be created. */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  /* Every island watches the global wakeup fd so its pollers can be kicked */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}
404
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700405static void polling_island_delete(polling_island *pi) {
406 GPR_ASSERT(pi->ref_cnt == 0);
407 GPR_ASSERT(pi->fd_cnt == 0);
408
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700409 close(pi->epoll_fd);
410 pi->epoll_fd = -1;
411
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700412 pi->merged_to = NULL;
413
414 gpr_mu_lock(&g_pi_freelist_mu);
415 pi->next_free = g_pi_freelist;
416 g_pi_freelist = pi;
417 gpr_mu_unlock(&g_pi_freelist_mu);
418}
419
420void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
421 pi->ref_cnt -= unref_by;
422 int ref_cnt = pi->ref_cnt;
423 GPR_ASSERT(ref_cnt >= 0);
424
425 gpr_mu_unlock(&pi->mu);
426
427 if (ref_cnt == 0) {
428 polling_island_delete(pi);
429 }
430}
431
/* Walks the merged_to chain starting at 'pi' until the terminal (live)
   island, releasing 'unref_by' refs on every intermediate node along the way.
   Adds 'add_ref_by' refs to the terminal island and returns it LOCKED; the
   caller must eventually unlock it (e.g. via
   polling_island_unref_and_unlock). */
polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    /* unref-and-unlock releases the current node's lock before we take the
       next node's lock, so at most one island lock is held at a time here */
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}
446
/* Advances *p and *q along their merged_to chains to their terminal islands
   and locks both, acquiring locks in increasing address order to avoid
   deadlock with concurrent callers. On return *p and *q point to locked
   terminal nodes; if both chains end in the same island, *p == *q and that
   island is locked exactly once. Intermediate (merged-away) nodes lose one
   ref each as they are traversed. */
void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2  (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock polling_island node with
       lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further. If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2  (address of pi_1 is less than that of pi_2)
       - pi_1 MAYBE locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if pi_1 is pointing to the terminal node in the list) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not terminal node (i.e pi_1->merged_to != NULL), we are not
         done locking this polling_island yet. Release the lock on this node and
         advance pi_1 to the next node in the list; and go to the beginning of
         the loop (we can't proceed to locking pi_2 unless we locked pi_1 first)
       */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not terminal node, we are not done locking this polling_island
       yet. Release the lock and update pi_2 to the next node in the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 odd number of times, do one more swap so that
       pi_1 and pi_2 point to the same polling_island lists they started off
       with at the beginning of this function (i.e *p and *q respectively) */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}
545
/* Merges islands 'p' and 'q': the fds of the island with fewer fds are moved
   into the other, pollers on the drained island are woken (via
   polling_island_wakeup_fd) so they migrate, and the surviving island
   inherits the drained island's refs. Returns the surviving island,
   UNLOCKED. */
polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_pair_update_and_lock(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (The one with fewer fds) to q
     Note that the refcounts on the fds being moved will not change here. This
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wakeup all the pollers (if any) on p so that they can pickup this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* From here on, readers of p must follow merged_to to reach the live set */
  p->merged_to = q;

  /* - The merged polling island (i.e q) inherits all the ref counts of the
       island merging with it (i.e p)
     - The island p will lose a ref count */
  q->ref_cnt += p->ref_cnt;
  polling_island_unref_and_unlock(p, 1); /* Decrement refcount */
  polling_island_unref_and_unlock(q, 0); /* Just Unlock. Don't decrement ref */

  return q;
}
581
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700582static void polling_island_global_init() {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700583 gpr_mu_init(&g_pi_freelist_mu);
584 g_pi_freelist = NULL;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700585 grpc_wakeup_fd_init(&polling_island_wakeup_fd);
586 grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700587}
588
/* Tears down the polling-island freelist and the merge wakeup fd. */
static void polling_island_global_shutdown() {
  polling_island *next;
  /* Lock+unlock acts as a barrier so any thread still inside a freelist
     operation finishes before the list is walked unlocked below.
     NOTE(review): assumes no new island churn occurs past this point -
     confirm shutdown ordering with callers. */
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
604
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700605/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700606 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700607 */
608
/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

/* Freelist of recycled grpc_fd wrappers; fd_freelist_mu guards fd_freelist */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
634
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
/* Debug flavor: logs the ref transition and its call site, then falls through
   to the shared increment below (the #else keeps a single closing brace). */
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The fd must still be alive (refst > 0) when a new ref is taken */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
650
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug flavor: logs the unref transition and its call site, then falls
   through to the shared decrement below. */
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Last ref dropped: recycle the wrapper through the freelist. The struct
       itself is only truly freed in fd_global_shutdown(). */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
674
/* Public ref/unref wrappers: increment/decrement refcount by two to avoid
   changing the orphan bit (bit 0 of refst) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
690
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700691static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
692
693static void fd_global_shutdown(void) {
694 gpr_mu_lock(&fd_freelist_mu);
695 gpr_mu_unlock(&fd_freelist_mu);
696 while (fd_freelist != NULL) {
697 grpc_fd *fd = fd_freelist;
698 fd_freelist = fd_freelist->freelist_next;
699 gpr_mu_destroy(&fd->mu);
700 gpr_free(fd);
701 }
702 gpr_mu_destroy(&fd_freelist_mu);
703}
704
705static grpc_fd *fd_create(int fd, const char *name) {
706 grpc_fd *new_fd = NULL;
707
708 gpr_mu_lock(&fd_freelist_mu);
709 if (fd_freelist != NULL) {
710 new_fd = fd_freelist;
711 fd_freelist = fd_freelist->freelist_next;
712 }
713 gpr_mu_unlock(&fd_freelist_mu);
714
715 if (new_fd == NULL) {
716 new_fd = gpr_malloc(sizeof(grpc_fd));
717 gpr_mu_init(&new_fd->mu);
718 gpr_mu_init(&new_fd->pi_mu);
719 }
720
721 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
722 newly created fd (or an fd we got from the freelist), no one else would be
723 holding a lock to it anyway. */
724 gpr_mu_lock(&new_fd->mu);
725
726 gpr_atm_rel_store(&new_fd->refst, 1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700727 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700728 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700729 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700730 new_fd->read_closure = CLOSURE_NOT_READY;
731 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700732 new_fd->polling_island = NULL;
733 new_fd->freelist_next = NULL;
734 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700735 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700736
737 gpr_mu_unlock(&new_fd->mu);
738
739 char *fd_name;
740 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
741 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
742 gpr_free(fd_name);
743#ifdef GRPC_FD_REF_COUNT_DEBUG
744 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, fd_name);
745#endif
746 return new_fd;
747}
748
749static bool fd_is_orphaned(grpc_fd *fd) {
750 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
751}
752
753static int fd_wrapped_fd(grpc_fd *fd) {
754 int ret_fd = -1;
755 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700756 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700757 ret_fd = fd->fd;
758 }
759 gpr_mu_unlock(&fd->mu);
760
761 return ret_fd;
762}
763
/* Orphan the fd: the owner gives up the file descriptor. If 'release_fd' is
   non-NULL the raw descriptor is handed back through it (NOT closed here);
   otherwise it is closed. 'on_done' is scheduled once the fd has been removed
   from its polling island. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  /* Drops the temporary ref from above plus the "active" bit; may recycle
     the struct onto the freelist via unref_by() */
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}
806
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700807static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
808 grpc_closure **st, grpc_closure *closure) {
809 if (*st == CLOSURE_NOT_READY) {
810 /* not ready ==> switch to a waiting state by setting the closure */
811 *st = closure;
812 } else if (*st == CLOSURE_READY) {
813 /* already ready ==> queue the closure to run immediately */
814 *st = CLOSURE_NOT_READY;
815 grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
816 } else {
817 /* upcallptr was set to a different closure. This is an error! */
818 gpr_log(GPR_ERROR,
819 "User called a notify_on function with a previous callback still "
820 "pending");
821 abort();
822 }
823}
824
825/* returns 1 if state becomes not ready */
826static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
827 grpc_closure **st) {
828 if (*st == CLOSURE_READY) {
829 /* duplicate ready ==> ignore */
830 return 0;
831 } else if (*st == CLOSURE_NOT_READY) {
832 /* not ready, and not waiting ==> flag ready */
833 *st = CLOSURE_READY;
834 return 0;
835 } else {
836 /* waiting ==> queue closure */
837 grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
838 *st = CLOSURE_NOT_READY;
839 return 1;
840 }
841}
842
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700843static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
844 grpc_fd *fd) {
845 grpc_pollset *notifier = NULL;
846
847 gpr_mu_lock(&fd->mu);
848 notifier = fd->read_notifier_pollset;
849 gpr_mu_unlock(&fd->mu);
850
851 return notifier;
852}
853
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700854static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
855 gpr_mu_lock(&fd->mu);
856 GPR_ASSERT(!fd->shutdown);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700857 fd->shutdown = true;
858
859 /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
860 this point, the closures would be called with 'success = false' */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700861 set_ready_locked(exec_ctx, fd, &fd->read_closure);
862 set_ready_locked(exec_ctx, fd, &fd->write_closure);
863 gpr_mu_unlock(&fd->mu);
864}
865
866static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
867 grpc_closure *closure) {
868 gpr_mu_lock(&fd->mu);
869 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
870 gpr_mu_unlock(&fd->mu);
871}
872
873static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
874 grpc_closure *closure) {
875 gpr_mu_lock(&fd->mu);
876 notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
877 gpr_mu_unlock(&fd->mu);
878}
879
/*******************************************************************************
 * Pollset Definitions
 */
/* Thread-locals recording which pollset/worker the current thread is running,
   so pollset_kick() can avoid signalling the calling thread itself. */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700885
/* Handler for the wakeup signal. Deliberately (almost) a no-op: the signal
   exists only to interrupt epoll_pwait() in pollset_work_and_unlock(). */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700891
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -0700892static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700893
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700894/* Global state management */
895static void pollset_global_init(void) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700896 grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700897 gpr_tls_init(&g_current_thread_pollset);
898 gpr_tls_init(&g_current_thread_worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700899 poller_kick_init();
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700900}
901
902static void pollset_global_shutdown(void) {
903 grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700904 gpr_tls_destroy(&g_current_thread_pollset);
905 gpr_tls_destroy(&g_current_thread_worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700906}
907
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700908static void pollset_worker_kick(grpc_pollset_worker *worker) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -0700909 pthread_kill(worker->pt_id, grpc_wakeup_signal);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700910}
911
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700912/* Return 1 if the pollset has active threads in pollset_work (pollset must
913 * be locked) */
914static int pollset_has_workers(grpc_pollset *p) {
915 return p->root_worker.next != &p->root_worker;
916}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700917
918static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
919 worker->prev->next = worker->next;
920 worker->next->prev = worker->prev;
921}
922
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700923static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
924 if (pollset_has_workers(p)) {
925 grpc_pollset_worker *w = p->root_worker.next;
926 remove_worker(p, w);
927 return w;
928 } else {
929 return NULL;
930 }
931}
932
933static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
934 worker->next = &p->root_worker;
935 worker->prev = worker->next->prev;
936 worker->prev->next = worker->next->prev = worker;
937}
938
939static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
940 worker->prev = &p->root_worker;
941 worker->next = worker->prev->next;
942 worker->prev->next = worker->next->prev = worker;
943}
944
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700945/* p->mu must be held before calling this function */
946static void pollset_kick(grpc_pollset *p,
947 grpc_pollset_worker *specific_worker) {
948 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700949
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700950 grpc_pollset_worker *worker = specific_worker;
951 if (worker != NULL) {
952 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700953 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700954 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700955 for (worker = p->root_worker.next; worker != &p->root_worker;
956 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700957 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
958 pollset_worker_kick(worker);
959 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700960 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700961 } else {
962 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700963 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700964 GPR_TIMER_END("pollset_kick.broadcast", 0);
965 } else {
966 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700967 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
968 pollset_worker_kick(worker);
969 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700970 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -0700971 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
972 /* Since worker == NULL, it means that we can kick "any" worker on this
973 pollset 'p'. If 'p' happens to be the same pollset this thread is
974 currently polling (i.e in pollset_work() function), then there is no need
975 to kick any other worker since the current thread can just absorb the
976 kick. This is the reason why we enter this case only when
977 g_current_thread_pollset is != p */
978
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700979 GPR_TIMER_MARK("kick_anonymous", 0);
980 worker = pop_front_worker(p);
981 if (worker != NULL) {
982 GPR_TIMER_MARK("finally_kick", 0);
983 push_back_worker(p, worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700984 pollset_worker_kick(worker);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700985 } else {
986 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700987 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700988 }
989 }
990
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700991 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700992}
993
994static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
995
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700996static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
997 gpr_mu_init(&pollset->mu);
998 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700999
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001000 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001001 pollset->kicked_without_pollers = false;
1002
1003 pollset->shutting_down = false;
1004 pollset->finish_shutdown_called = false;
1005 pollset->shutdown_done = NULL;
1006
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001007 gpr_mu_init(&pollset->pi_mu);
1008 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001009}
1010
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001011/* Convert a timespec to milliseconds:
1012 - Very small or negative poll times are clamped to zero to do a non-blocking
1013 poll (which becomes spin polling)
1014 - Other small values are rounded up to one millisecond
1015 - Longer than a millisecond polls are rounded up to the next nearest
1016 millisecond to avoid spinning
1017 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001018static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1019 gpr_timespec now) {
1020 gpr_timespec timeout;
1021 static const int64_t max_spin_polling_us = 10;
1022 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1023 return -1;
1024 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001025
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001026 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1027 max_spin_polling_us,
1028 GPR_TIMESPAN))) <= 0) {
1029 return 0;
1030 }
1031 timeout = gpr_time_sub(deadline, now);
1032 return gpr_time_to_millis(gpr_time_add(
1033 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1034}
1035
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001036static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1037 grpc_pollset *notifier) {
1038 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001039 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001040 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1041 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001042 gpr_mu_unlock(&fd->mu);
1043}
1044
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001045static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001046 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1047 gpr_mu_lock(&fd->mu);
1048 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1049 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001050}
1051
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001052/* Release the reference to pollset->polling_island and set it to NULL.
1053 pollset->mu must be held */
1054static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
1055 gpr_mu_lock(&pollset->pi_mu);
1056 if (pollset->polling_island) {
1057 pollset->polling_island =
1058 polling_island_update_and_lock(pollset->polling_island, 1, 0);
1059 polling_island_unref_and_unlock(pollset->polling_island, 1);
1060 pollset->polling_island = NULL;
1061 }
1062 gpr_mu_unlock(&pollset->pi_mu);
1063}
1064
1065static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1066 grpc_pollset *pollset) {
1067 /* The pollset cannot have any workers if we are at this stage */
1068 GPR_ASSERT(!pollset_has_workers(pollset));
1069
1070 pollset->finish_shutdown_called = true;
1071 pollset_release_polling_island_locked(pollset);
1072
1073 grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
1074}
1075
/* Begin shutting down the pollset: broadcasts a kick so all workers exit, and
   either completes shutdown immediately (no workers) or defers to the last
   worker leaving pollset_work().
   pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake every worker so it can observe shutting_down and return */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1095
1096/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1097 * than destroying the mutexes, there is nothing special that needs to be done
1098 * here */
1099static void pollset_destroy(grpc_pollset *pollset) {
1100 GPR_ASSERT(!pollset_has_workers(pollset));
1101 gpr_mu_destroy(&pollset->pi_mu);
1102 gpr_mu_destroy(&pollset->mu);
1103}
1104
1105static void pollset_reset(grpc_pollset *pollset) {
1106 GPR_ASSERT(pollset->shutting_down);
1107 GPR_ASSERT(!pollset_has_workers(pollset));
1108 pollset->shutting_down = false;
1109 pollset->finish_shutdown_called = false;
1110 pollset->kicked_without_pollers = false;
1111 pollset->shutdown_done = NULL;
1112 pollset_release_polling_island_locked(pollset);
1113}
1114
#define GRPC_EPOLL_MAX_EVENTS 1000
/* Core polling loop: epoll_pwait() on the pollset's polling island and
   dispatch readiness to the fds. Called with pollset->mu held; RELEASES
   pollset->mu before blocking (hence "and_unlock"). 'sig_mask' is the signal
   mask installed during epoll_pwait so the wakeup signal can interrupt it. */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, int timeout_ms,
                                    sigset_t *sig_mask) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     polling island pointed to by pollset->polling_island.
     Acquire the following locks:
     - pollset->mu (which we already have)
     - pollset->pi_mu
     - pollset->polling_island->mu (call polling_island_update_and_lock())*/
  gpr_mu_lock(&pollset->pi_mu);

  pi = pollset->polling_island;
  if (pi == NULL) {
    /* a pollset with no fds still needs an island to wait on */
    pi = polling_island_create(NULL, 1);
  }

  /* In addition to locking the polling island, add a ref so that the island
     does not get destroyed (which means the epoll_fd won't be closed) while
     we are doing an epoll_wait() on the epoll_fd */
  pi = polling_island_update_and_lock(pi, 1, 1);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->polling_island */
  pollset->polling_island = pi;

  polling_island_unref_and_unlock(pollset->polling_island, 0); /* Keep the ref*/
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);

  /* Keep polling while the event buffer comes back completely full — there
     may be more pending events than GRPC_EPOLL_MAX_EVENTS */
  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
      } else {
        /* We were interrupted (likely by the wakeup signal). Save an
           iteration by doing a zero timeout epoll_wait to see if there are
           any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_poll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        /* global kick: just consume it */
        grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        /* an actual grpc_fd event */
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        /* error/hangup wakes both directions so closures can observe it */
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island */
  /* It is important to note that at this point 'pi' may not be the same as
   * pollset->polling_island. This is because pollset->polling_island pointer
   * gets updated whenever the underlying polling island is merged with another
   * island and while we are doing epoll_wait() above, the polling island may
   * have been merged */
  pi = polling_island_update_and_lock(pi, 1, 0); /* No new ref added */
  polling_island_unref_and_unlock(pi, 1);

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
1206
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                         grpc_pollset_worker **worker_hdl, gpr_timespec now,
                         gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  /* Stack-allocated worker; lives only for the duration of this call */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;
  /* Publish which pollset/worker this thread is running so pollset_kick()
     can avoid signalling ourselves */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* Block the wakeup signal outside epoll_pwait and unblock it only inside
       (via orig_mask), so a kick can only interrupt us while we are waiting */
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_wakeup_signal);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_wakeup_signal);

    push_front_worker(pollset, &worker);

    /* releases pollset->mu while blocked in epoll */
    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
  GPR_TIMER_END("pollset_work", 0);
}
1273
/* Attach 'fd' to 'pollset' by making them share a polling island (creating or
   merging islands as needed). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  /* TODO sreek - Double check if we need to get a pollset->mu lock here */
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   * equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   * the NULL polling_island field to point to the non-NULL polling_island
   * field (ensure that the refcount on the polling island is incremented by
   * 1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   * and different, merge both the polling islands and update the
   * polling_island fields in both fd and pollset to point to the merged
   * polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    /* cases 1 and 2 */
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd, 2);
    }
  } else if (fd->polling_island == NULL) {
    /* case 3: fd joins the pollset's island */
    pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
    polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
    gpr_mu_unlock(&pi_new->mu);
  } else if (pollset->polling_island == NULL) {
    /* case 3 (mirror): pollset joins the fd's island */
    pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
    gpr_mu_unlock(&pi_new->mu);
  } else {
    /* case 4: both have different islands; merge them */
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  fd->polling_island = pollset->polling_island = pi_new;

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
}
1317
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001318/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001319 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001320 */
1321
1322static grpc_pollset_set *pollset_set_create(void) {
1323 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1324 memset(pollset_set, 0, sizeof(*pollset_set));
1325 gpr_mu_init(&pollset_set->mu);
1326 return pollset_set;
1327}
1328
1329static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1330 size_t i;
1331 gpr_mu_destroy(&pollset_set->mu);
1332 for (i = 0; i < pollset_set->fd_count; i++) {
1333 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1334 }
1335 gpr_free(pollset_set->pollsets);
1336 gpr_free(pollset_set->pollset_sets);
1337 gpr_free(pollset_set->fds);
1338 gpr_free(pollset_set);
1339}
1340
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001341static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1342 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1343 size_t i;
1344 gpr_mu_lock(&pollset_set->mu);
1345 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1346 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1347 pollset_set->fds = gpr_realloc(
1348 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1349 }
1350 GRPC_FD_REF(fd, "pollset_set");
1351 pollset_set->fds[pollset_set->fd_count++] = fd;
1352 for (i = 0; i < pollset_set->pollset_count; i++) {
1353 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1354 }
1355 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1356 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1357 }
1358 gpr_mu_unlock(&pollset_set->mu);
1359}
1360
1361static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1362 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1363 size_t i;
1364 gpr_mu_lock(&pollset_set->mu);
1365 for (i = 0; i < pollset_set->fd_count; i++) {
1366 if (pollset_set->fds[i] == fd) {
1367 pollset_set->fd_count--;
1368 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1369 pollset_set->fds[pollset_set->fd_count]);
1370 GRPC_FD_UNREF(fd, "pollset_set");
1371 break;
1372 }
1373 }
1374 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1375 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1376 }
1377 gpr_mu_unlock(&pollset_set->mu);
1378}
1379
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001380static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1381 grpc_pollset_set *pollset_set,
1382 grpc_pollset *pollset) {
1383 size_t i, j;
1384 gpr_mu_lock(&pollset_set->mu);
1385 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1386 pollset_set->pollset_capacity =
1387 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1388 pollset_set->pollsets =
1389 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1390 sizeof(*pollset_set->pollsets));
1391 }
1392 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1393 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1394 if (fd_is_orphaned(pollset_set->fds[i])) {
1395 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1396 } else {
1397 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1398 pollset_set->fds[j++] = pollset_set->fds[i];
1399 }
1400 }
1401 pollset_set->fd_count = j;
1402 gpr_mu_unlock(&pollset_set->mu);
1403}
1404
1405static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1406 grpc_pollset_set *pollset_set,
1407 grpc_pollset *pollset) {
1408 size_t i;
1409 gpr_mu_lock(&pollset_set->mu);
1410 for (i = 0; i < pollset_set->pollset_count; i++) {
1411 if (pollset_set->pollsets[i] == pollset) {
1412 pollset_set->pollset_count--;
1413 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1414 pollset_set->pollsets[pollset_set->pollset_count]);
1415 break;
1416 }
1417 }
1418 gpr_mu_unlock(&pollset_set->mu);
1419}
1420
1421static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1422 grpc_pollset_set *bag,
1423 grpc_pollset_set *item) {
1424 size_t i, j;
1425 gpr_mu_lock(&bag->mu);
1426 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1427 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1428 bag->pollset_sets =
1429 gpr_realloc(bag->pollset_sets,
1430 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1431 }
1432 bag->pollset_sets[bag->pollset_set_count++] = item;
1433 for (i = 0, j = 0; i < bag->fd_count; i++) {
1434 if (fd_is_orphaned(bag->fds[i])) {
1435 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1436 } else {
1437 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1438 bag->fds[j++] = bag->fds[i];
1439 }
1440 }
1441 bag->fd_count = j;
1442 gpr_mu_unlock(&bag->mu);
1443}
1444
1445static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1446 grpc_pollset_set *bag,
1447 grpc_pollset_set *item) {
1448 size_t i;
1449 gpr_mu_lock(&bag->mu);
1450 for (i = 0; i < bag->pollset_set_count; i++) {
1451 if (bag->pollset_sets[i] == item) {
1452 bag->pollset_set_count--;
1453 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1454 bag->pollset_sets[bag->pollset_set_count]);
1455 break;
1456 }
1457 }
1458 gpr_mu_unlock(&bag->mu);
1459}
1460
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001461/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001462 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001463 */
1464
/* Release the global state owned by this polling engine (fd freelists,
 * pollset globals, polling-island globals). Installed in the vtable below as
 * .shutdown_engine; the three calls mirror the *_global_init() calls made in
 * grpc_init_epoll_linux(). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1470
/* The event-engine entry points this (epoll + signal based) engine exposes to
 * the iomgr layer. Returned by grpc_init_epoll_linux() below. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and read/write event notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    /* pollset lifecycle, polling and kicking */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set (groupings of pollsets/fds) management */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1503
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001504/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1505 * Create a dummy epoll_fd to make sure epoll support is available */
1506static bool is_epoll_available() {
1507 int fd = epoll_create1(EPOLL_CLOEXEC);
1508 if (fd < 0) {
1509 gpr_log(
1510 GPR_ERROR,
1511 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1512 fd);
1513 return false;
1514 }
1515 close(fd);
1516 return true;
1517}
1518
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001519const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001520 /* If use of signals is disabled, we cannot use epoll engine*/
1521 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1522 return NULL;
1523 }
1524
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001525 if (!is_epoll_available()) {
1526 return NULL;
1527 }
1528
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001529 if (!is_grpc_wakeup_signal_initialized) {
1530 grpc_use_signal(SIGRTMIN + 2);
1531 }
1532
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001533 fd_global_init();
1534 pollset_global_init();
1535 polling_island_global_init();
1536 return &vtable;
1537}
1538
#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

/* Wakeup signals are only used by the epoll engine; with epoll unavailable
 * this is intentionally a no-op. */
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */