/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be called
 * before grpc_init() to set a different signal to use. If signum == -1, the
 * use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
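
/* Illustrative usage sketch (not taken from this file): an application that
   wants the epoll engine to use a particular wakeup signal would call
   grpc_use_signal() before initializing gRPC. The choice of SIGRTMIN + 2 below
   is purely hypothetical; any signal the application does not otherwise use
   would do.

     #include <grpc/grpc.h>
     #include <grpc/grpc_posix.h>
     #include <signal.h>

     int main(void) {
       grpc_use_signal(SIGRTMIN + 2); // typically done before grpc_init()
       grpc_init();
       // ... create channels/servers and run the application ...
       grpc_shutdown();
       return 0;
     }
*/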

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

100 /* The fd is either closed or we relinquished control of it. In either cases,
101 this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), 1, (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), 1, (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p), 1)
#define PI_UNREF(p, r) pi_unref((p), 1)

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
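
/* A minimal illustration of the intended usage pattern: every PI_ADD_REF()
   should eventually be paired with a PI_UNREF() carrying a matching reason
   string; the reason is what makes the GRPC_PI_REF_COUNT_DEBUG logs readable.
   For instance, a pollset takes and later releases its reference on a polling
   island as:

     PI_ADD_REF(pollset->polling_island, "ps");
     ...
     PI_UNREF(pollset->polling_island, "ps");
*/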

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use the PI_ADD_REF() and PI_UNREF() macros to increment/
     decrement the refcount.
     Once the ref count becomes zero, this structure is destroyed, which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this one merged into.
   * merged_to is set at most once in a polling_island's lifetime (and only if
   * the island is merged with another island). Because of this, we can use the
   * gpr_atm type here, do atomic accesses on it, and reduce lock contention on
   * the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Has finish_shutdown_locked() been called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs and the mutex
     protecting the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e pollset_work() function). Consider removing this lock
     and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (adding an fd/pollset/pollset_set to the
 * current pollset_set would then result in polling island merges). This would
 * remove the need to maintain fd_count here. It would also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a polling island. This
   is useful in the polling island merge operation, where we need to wake up
   all the threads currently polling the smaller polling island (so that they
   can start polling the new/merged polling island).

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);

void pi_add_ref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                    int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}

void pi_unref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                  int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif

long pi_add_ref(polling_island *pi, int ref_cnt) {
  return gpr_atm_full_fetch_add(&pi->ref_count, ref_cnt);
}

long pi_unref(polling_island *pi, int ref_cnt) {
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  /* If the ref count went to zero, delete the polling island. Note that this
     need not be done under a lock. Once the ref count goes to zero, we are
     guaranteed that no one else holds a reference to the polling island (and
     that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (old_cnt == ref_cnt) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  } else {
    GPR_ASSERT(old_cnt > ref_cnt);
  }

  return old_cnt;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, 0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wakeup_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If the fd is already closed, then it would have been automatically removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

static polling_island *polling_island_create(grpc_fd *initial_fd) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* Lock the polling island here just in case we got this structure from the
       freelist and the polling island lock was not released yet (by the code
       that adds the polling island to the freelist) */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by 'merged_to' link). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;
  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* pi is the last node in the linked list. Get the lock and check again
         (under the pi->mu lock) that pi is still the last node (because a merge
         may have happened after the (next == NULL) check above and before
         getting the pi->mu lock).
         If pi is the last node, we are done. If not, unlock and continue
         traversing the list */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        break;
      }
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands pointed to by *p and *q.
   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks
     // **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
     // polling_island_lock_pair() was called and if so, release the lock only
     // once. Note: Even if p1 != p2 before calling polling_island_lock_pair(),
     // they might be equal after the function returns:
     if (p1 == p2) {
       gpr_mu_unlock(&p1->mu);
     } else {
       gpr_mu_unlock(&p1->mu);
       gpr_mu_unlock(&p2->mu);
     }

*/
void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (the one with fewer fds) to q.
     Note that the refcounts on the fds being moved will not change here (this
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wake up all the pollers (if any) on p so that they can pick up this
     change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* Add the 'merged_to' link from p --> q */
  gpr_atm_rel_store(&p->merged_to, q);
  PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  /* Return the merged polling island */
  return q;
}

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns about malloc
 * performance but instead so that implementations with multiple threads in
 * (for example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700695
696/* The alarm system needs to be able to wakeup 'some poller' sometimes
697 * (specifically when a new alarm needs to be triggered earlier than the next
698 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
699 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700700
701/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
702 * sure to wake up one polling thread (which can wake up other threads if
703 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p   ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
     newly created fd (or an fd we got from the freelist), no one else would be
     holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, 1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed to by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  GPR_ASSERT(!fd->shutdown);
  fd->shutdown = true;

  /* Flush any pending read and write closures. Since fd->shutdown is 'true' at
     this point, the closures would be called with 'success = false' */
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            kick_append_error(&error, pollset_worker_kick(worker));
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        kick_append_error(&error, pollset_worker_kick(worker));
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      kick_append_error(&error, pollset_worker_kick(worker));
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
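
/* Worked examples for the conversion above (illustrative only, following the
   rules in the preceding comment and the max_spin_polling_us = 10 constant
   used in the code):
     - deadline == gpr_inf_future(...)  -> -1  (block indefinitely)
     - deadline <= now + 10us           ->  0  (non-blocking / spin poll)
     - deadline == now + 2.3ms          ->  3  (rounded up to the next whole
                                                millisecond) */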
1144
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001145static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1146 grpc_pollset *notifier) {
1147 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001148 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001149 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1150 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001151 gpr_mu_unlock(&fd->mu);
1152}
1153
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001154static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001155 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1156 gpr_mu_lock(&fd->mu);
1157 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1158 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001159}
1160
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001161static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
1162 gpr_mu_lock(&ps->pi_mu);
1163 if (ps->polling_island != NULL) {
1164 PI_UNREF(ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001165 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001166 ps->polling_island = NULL;
1167 gpr_mu_unlock(&ps->pi_mu);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001168}
1169
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->pi_mu);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island(pollset, "ps_reset");
}

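/* Folds 'error' into '*composite': the first failure lazily creates a
   composite "pollset_work" error and later failures are attached as children,
   so a single grpc_error can be returned from pollset_work() */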
static void work_combine_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}

#define GRPC_EPOLL_MAX_EVENTS 1000
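/* Does the actual epoll wait for a pollset. Expects pollset->mu to be held on
   entry; the lock is released before blocking (hence "and_unlock") and the
   caller re-acquires it afterwards. Overall flow:
     1. Ensure the pollset has a polling island (creating one if needed) and
        resolve it to the latest island 'pi'
     2. Take an extra "ps_work" ref on 'pi' so the epoll_fd stays open while we
        poll without holding any locks
     3. epoll_pwait() on pi->epoll_fd and dispatch readable/writable events,
        repeating while a completely full batch of GRPC_EPOLL_MAX_EVENTS is
        returned
     4. Drop the "ps_work" ref and return any accumulated error */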
static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *pollset,
                                           int timeout_ms, sigset_t *sig_mask) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  grpc_error *error = GRPC_ERROR_NONE;
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the latest
     polling island pointed to by pollset->polling_island.
     Acquire the following locks:
     - pollset->mu (which we already have)
     - pollset->pi_mu
     - pollset->polling_island lock */
  gpr_mu_lock(&pollset->pi_mu);

  if (pollset->polling_island == NULL) {
    pollset->polling_island = polling_island_create(NULL);
    PI_ADD_REF(pollset->polling_island, "ps");
  }

  pi = polling_island_lock(pollset->polling_island);
  epoll_fd = pi->epoll_fd;

  /* Update pollset->polling_island since the island it currently points to
     may not be the latest one (i.e. pi) */
  if (pollset->polling_island != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(pollset->polling_island, "ps");
    pollset->polling_island = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");

  gpr_mu_unlock(&pi->mu);
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);

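  /* Block in epoll_pwait() with the caller-supplied signal mask, which has
     grpc_wakeup_signal removed so the wakeup signal can interrupt the wait.
     Keep draining events while epoll returns a completely full batch, since
     more events may still be pending */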
  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
        work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
      } else {
        /* We were interrupted. Save an iteration by doing a zero-timeout
           epoll_wait to see if there are any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_epoll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        work_combine_error(
            &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e. our old copy of pollset->polling_island
     that we got before releasing the polling island lock). This is because the
     pollset->polling_island pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
  return error;
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is
       some work that needs attention like an event on the completion queue or
       an alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
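    /* Block grpc_wakeup_signal for this thread and build 'orig_mask' with that
       signal removed. Passing 'orig_mask' to epoll_pwait() means the wakeup
       signal is only deliverable while the worker is blocked inside
       epoll_pwait(), so a kick interrupts the wait rather than being handled
       elsewhere */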
    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_wakeup_signal);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_wakeup_signal);

    push_front_worker(pollset, &worker);

    error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
  GPR_TIMER_END("pollset_work", 0);
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

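/* Links 'fd' with 'pollset' so that events on the fd are delivered through the
   pollset's polling island. Lock order: pollset->mu, then pollset->pi_mu, then
   fd->pi_mu. The four possible island states are described case by case in the
   comment inside the function */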
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&pollset->pi_mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   *    equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   *    a new polling island (with a refcount of 2) and make the polling_island
   *    fields in both fd and pollset point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   *    the NULL polling_island field to point to the non-NULL polling_island
   *    field (ensure that the refcount on the polling island is incremented by
   *    1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   *    and different, merge both the polling islands and update the
   *    polling_island fields in both fd and pollset to point to the merged
   *    polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd);
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true);
    gpr_mu_unlock(&pi_new->mu);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
  }

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->pi_mu);
  gpr_mu_unlock(&pollset->mu);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

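/* A grpc_pollset_set is a mutex-protected bag of fds, pollsets and nested
   pollset_sets. Adding an fd to a set propagates it to every pollset (and
   nested set) already in the bag, and adding a pollset to a set picks up every
   fd already in the bag, so the two sides stay in sync */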
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

/* Test helper functions */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->pi_mu);
  pi = fd->polling_island;
  gpr_mu_unlock(&fd->pi_mu);

  return pi;
}

void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->pi_mu);
  pi = ps->polling_island;
  gpr_mu_unlock(&ps->pi_mu);

  return pi;
}

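/* Returns true if 'p' and 'q' ultimately resolve to the same polling island.
   polling_island_lock_pair() may update both pointers (e.g. after island
   merges) and leaves the resulting island(s) locked, which is why only one
   unlock is needed when both resolve to the same island */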
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  polling_island_lock_pair(&p1, &p2);
  if (p1 == p2) {
    gpr_mu_unlock(&p1->mu);
  } else {
    gpr_mu_unlock(&p1->mu);
    gpr_mu_unlock(&p2->mu);
  }

  return p1 == p2;
}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

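/* Wires the functions above into the grpc event-engine interface using C99
   designated initializers. grpc_init_epoll_linux() below returns a pointer to
   this table when the epoll engine can be used */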
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        fd);
    return false;
  }
  close(fd);
  return true;
}

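/* Entry point for this polling engine. Returns NULL (in which case the epoll
   engine is not used) if signals were explicitly disabled via
   grpc_use_signal(-1), if epoll is unavailable on the running kernel, or if
   global initialization fails. Otherwise it defaults the wakeup signal to
   SIGRTMIN + 2 and returns the vtable defined above */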
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}

#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */