/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called before grpc_init() to choose a different wakeup signal to use.
 * If signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
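
/* Illustrative usage (not part of this file; the signal number below is only
   an example an application might pick): a process that wants this engine to
   use a specific real-time signal for kicking pollers could call, before
   grpc_init():

     grpc_use_signal(SIGRTMIN + 2);

   Passing -1 instead disables signal use, and this epoll engine will not be
   used. */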

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
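
/* Illustration of the refst encoding above (derived from fd_create(), ref_by()
   and fd_orphan() in this file): fd_create() stores refst = 1 (active bit set,
   no extra refs). GRPC_FD_REF/GRPC_FD_UNREF always change refst by 2, so the
   low 'active' bit is never disturbed. fd_orphan() adds 1, which clears the
   active bit while keeping the struct referenced, and later drops 2; once the
   count hits zero, unref_by() returns the struct to the fd freelist. */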

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
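
/* The two sentinel values above make fd->read_closure / fd->write_closure a
   small state machine: CLOSURE_NOT_READY means "no event yet and nobody
   waiting", CLOSURE_READY means "event arrived but nobody waiting", and any
   other value is a caller's closure parked until the event arrives (see
   notify_on_locked() and set_ready_locked() below). */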

/*******************************************************************************
 * Polling-island Declarations
 */
/* TODO: sree: Consider making ref_cnt and merged_to gpr_atm - This would
 * significantly reduce the number of mutex acquisition calls. */
typedef struct polling_island {
  gpr_mu mu;
  int ref_cnt;

  /* Points to the polling_island this merged into.
   * If merged_to is not NULL, all the remaining fields (except mu and ref_cnt)
   * are invalid and must be ignored */
  struct polling_island *merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
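
/* Merged polling islands form a chain through 'merged_to'; the live island is
   the terminal node (merged_to == NULL). For example, after merging island A
   into B and then B into C, A->merged_to == B and B->merged_to == C, and
   polling_island_update_and_lock() resolves any stale pointer (to A or B) to C
   by walking the chain. */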

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wake up all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, 0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wakeup_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, then it would have automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             int initial_ref_cnt) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->ref_cnt = initial_ref_cnt;
  pi->merged_to = NULL;
  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* It is not really needed to get the pi->mu lock here. If this is a newly
       created polling island (or one that we got from the freelist), no one
       else would be holding a lock to it anyway */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->ref_cnt == 0);
  GPR_ASSERT(pi->fd_cnt == 0);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  pi->merged_to = NULL;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

void polling_island_unref_and_unlock(polling_island *pi, int unref_by) {
  pi->ref_cnt -= unref_by;
  int ref_cnt = pi->ref_cnt;
  GPR_ASSERT(ref_cnt >= 0);

  gpr_mu_unlock(&pi->mu);

  if (ref_cnt == 0) {
    polling_island_delete(pi);
  }
}

polling_island *polling_island_update_and_lock(polling_island *pi, int unref_by,
                                               int add_ref_by) {
  polling_island *next = NULL;
  gpr_mu_lock(&pi->mu);
  while (pi->merged_to != NULL) {
    next = pi->merged_to;
    polling_island_unref_and_unlock(pi, unref_by);
    pi = next;
    gpr_mu_lock(&pi->mu);
  }

  pi->ref_cnt += add_ref_by;
  return pi;
}

void polling_island_pair_update_and_lock(polling_island **p,
                                         polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *temp = NULL;
  bool pi_1_locked = false;
  bool pi_2_locked = false;
  int num_swaps = 0;

  /* Loop until either pi_1 == pi_2 or until we acquired locks on both pi_1
     and pi_2 */
  while (pi_1 != pi_2 && !(pi_1_locked && pi_2_locked)) {
    /* The following assertions are true at this point:
       - pi_1 != pi_2 (else, the while loop would have exited)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* To maintain lock order consistency, always lock the polling_island node
       with the lower address first.
       First, make sure pi_1 < pi_2 before proceeding any further. If it turns
       out that pi_1 > pi_2, unlock pi_1 if locked (because pi_2 is not locked
       at this point and having pi_1 locked would violate the lock order) and
       swap pi_1 and pi_2 so that pi_1 becomes less than pi_2 */
    if (pi_1 > pi_2) {
      if (pi_1_locked) {
        gpr_mu_unlock(&pi_1->mu);
        pi_1_locked = false;
      }

      GPR_SWAP(polling_island *, pi_1, pi_2);
      num_swaps++;
    }

    /* The following assertions are true at this point:
       - pi_1 != pi_2
       - pi_1 < pi_2 (address of pi_1 is less than that of pi_2)
       - pi_1 MAY be locked
       - pi_2 is NOT locked */

    /* Lock pi_1 (if it is not already locked) */
    if (!pi_1_locked) {
      gpr_mu_lock(&pi_1->mu);
      pi_1_locked = true;

      /* If pi_1 is not the terminal node (i.e pi_1->merged_to != NULL), we are
         not done locking this polling_island yet. Release the lock on this
         node, advance pi_1 to the next node in the list and go back to the
         beginning of the loop (we can't proceed to locking pi_2 unless we
         locked pi_1 first) */
      if (pi_1->merged_to != NULL) {
        temp = pi_1->merged_to;
        polling_island_unref_and_unlock(pi_1, 1);
        pi_1 = temp;
        pi_1_locked = false;

        continue;
      }
    }

    /* The following assertions are true at this point:
       - pi_1 is locked
       - pi_2 is unlocked
       - pi_1 != pi_2 */

    gpr_mu_lock(&pi_2->mu);
    pi_2_locked = true;

    /* If pi_2 is not the terminal node, we are not done locking this
       polling_island yet. Release the lock and update pi_2 to the next node in
       the list */
    if (pi_2->merged_to != NULL) {
      temp = pi_2->merged_to;
      polling_island_unref_and_unlock(pi_2, 1);
      pi_2 = temp;
      pi_2_locked = false;
    }
  }

  /* At this point, either pi_1 == pi_2 AND/OR we got both locks */
  if (pi_1 == pi_2) {
    /* We may or may not have gotten the lock. If we didn't, walk the rest of
       the polling_island list and get the lock */
    GPR_ASSERT(pi_1_locked || (!pi_1_locked && !pi_2_locked));
    if (!pi_1_locked) {
      pi_1 = pi_2 = polling_island_update_and_lock(pi_1, 2, 0);
    }
  } else {
    GPR_ASSERT(pi_1_locked && pi_2_locked);
    /* If we swapped pi_1 and pi_2 an odd number of times, do one more swap so
       that pi_1 and pi_2 point to the same polling_island lists they started
       off with at the beginning of this function (i.e *p and *q respectively)
     */
    if (num_swaps % 2 > 0) {
      GPR_SWAP(polling_island *, pi_1, pi_2);
    }
  }

  *p = pi_1;
  *q = pi_2;
}

polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_pair_update_and_lock(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (the one with fewer fds) to q
     (note that the refcounts on the fds being moved will not change here; this
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wakeup all the pollers (if any) on p so that they can pickup this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  p->merged_to = q;

  /* - The merged polling island (i.e q) inherits all the ref counts of the
       island merging with it (i.e p)
     - The island p will lose a ref count */
  q->ref_cnt += p->ref_cnt;
  polling_island_unref_and_unlock(p, 1); /* Decrement refcount */
  polling_island_unref_and_unlock(q, 0); /* Just unlock. Don't decrement ref */

  return q;
}
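
/* Net effect of polling_island_merge(p, q) when p != q: p's fds are now polled
   through q's epoll set, p->merged_to points to q, and any thread still
   blocked in epoll_pwait() on p's epoll fd is woken via
   polling_island_wakeup_fd; on its next pass through
   pollset_work_and_unlock() it re-resolves its pollset's island to q via
   polling_island_update_and_lock(). */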

static void polling_island_global_init() {
  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;
  grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, 1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
  gpr_free(fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, name);
#endif
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Update the fd->polling_island to point to the latest polling island
     - Remove the fd from the polling island.
     - Remove a ref to the polling island and set fd->polling_island to NULL */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    fd->polling_island =
        polling_island_update_and_lock(fd->polling_island, 1, 0);
    polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed);

    polling_island_unref_and_unlock(fd->polling_island, 1);
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = true;

  /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
     this point, the closures would be called with 'success = false' */
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
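
/* Note: the handler above is intentionally (almost) a no-op. Installing it is
   all that is needed here: delivering grpc_wakeup_signal to a worker thread
   only has to interrupt its blocking epoll_pwait() call (which then returns -1
   with errno == EINTR); no work needs to happen inside the handler itself. */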

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static void pollset_global_init(void) {
  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static void pollset_worker_kick(grpc_pollset_worker *worker) {
  pthread_kill(worker->pt_id, grpc_wakeup_signal);
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

/* p->mu must be held before calling this function */
static void pollset_kick(grpc_pollset *p,
                         grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            pollset_worker_kick(worker);
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        pollset_worker_kick(worker);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no
       need to kick any other worker since the current thread can just absorb
       the kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      pollset_worker_kick(worker);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
}

static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
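
/* Worked examples of the conversion above: an already-expired deadline or one
   less than 10 microseconds away returns 0 (non-blocking/spin poll); a
   deadline 2.3 ms away becomes 3 ms (2.3 ms + (1 ms - 1 ns) truncates to 3);
   an infinite deadline returns -1, which epoll_pwait() treats as "block
   indefinitely". */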

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

/* Release the reference to pollset->polling_island and set it to NULL.
   pollset->mu must be held */
static void pollset_release_polling_island_locked(grpc_pollset *pollset) {
  gpr_mu_lock(&pollset->pi_mu);
  if (pollset->polling_island) {
    pollset->polling_island =
        polling_island_update_and_lock(pollset->polling_island, 1, 0);
    polling_island_unref_and_unlock(pollset->polling_island, 1);
    pollset->polling_island = NULL;
  }
  gpr_mu_unlock(&pollset->pi_mu);
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;
  pollset_release_polling_island_locked(pollset);

  grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island_locked(pollset);
}

#define GRPC_EPOLL_MAX_EVENTS 1000
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, int timeout_ms,
                                    sigset_t *sig_mask) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     polling island pointed to by pollset->polling_island.
     Acquire the following locks:
     - pollset->mu (which we already have)
     - pollset->pi_mu
     - pollset->polling_island->mu (call polling_island_update_and_lock()) */
  gpr_mu_lock(&pollset->pi_mu);

  pi = pollset->polling_island;
  if (pi == NULL) {
    pi = polling_island_create(NULL, 1);
  }

  /* In addition to locking the polling island, add a ref so that the island
     does not get destroyed (which means the epoll_fd won't be closed) while
     we are doing an epoll_wait() on the epoll_fd */
  pi = polling_island_update_and_lock(pi, 1, 1);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->polling_island */
  pollset->polling_island = pi;

  polling_island_unref_and_unlock(pollset->polling_island, 0); /* Keep the ref */
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);

  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_epoll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
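
  /* Note: the do-while above re-runs epoll_pwait() whenever the previous call
     filled the entire ep_ev array, since more events may still be pending in
     the kernel in that case. */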

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island */
  /* It is important to note that at this point 'pi' may not be the same as
   * pollset->polling_island. This is because the pollset->polling_island
   * pointer gets updated whenever the underlying polling island is merged with
   * another island, and while we were doing epoll_wait() above, the polling
   * island may have been merged */
  pi = polling_island_update_and_lock(pi, 1, 0); /* No new ref added */
  polling_island_unref_and_unlock(pi, 1);

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
1207
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001208/* pollset->mu lock must be held by the caller before calling this.
1209 The function pollset_work() may temporarily release the lock (pollset->mu)
1210 during the course of its execution but it will always re-acquire the lock and
1211 ensure that it is held by the time the function returns */
1212static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1213 grpc_pollset_worker **worker_hdl, gpr_timespec now,
1214 gpr_timespec deadline) {
1215 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001216 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1217
1218 sigset_t new_mask;
1219 sigset_t orig_mask;
1220
1221 grpc_pollset_worker worker;
1222 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001223 worker.pt_id = pthread_self();
1224
1225 *worker_hdl = &worker;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001226 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1227 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001228
1229 if (pollset->kicked_without_pollers) {
1230 /* If the pollset was kicked without pollers, pretend that the current
1231 worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention, such as an event on the completion queue or an
1233 alarm */
1234 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1235 pollset->kicked_without_pollers = 0;
1236 } else if (!pollset->shutting_down) {
1237 sigemptyset(&new_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001238 sigaddset(&new_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001239 pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001240 sigdelset(&orig_mask, grpc_wakeup_signal);
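    /* Descriptive note: the mask manipulation above blocks grpc_wakeup_signal
       for this thread (saving the previous mask in orig_mask) and then removes
       that signal from orig_mask as well. orig_mask is handed to
       pollset_work_and_unlock() below, presumably as the signal mask to poll
       with, so the wakeup signal can only be delivered -- and interrupt the
       poller -- while the thread is actually inside the poll call. */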
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001241
1242 push_front_worker(pollset, &worker);
1243
1244 pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
1245 grpc_exec_ctx_flush(exec_ctx);
1246
1247 gpr_mu_lock(&pollset->mu);
1248 remove_worker(pollset, &worker);
1249 }
1250
  /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
1252 false at this point) and the pollset is shutting down, we may have to
1253 finish the shutdown process by calling finish_shutdown_locked().
1254 See pollset_shutdown() for more details.
1255
1256 Note: Continuing to access pollset here is safe; it is the caller's
1257 responsibility to not destroy a pollset when it has outstanding calls to
1258 pollset_work() */
1259 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1260 !pollset->finish_shutdown_called) {
1261 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1262 finish_shutdown_locked(exec_ctx, pollset);
1263
1264 gpr_mu_unlock(&pollset->mu);
1265 grpc_exec_ctx_flush(exec_ctx);
1266 gpr_mu_lock(&pollset->mu);
1267 }
1268
1269 *worker_hdl = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001270 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1271 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001272 GPR_TIMER_END("pollset_work", 0);
1273}
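/* Illustrative sketch (not part of this file): per the contract documented
   above pollset_work(), a caller -- typically the completion queue code, which
   is assumed here -- is expected to do roughly the following:

     gpr_mu_lock(&pollset->mu);
     grpc_pollset_worker *worker = NULL;
     pollset_work(exec_ctx, pollset, &worker, now, deadline);
     -- pollset->mu is held again at this point --
     gpr_mu_unlock(&pollset->mu);
*/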
1274
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001275static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1276 grpc_fd *fd) {
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -07001277 /* TODO sreek - Double check if we need to get a pollset->mu lock here */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001278 gpr_mu_lock(&pollset->pi_mu);
1279 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001280
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001281 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001282
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001283 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1284 * equal, do nothing.
1285 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1286 * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset point to the new island
1288 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1289 * the NULL polling_island field to point to the non-NULL polling_island
1290 * field (ensure that the refcount on the polling island is incremented by
1291 * 1 to account for the newly added reference)
1292 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1293 * and different, merge both the polling islands and update the
1294 * polling_island fields in both fd and pollset to point to the merged
1295 * polling island.
1296 */
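  /* A rough worked example of the cases above (illustrative only; the exact
     refcount bookkeeping is inferred from the comment, not re-verified here):
     suppose fd F has no polling island yet and pollset P is on island I1 with
     refcount n. That is case 3: I1's refcount becomes n + 1 to account for F's
     new reference, and F's polling_island is pointed at I1. If F were instead
     on a different island I2, case 4 would merge I1 and I2, and both F and P
     would end up pointing at the merged island. */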
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001297 if (fd->polling_island == pollset->polling_island) {
1298 pi_new = fd->polling_island;
1299 if (pi_new == NULL) {
1300 pi_new = polling_island_create(fd, 2);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001301 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001302 } else if (fd->polling_island == NULL) {
1303 pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001304 polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001305 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001306 } else if (pollset->polling_island == NULL) {
1307 pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001308 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001309 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001310 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001311 }
1312
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001313 fd->polling_island = pollset->polling_island = pi_new;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001314
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001315 gpr_mu_unlock(&fd->pi_mu);
1316 gpr_mu_unlock(&pollset->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001317}
1318
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001319/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001320 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001321 */
1322
1323static grpc_pollset_set *pollset_set_create(void) {
1324 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1325 memset(pollset_set, 0, sizeof(*pollset_set));
1326 gpr_mu_init(&pollset_set->mu);
1327 return pollset_set;
1328}
1329
1330static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1331 size_t i;
1332 gpr_mu_destroy(&pollset_set->mu);
1333 for (i = 0; i < pollset_set->fd_count; i++) {
1334 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1335 }
1336 gpr_free(pollset_set->pollsets);
1337 gpr_free(pollset_set->pollset_sets);
1338 gpr_free(pollset_set->fds);
1339 gpr_free(pollset_set);
1340}
1341
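/* Adds 'fd' to the pollset_set: the fd is retained with a "pollset_set" ref
   and is also added to every pollset and every nested pollset_set currently
   contained in this set, so that all of them poll the new fd as well. */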
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001342static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1343 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1344 size_t i;
1345 gpr_mu_lock(&pollset_set->mu);
1346 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1347 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1348 pollset_set->fds = gpr_realloc(
1349 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1350 }
1351 GRPC_FD_REF(fd, "pollset_set");
1352 pollset_set->fds[pollset_set->fd_count++] = fd;
1353 for (i = 0; i < pollset_set->pollset_count; i++) {
1354 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1355 }
1356 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1357 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1358 }
1359 gpr_mu_unlock(&pollset_set->mu);
1360}
1361
1362static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1363 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1364 size_t i;
1365 gpr_mu_lock(&pollset_set->mu);
1366 for (i = 0; i < pollset_set->fd_count; i++) {
1367 if (pollset_set->fds[i] == fd) {
1368 pollset_set->fd_count--;
1369 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1370 pollset_set->fds[pollset_set->fd_count]);
1371 GRPC_FD_UNREF(fd, "pollset_set");
1372 break;
1373 }
1374 }
1375 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1376 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1377 }
1378 gpr_mu_unlock(&pollset_set->mu);
1379}
1380
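/* Adds 'pollset' to the pollset_set and registers every fd currently tracked
   by the set with that pollset. Orphaned fds encountered along the way are
   unref'd and compacted out of the fds array. */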
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001381static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1382 grpc_pollset_set *pollset_set,
1383 grpc_pollset *pollset) {
1384 size_t i, j;
1385 gpr_mu_lock(&pollset_set->mu);
1386 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1387 pollset_set->pollset_capacity =
1388 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1389 pollset_set->pollsets =
1390 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1391 sizeof(*pollset_set->pollsets));
1392 }
1393 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1394 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1395 if (fd_is_orphaned(pollset_set->fds[i])) {
1396 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1397 } else {
1398 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1399 pollset_set->fds[j++] = pollset_set->fds[i];
1400 }
1401 }
1402 pollset_set->fd_count = j;
1403 gpr_mu_unlock(&pollset_set->mu);
1404}
1405
1406static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1407 grpc_pollset_set *pollset_set,
1408 grpc_pollset *pollset) {
1409 size_t i;
1410 gpr_mu_lock(&pollset_set->mu);
1411 for (i = 0; i < pollset_set->pollset_count; i++) {
1412 if (pollset_set->pollsets[i] == pollset) {
1413 pollset_set->pollset_count--;
1414 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1415 pollset_set->pollsets[pollset_set->pollset_count]);
1416 break;
1417 }
1418 }
1419 gpr_mu_unlock(&pollset_set->mu);
1420}
1421
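/* Adds pollset_set 'item' to pollset_set 'bag'. Every fd currently tracked by
   'bag' is propagated into 'item' (again dropping and compacting any orphaned
   fds), and fds added to 'bag' later on reach 'item' through the pollset_sets
   list maintained here. */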
1422static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1423 grpc_pollset_set *bag,
1424 grpc_pollset_set *item) {
1425 size_t i, j;
1426 gpr_mu_lock(&bag->mu);
1427 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1428 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1429 bag->pollset_sets =
1430 gpr_realloc(bag->pollset_sets,
1431 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1432 }
1433 bag->pollset_sets[bag->pollset_set_count++] = item;
1434 for (i = 0, j = 0; i < bag->fd_count; i++) {
1435 if (fd_is_orphaned(bag->fds[i])) {
1436 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1437 } else {
1438 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1439 bag->fds[j++] = bag->fds[i];
1440 }
1441 }
1442 bag->fd_count = j;
1443 gpr_mu_unlock(&bag->mu);
1444}
1445
1446static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1447 grpc_pollset_set *bag,
1448 grpc_pollset_set *item) {
1449 size_t i;
1450 gpr_mu_lock(&bag->mu);
1451 for (i = 0; i < bag->pollset_set_count; i++) {
1452 if (bag->pollset_sets[i] == item) {
1453 bag->pollset_set_count--;
1454 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1455 bag->pollset_sets[bag->pollset_set_count]);
1456 break;
1457 }
1458 }
1459 gpr_mu_unlock(&bag->mu);
1460}
1461
/* Test helper functions */
1464void *grpc_fd_get_polling_island(grpc_fd *fd) {
1465 polling_island *pi;
1466
1467 gpr_mu_lock(&fd->pi_mu);
1468 pi = fd->polling_island;
1469 gpr_mu_unlock(&fd->pi_mu);
1470
1471 return pi;
1472}
1473
1474void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1475 polling_island *pi;
1476
1477 gpr_mu_lock(&ps->pi_mu);
1478 pi = ps->polling_island;
1479 gpr_mu_unlock(&ps->pi_mu);
1480
1481 return pi;
1482}
1483
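/* Follows the merged_to chain starting at 'p' and returns the island at the
   top of that chain, i.e. the island that 'p' has effectively been merged
   into. Used by grpc_are_polling_islands_equal() below. */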
1484static polling_island *get_polling_island(polling_island *p) {
1485 if (p == NULL) {
1486 return NULL;
1487 }
1488
1489 polling_island *next;
1490 gpr_mu_lock(&p->mu);
1491 while (p->merged_to != NULL) {
1492 next = p->merged_to;
1493 gpr_mu_unlock(&p->mu);
1494 p = next;
1495 gpr_mu_lock(&p->mu);
1496 }
1497 gpr_mu_unlock(&p->mu);
1498
1499 return p;
1500}
1501
1502bool grpc_are_polling_islands_equal(void *p, void *q) {
1503 p = get_polling_island(p);
1504 q = get_polling_island(q);
1505 return p == q;
1506}
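/* Illustrative sketch of how a test might use the helpers above (the fd, the
   pollset and the surrounding test scaffolding are assumed, not defined
   here):

     void *pi_fd = grpc_fd_get_polling_island(fd);
     void *pi_ps = grpc_pollset_get_polling_island(pollset);
     GPR_ASSERT(grpc_are_polling_islands_equal(pi_fd, pi_ps));
*/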
1507
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001508/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001509 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001510 */
1511
1512static void shutdown_engine(void) {
1513 fd_global_shutdown();
1514 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001515 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001516}
1517
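/* The vtable that exposes this engine to the rest of iomgr: each entry wires
   an ev_posix operation to the corresponding epoll/polling-island based
   implementation defined above. */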
1518static const grpc_event_engine_vtable vtable = {
1519 .pollset_size = sizeof(grpc_pollset),
1520
1521 .fd_create = fd_create,
1522 .fd_wrapped_fd = fd_wrapped_fd,
1523 .fd_orphan = fd_orphan,
1524 .fd_shutdown = fd_shutdown,
1525 .fd_notify_on_read = fd_notify_on_read,
1526 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001527 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001528
1529 .pollset_init = pollset_init,
1530 .pollset_shutdown = pollset_shutdown,
1531 .pollset_reset = pollset_reset,
1532 .pollset_destroy = pollset_destroy,
1533 .pollset_work = pollset_work,
1534 .pollset_kick = pollset_kick,
1535 .pollset_add_fd = pollset_add_fd,
1536
1537 .pollset_set_create = pollset_set_create,
1538 .pollset_set_destroy = pollset_set_destroy,
1539 .pollset_set_add_pollset = pollset_set_add_pollset,
1540 .pollset_set_del_pollset = pollset_set_del_pollset,
1541 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1542 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1543 .pollset_set_add_fd = pollset_set_add_fd,
1544 .pollset_set_del_fd = pollset_set_del_fd,
1545
1546 .kick_poller = kick_poller,
1547
1548 .shutdown_engine = shutdown_engine,
1549};
1550
/* It is possible that glibc provides the epoll API even though the underlying
 * kernel does not actually support it. Create a dummy epoll fd to make sure
 * epoll support is really available */
1553static bool is_epoll_available() {
1554 int fd = epoll_create1(EPOLL_CLOEXEC);
1555 if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        errno); /* errno (not the -1 return value) carries the actual error */
1560 return false;
1561 }
1562 close(fd);
1563 return true;
1564}
1565
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001566const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If the use of signals is disabled, we cannot use the epoll engine */
1568 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1569 return NULL;
1570 }
1571
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001572 if (!is_epoll_available()) {
1573 return NULL;
1574 }
1575
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001576 if (!is_grpc_wakeup_signal_initialized) {
1577 grpc_use_signal(SIGRTMIN + 2);
1578 }
1579
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001580 fd_global_init();
1581 pollset_global_init();
1582 polling_island_global_init();
1583 return &vtable;
1584}
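/* Illustrative sketch (application level, not part of this file): a process
   that cannot dedicate a signal to gRPC can opt out of this engine before
   initialization, in which case grpc_init_epoll_linux() returns NULL and
   another polling engine is presumably selected instead:

     grpc_use_signal(-1);
     grpc_init();
*/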
1585
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001586#else /* defined(GPR_LINUX_EPOLL) */
1587#if defined(GPR_POSIX_SOCKET)
1588#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001589/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1590 * NULL */
1591const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001592#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001593
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001594void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001595#endif /* !defined(GPR_LINUX_EPOLL) */