/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function might be
 * called even before grpc_init() to set a different signal to use. If
 * signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
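
/* Illustrative usage sketch (not part of this file): an application that wants
   a specific wakeup signal, or no signals at all, calls grpc_use_signal()
   before any other gRPC call:

     grpc_use_signal(SIGRTMIN + 2);  // any signal unused by the application
     grpc_init();
     ...
     grpc_shutdown();

   Passing -1 disables signal use, and this epoll engine will not be selected. */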

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting the
     field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
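
/* Descriptive note on refst: the value roughly encodes
   (refcount << 1) | active_bit, e.g. an active fd with three outstanding refs
   holds refst == 7. Orphaning only touches bit 0, which is why the ref/unref
   helpers below always step the counter in units of two. */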

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
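
/* Descriptive note: fd->read_closure and fd->write_closure form a small state
   machine driven by notify_on_locked() and set_ready_locked() further below:
     CLOSURE_NOT_READY --notify_on--> <closure ptr>   (waiting for an event)
     CLOSURE_NOT_READY --set_ready--> CLOSURE_READY   (event arrived first)
     CLOSURE_READY     --notify_on--> schedule closure, back to CLOSURE_NOT_READY
     <closure ptr>     --set_ready--> schedule closure, back to CLOSURE_NOT_READY */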

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), 1, (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), 1, (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p), 1)
#define PI_UNREF(p, r) pi_unref((p), 1)

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);

void pi_add_ref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                    int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}

void pi_unref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                  int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif

long pi_add_ref(polling_island *pi, int ref_cnt) {
  return gpr_atm_full_fetch_add(&pi->ref_count, ref_cnt);
}

long pi_unref(polling_island *pi, int ref_cnt) {
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  /* If ref count went to zero, delete the polling island. Note that this need
     not be done under a lock. Once the ref count goes to zero, we are
     guaranteed that no one else holds a reference to the polling island (and
     that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (old_cnt == ref_cnt) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  } else {
    GPR_ASSERT(old_cnt > ref_cnt);
  }

  return old_cnt;
}
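
/* Illustrative sketch (not part of the build): if island A was merged into
   island B, then A->merged_to == B and B carries one extra ref taken in
   polling_island_merge(). Dropping the last ref on A therefore cascades:

     PI_UNREF(A, "last-ref");           // A's ref_count: 1 -> 0
     //  -> polling_island_delete(A);
     //  -> PI_UNREF(B, "pi_delete");   // drops the ref A held on B */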

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, 0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, it would have automatically been removed from
     the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

static polling_island *polling_island_create(grpc_fd *initial_fd) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* Lock the polling island here just in case we got this structure from the
       freelist and the polling island lock was not released yet (by the code
       that adds the polling island to the freelist) */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' link). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;
  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* pi is the last node in the linked list. Get the lock and check again
         (under the pi->mu lock) that pi is still the last node (because a
         merge may have happened after the (next == NULL) check above and
         before getting the pi->mu lock).
         If pi is the last node, we are done. If not, unlock and continue
         traversing the list */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        break;
      }
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands pointed to by *p and *q.
   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks
     // **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
     // polling_island_lock_pair() was called and if so, release the lock only
     // once. Note: Even if p1 != p2 before calling polling_island_lock_pair(),
     // they might be equal after the function returns:
     if (p1 == p2) {
       gpr_mu_unlock(&p1->mu);
     } else {
       gpr_mu_unlock(&p1->mu);
       gpr_mu_unlock(&p2->mu);
     }
*/
void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking the polling_island with the lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (the one with fewer fds) to q.
     (Note that the refcounts on the fds being moved will not change here. This
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wakeup all the pollers (if any) on p so that they can pickup this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* Add the 'merged_to' link from p --> q */
  gpr_atm_rel_store(&p->merged_to, q);
  PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  /* Return the merged polling island */
  return q;
}

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, 1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
  gpr_free(fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            kick_append_error(&error, pollset_worker_kick(worker));
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        kick_append_error(&error, pollset_worker_kick(worker));
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in the pollset_work() function), then there is no
       need to kick any other worker since the current thread can just absorb
       the kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      kick_append_error(&error, pollset_worker_kick(worker));
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
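
/* Worked examples for the conversion rules above (descriptive only):
     deadline == gpr_inf_future            -> -1 (block indefinitely)
     deadline - now <= 10 microseconds     ->  0 (non-blocking / spin poll)
     deadline - now == 2.4 milliseconds    ->  3 (rounded up to a whole ms) */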
1155
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001156static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1157 grpc_pollset *notifier) {
1158 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001159 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001160 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1161 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001162 gpr_mu_unlock(&fd->mu);
1163}
1164
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001165static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001166 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1167 gpr_mu_lock(&fd->mu);
1168 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1169 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001170}
1171
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001172static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
1173 gpr_mu_lock(&ps->pi_mu);
1174 if (ps->polling_island != NULL) {
1175 PI_UNREF(ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001176 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001177 ps->polling_island = NULL;
1178 gpr_mu_unlock(&ps->pi_mu);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001179}
1180
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island(pollset, "ps_reset");
}

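/* Fold 'error' into '*composite'. The composite error is created lazily on the
   first non-OK error; GRPC_ERROR_NONE inputs are ignored. */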
static void work_combine_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}

#define GRPC_EPOLL_MAX_EVENTS 1000
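/* Does the actual polling: waits (via epoll_pwait) on the epoll fd of the
   pollset's polling island for up to timeout_ms and dispatches any ready fd
   events. Called with pollset->mu held; the lock is released before blocking
   and is NOT re-acquired before returning (hence "and_unlock"). */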
static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *pollset,
                                           int timeout_ms, sigset_t *sig_mask) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  grpc_error *error = GRPC_ERROR_NONE;
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the latest
     polling island pointed to by pollset->polling_island.
     Acquire the following locks:
     - pollset->mu (which we already have)
     - pollset->pi_mu
     - pollset->polling_island lock */
  gpr_mu_lock(&pollset->pi_mu);

  if (pollset->polling_island == NULL) {
    pollset->polling_island = polling_island_create(NULL);
    PI_ADD_REF(pollset->polling_island, "ps");
  }

  pi = polling_island_lock(pollset->polling_island);
  epoll_fd = pi->epoll_fd;

  /* Update pollset->polling_island since the island pointed to by
     pollset->polling_island may not be the latest (i.e. pi) */
  if (pollset->polling_island != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(pollset->polling_island, "ps");
    pollset->polling_island = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");

  gpr_mu_unlock(&pi->mu);
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);

  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
        work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
      } else {
        /* We were interrupted. Save an iteration by doing a zero-timeout
           epoll_wait to see if there are any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_epoll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        work_combine_error(
            &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e. our old copy of
     pollset->polling_island that we got before releasing the polling island
     lock). This is because the pollset->polling_island pointer might get
     updated in other parts of the code when there is an island merge while we
     are doing epoll_wait() above */
  PI_UNREF(pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
  return error;
}


/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is
       some work that needs attention, like an event on the completion queue
       or an alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_wakeup_signal);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_wakeup_signal);

    push_front_worker(pollset, &worker);

    error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
  GPR_TIMER_END("pollset_work", 0);
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

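/* Associate 'fd' with 'pollset' by placing both on the same polling island,
   creating or merging polling islands as needed (see the case analysis in the
   comment below). */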
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   *    equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   *    a new polling island (with a refcount of 2) and make the polling_island
   *    fields in both fd and pollset point to the new island.
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   *    the NULL polling_island field to point to the non-NULL polling_island
   *    field (ensure that the refcount on the polling island is incremented by
   *    1 to account for the newly added reference).
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   *    and different, merge both the polling islands and update the
   *    polling_island fields in both fd and pollset to point to the merged
   *    polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd);
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true);
    gpr_mu_unlock(&pi_new->mu);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

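/* Add 'fd' to 'pollset_set' and propagate it to every pollset and every nested
   pollset_set currently contained in it. */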
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

/* Test helper functions */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->pi_mu);
  pi = fd->polling_island;
  gpr_mu_unlock(&fd->pi_mu);

  return pi;
}

void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->pi_mu);
  pi = ps->polling_island;
  gpr_mu_unlock(&ps->pi_mu);

  return pi;
}

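/* polling_island_lock_pair() locks both islands and may update p1 and p2 to
   point to the islands actually locked, so comparing them afterwards tells us
   whether the two arguments belong to the same (possibly merged) island. */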
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  polling_island_lock_pair(&p1, &p2);
  if (p1 == p2) {
    gpr_mu_unlock(&p1->mu);
  } else {
    gpr_mu_unlock(&p1->mu);
    gpr_mu_unlock(&p2->mu);
  }

  return p1 == p2;
}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

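/* Wires the functions defined above into the generic event engine interface
   used by the rest of the iomgr code */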
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        fd);
    return false;
  }
  close(fd);
  return true;
}

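/* Entry point for this engine: returns the epoll engine's vtable, or NULL if
   the use of signals has been disabled, epoll is unavailable on this system,
   or global initialization fails */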
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If the use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}

#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */