/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called even before grpc_init() in order to set a different signal to use.
 * If signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
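
/* Example usage (illustrative only; the specific signal is the application's
   choice, not something this file mandates): an application that reserves,
   say, SIGRTMIN + 2 for gRPC would call grpc_use_signal(SIGRTMIN + 2) before
   grpc_init(). Passing -1 disables signal-based wakeups, in which case this
   epoll engine is not used and gRPC falls back to another polling engine. */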

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting the
     field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

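/* A grpc_fd's read_closure/write_closure field acts as a small state machine:
   it holds CLOSURE_NOT_READY, CLOSURE_READY, or a pointer to the closure that
   is waiting for the next readability/writability event (see notify_on_locked()
   and set_ready_locked() below) */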
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), 1, (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), 1, (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p), 1)
#define PI_UNREF(p, r) pi_unref((p), 1)

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed, which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this island merged into.
   * merged_to is only set once in a polling_island's lifetime (and only if the
   * island is merged with another island). Because of this, we can use the
   * gpr_atm type here so that we can do atomic access on this field and reduce
   * lock contention on the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e. not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs and the mutex protecting
     the field */
  /* TODO: sreek: This lock might actually be adding more overhead to the
     critical path (i.e. the pollset_work() function). Consider removing this
     lock and just using the overall pollset lock */
  gpr_mu pi_mu;
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a polling island. This
   is useful in the polling island merge operation, where we need to wake up
   all the threads currently polling the smaller polling island (so that they
   can start polling the new/merged polling island).

   NOTE: This fd is initialized to be readable and MUST NOT be consumed, i.e.
   the threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);

void pi_add_ref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                    int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}

void pi_unref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                  int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif

long pi_add_ref(polling_island *pi, int ref_cnt) {
  return gpr_atm_full_fetch_add(&pi->ref_count, ref_cnt);
}

long pi_unref(polling_island *pi, int ref_cnt) {
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  /* If the ref count went to zero, delete the polling island. Note that this
     need not be done under a lock. Once the ref count goes to zero, we are
     guaranteed that no one else holds a reference to the polling island (and
     that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (old_cnt == ref_cnt) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  } else {
    GPR_ASSERT(old_cnt > ref_cnt);
  }

  return old_cnt;
}
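
/* Note on ownership (an informal summary of the code below, not additional
   machinery): every grpc_fd and every grpc_pollset that points to a polling
   island holds a ref on it, and a polling island that has been merged holds a
   ref on the island it merged into (see polling_island_merge), so an island
   is only deleted once nothing reachable refers to it. */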

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, 0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wakeup_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, then it would have automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

static polling_island *polling_island_create(grpc_fd *initial_fd) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create a new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* Lock the polling island here just in case we got this structure from the
       freelist and the polling island lock was not released yet (by the code
       that adds the polling island to the freelist) */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Gets the lock on the *latest* polling island, i.e. the last polling island
   in the linked list (linked by the 'merged_to' link). Call gpr_mu_unlock on
   the returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu, NOT pi->mu */
polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;
  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* pi is the last node in the linked list. Get the lock and check again
         (under the pi->mu lock) that pi is still the last node (because a
         merge may have happened after the (next == NULL) check above and
         before getting the pi->mu lock).
         If pi is the last node, we are done. If not, unlock and continue
         traversing the list */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        break;
      }
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands pointed to by *p and *q.
   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks
     // **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
     // polling_island_lock_pair() was called and, if so, release the lock only
     // once. Note: Even if p1 != p2 before calling polling_island_lock_pair(),
     // they might be equal after the function returns:
     if (p1 == p2) {
       gpr_mu_unlock(&p1->mu);
     } else {
       gpr_mu_unlock(&p1->mu);
       gpr_mu_unlock(&p2->mu);
     }

*/
void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking the polling_island with the lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

polling_island *polling_island_merge(polling_island *p, polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q, i.e. move all the fds from p (the one with fewer fds) to
     q. Note that the refcounts on the fds being moved will not change here;
     this is why the last parameter in the following two functions is 'false' */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wake up all the pollers (if any) on p so that they can pick up this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* Add the 'merged_to' link from p --> q */
  gpr_atm_rel_store(&p->merged_to, q);
  PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  /* Return the merged polling island */
  return q;
}

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns about malloc
 * performance but instead so that implementations with multiple threads in
 * (for example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In the future we should
 * make sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, 1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep it referenced. We want this grpc_fd
     struct to stay alive (and not be added to the freelist) until the end of
     this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e. the last island in the
       linked list pointed to by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}
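
/* Note on the kick mechanism (informal): the handler installed above is
   intentionally a no-op (it only logs under GRPC_EPOLL_DEBUG); delivering
   grpc_wakeup_signal to a specific worker thread via pthread_kill() simply
   interrupts that thread's blocking poll call so it can re-examine the
   pollset state. */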

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            kick_append_error(&error, pollset_worker_kick(worker));
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        kick_append_error(&error, pollset_worker_kick(worker));
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e. in the pollset_work() function), then there is
       no need to kick any other worker since the current thread can just
       absorb the kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      kick_append_error(&error, pollset_worker_kick(worker));
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  gpr_mu_init(&pollset->pi_mu);
  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
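
/* Worked example (informal): with max_spin_polling_us == 10, a deadline that
   is at most 10us away (or already past) yields 0 (a non-blocking/spin poll);
   a deadline 2.3ms away yields gpr_time_to_millis(2.3ms + 0.999999ms) == 3ms,
   i.e. fractional milliseconds always round up; an infinite deadline yields
   -1 (block indefinitely). */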
1148
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001149static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1150 grpc_pollset *notifier) {
1151 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001152 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001153 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1154 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001155 gpr_mu_unlock(&fd->mu);
1156}
1157
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001158static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001159 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1160 gpr_mu_lock(&fd->mu);
1161 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1162 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001163}
1164
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001165static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
1166 gpr_mu_lock(&ps->pi_mu);
1167 if (ps->polling_island != NULL) {
1168 PI_UNREF(ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001169 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001170 ps->polling_island = NULL;
1171 gpr_mu_unlock(&ps->pi_mu);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001172}
1173
1174static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1175 grpc_pollset *pollset) {
1176 /* The pollset cannot have any workers if we are at this stage */
1177 GPR_ASSERT(!pollset_has_workers(pollset));
1178
1179 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001180
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001181 /* Release the ref and set pollset->polling_island to NULL */
1182 pollset_release_polling_island(pollset, "ps_shutdown");
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001183 grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001184}
1185
1186/* pollset->mu lock must be held by the caller before calling this */
1187static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1188 grpc_closure *closure) {
1189 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1190 GPR_ASSERT(!pollset->shutting_down);
1191 pollset->shutting_down = true;
1192 pollset->shutdown_done = closure;
1193 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1194
1195 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1196 because it would release the underlying polling island. In such a case, we
1197 let the last worker call finish_shutdown_locked() from pollset_work() */
1198 if (!pollset_has_workers(pollset)) {
1199 GPR_ASSERT(!pollset->finish_shutdown_called);
1200 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1201 finish_shutdown_locked(exec_ctx, pollset);
1202 }
1203 GPR_TIMER_END("pollset_shutdown", 0);
1204}
1205
1206/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1207 * than destroying the mutexes, there is nothing special that needs to be done
1208 * here */
1209static void pollset_destroy(grpc_pollset *pollset) {
1210 GPR_ASSERT(!pollset_has_workers(pollset));
1211 gpr_mu_destroy(&pollset->pi_mu);
1212 gpr_mu_destroy(&pollset->mu);
1213}
1214
1215static void pollset_reset(grpc_pollset *pollset) {
1216 GPR_ASSERT(pollset->shutting_down);
1217 GPR_ASSERT(!pollset_has_workers(pollset));
1218 pollset->shutting_down = false;
1219 pollset->finish_shutdown_called = false;
1220 pollset->kicked_without_pollers = false;
1221 pollset->shutdown_done = NULL;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001222 pollset_release_polling_island(pollset, "ps_reset");
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001223}
1224
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001225static void work_combine_error(grpc_error **composite, grpc_error *error) {
1226 if (error == GRPC_ERROR_NONE) return;
1227 if (*composite == GRPC_ERROR_NONE) {
1228 *composite = GRPC_ERROR_CREATE("pollset_work");
1229 }
1230 *composite = grpc_error_add_child(*composite, error);
1231}
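/* Illustration: if, say, epoll_pwait() fails twice while servicing a single
   pollset_work() call, the first failure creates the "pollset_work" composite
   error and both OS errors are attached to it as children; the caller then
   receives one combined error rather than just the last failure. */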
1232
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001233#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001234static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
1235 grpc_pollset *pollset,
1236 int timeout_ms, sigset_t *sig_mask) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001237 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001238 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001239 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001240 polling_island *pi = NULL;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001241 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001242 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1243
 1244 /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001245 latest polling island pointed to by pollset->polling_island.
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001246 Acquire the following locks:
1247 - pollset->mu (which we already have)
1248 - pollset->pi_mu
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001249 - pollset->polling_island lock */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001250 gpr_mu_lock(&pollset->pi_mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001251
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001252 if (pollset->polling_island == NULL) {
1253 pollset->polling_island = polling_island_create(NULL);
1254 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001255 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001256
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001257 pi = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001258 epoll_fd = pi->epoll_fd;
1259
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001260 /* Update pollset->polling_island since the island it currently points to
 1261 may not be the latest one (i.e. pi) */
1262 if (pollset->polling_island != pi) {
1263 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1264 polling island to be deleted */
1265 PI_ADD_REF(pi, "ps");
1266 PI_UNREF(pollset->polling_island, "ps");
1267 pollset->polling_island = pi;
1268 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001269
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001270 /* Add an extra ref so that the island does not get destroyed (which means
 1271 the epoll_fd won't be closed) while we are doing an epoll_wait() on the
1272 epoll_fd */
1273 PI_ADD_REF(pi, "ps_work");
1274
1275 gpr_mu_unlock(&pi->mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001276 gpr_mu_unlock(&pollset->pi_mu);
1277 gpr_mu_unlock(&pollset->mu);
1278
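  /* Poll in a loop: epoll_pwait() runs with the caller-supplied sig_mask
     (which leaves the wakeup signal unblocked, so a kick can interrupt the
     wait), and we keep looping as long as the event buffer comes back
     completely full (ep_rv == GRPC_EPOLL_MAX_EVENTS), since more events may
     still be pending. */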
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001279 do {
1280 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1281 sig_mask);
1282 if (ep_rv < 0) {
1283 if (errno != EINTR) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001284 gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001285 work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001286 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001287 /* We were interrupted. Save an iteration by doing a zero-timeout
1288 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001289 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001290 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001291 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001292
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001293#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001294 /* See the definition of g_poll_sync for more details */
1295 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001296#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001297
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001298 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001299 void *data_ptr = ep_ev[i].data.ptr;
1300 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001301 work_combine_error(
1302 &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001303 } else if (data_ptr == &polling_island_wakeup_fd) {
 1304 /* This means that our polling island was merged with a different
1305 island. We do not have to do anything here since the subsequent call
1306 to the function pollset_work_and_unlock() will pick up the correct
1307 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001308 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001309 grpc_fd *fd = data_ptr;
1310 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1311 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1312 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001313 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001314 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001315 }
1316 if (write_ev || cancel) {
1317 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001318 }
1319 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001320 }
1321 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001322
1323 GPR_ASSERT(pi != NULL);
1324
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001325 /* Before leaving, release the extra ref we added to the polling island. It
 1326 is important to use "pi" here (i.e. our old copy of pollset->polling_island
 1327 that we got before releasing the polling island lock). This is because the
 1328 pollset->polling_island pointer might get updated in other parts of the
1329 code when there is an island merge while we are doing epoll_wait() above */
1330 PI_UNREF(pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001331
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001332 GPR_TIMER_END("pollset_work_and_unlock", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001333 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001334}
1335
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001336/* pollset->mu lock must be held by the caller before calling this.
1337 The function pollset_work() may temporarily release the lock (pollset->mu)
1338 during the course of its execution but it will always re-acquire the lock and
1339 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001340static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1341 grpc_pollset_worker **worker_hdl,
1342 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001343 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001344 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001345 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1346
1347 sigset_t new_mask;
1348 sigset_t orig_mask;
1349
1350 grpc_pollset_worker worker;
1351 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001352 worker.pt_id = pthread_self();
1353
1354 *worker_hdl = &worker;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001355 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1356 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001357
1358 if (pollset->kicked_without_pollers) {
1359 /* If the pollset was kicked without pollers, pretend that the current
1360 worker got the kick and skip polling. A kick indicates that there is some
1361 work that needs attention like an event on the completion queue or an
1362 alarm */
1363 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1364 pollset->kicked_without_pollers = 0;
1365 } else if (!pollset->shutting_down) {
1366 sigemptyset(&new_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001367 sigaddset(&new_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001368 pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001369 sigdelset(&orig_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001370
1371 push_front_worker(pollset, &worker);
1372
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001373 error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001374 grpc_exec_ctx_flush(exec_ctx);
1375
1376 gpr_mu_lock(&pollset->mu);
1377 remove_worker(pollset, &worker);
1378 }
1379
 1380 /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
1381 false at this point) and the pollset is shutting down, we may have to
1382 finish the shutdown process by calling finish_shutdown_locked().
1383 See pollset_shutdown() for more details.
1384
1385 Note: Continuing to access pollset here is safe; it is the caller's
1386 responsibility to not destroy a pollset when it has outstanding calls to
1387 pollset_work() */
1388 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1389 !pollset->finish_shutdown_called) {
1390 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1391 finish_shutdown_locked(exec_ctx, pollset);
1392
1393 gpr_mu_unlock(&pollset->mu);
1394 grpc_exec_ctx_flush(exec_ctx);
1395 gpr_mu_lock(&pollset->mu);
1396 }
1397
1398 *worker_hdl = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001399 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1400 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001401 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001402 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1403 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001404}
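/* Illustrative caller sketch (not part of this file; 'exec_ctx', 'deadline'
   and 'keep_polling' are placeholders): a poller thread is expected to hold
   pollset->mu and drive pollset_work() in a loop, e.g.

     gpr_mu_lock(&pollset->mu);
     while (keep_polling) {
       grpc_pollset_worker *worker = NULL;
       GRPC_LOG_IF_ERROR(
           "pollset_work",
           pollset_work(&exec_ctx, pollset, &worker,
                        gpr_now(GPR_CLOCK_MONOTONIC), deadline));
     }
     gpr_mu_unlock(&pollset->mu);

   pollset_work() may drop pollset->mu internally but, as documented above,
   always returns with the lock held again. */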
1405
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001406static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1407 grpc_fd *fd) {
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001408 gpr_mu_lock(&pollset->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001409 gpr_mu_lock(&pollset->pi_mu);
1410 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001411
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001412 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001413
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001414 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1415 * equal, do nothing.
1416 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1417 * a new polling island (with a refcount of 2) and make the polling_island
 1418 * fields in both fd and pollset point to the new island
1419 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1420 * the NULL polling_island field to point to the non-NULL polling_island
1421 * field (ensure that the refcount on the polling island is incremented by
1422 * 1 to account for the newly added reference)
1423 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1424 * and different, merge both the polling islands and update the
1425 * polling_island fields in both fd and pollset to point to the merged
1426 * polling island.
1427 */
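  /* For example, in case 4: if this fd was previously added to pollset P1 (so
     it already has a polling island) and is now being added to a different
     pollset P2, the two islands are merged; the fd and P2 are re-pointed at
     the merged island below, and P1 catches up to it the next time it polls
     (see the polling_island_wakeup_fd handling in pollset_work_and_unlock). */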
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001428 if (fd->polling_island == pollset->polling_island) {
1429 pi_new = fd->polling_island;
1430 if (pi_new == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001431 pi_new = polling_island_create(fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001432 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001433 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001434 pi_new = polling_island_lock(pollset->polling_island);
1435 polling_island_add_fds_locked(pi_new, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001436 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001437 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001438 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001439 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001440 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001441 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001442 }
1443
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001444 if (fd->polling_island != pi_new) {
1445 PI_ADD_REF(pi_new, "fd");
1446 if (fd->polling_island != NULL) {
1447 PI_UNREF(fd->polling_island, "fd");
1448 }
1449 fd->polling_island = pi_new;
1450 }
1451
1452 if (pollset->polling_island != pi_new) {
1453 PI_ADD_REF(pi_new, "ps");
1454 if (pollset->polling_island != NULL) {
1455 PI_UNREF(pollset->polling_island, "ps");
1456 }
1457 pollset->polling_island = pi_new;
1458 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001459
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001460 gpr_mu_unlock(&fd->pi_mu);
1461 gpr_mu_unlock(&pollset->pi_mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001462 gpr_mu_unlock(&pollset->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001463}
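/* Note the fixed lock order used above: pollset->mu, then pollset->pi_mu, then
   fd->pi_mu, with the polling-island lock taken last.
   pollset_work_and_unlock() acquires its subset of these locks in the same
   order, which keeps the two paths from deadlocking against each other. */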
1464
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001465/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001466 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001467 */
1468
1469static grpc_pollset_set *pollset_set_create(void) {
1470 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1471 memset(pollset_set, 0, sizeof(*pollset_set));
1472 gpr_mu_init(&pollset_set->mu);
1473 return pollset_set;
1474}
1475
1476static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1477 size_t i;
1478 gpr_mu_destroy(&pollset_set->mu);
1479 for (i = 0; i < pollset_set->fd_count; i++) {
1480 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1481 }
1482 gpr_free(pollset_set->pollsets);
1483 gpr_free(pollset_set->pollset_sets);
1484 gpr_free(pollset_set->fds);
1485 gpr_free(pollset_set);
1486}
1487
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001488static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1489 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1490 size_t i;
1491 gpr_mu_lock(&pollset_set->mu);
1492 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1493 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1494 pollset_set->fds = gpr_realloc(
1495 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1496 }
1497 GRPC_FD_REF(fd, "pollset_set");
1498 pollset_set->fds[pollset_set->fd_count++] = fd;
1499 for (i = 0; i < pollset_set->pollset_count; i++) {
1500 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1501 }
1502 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1503 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1504 }
1505 gpr_mu_unlock(&pollset_set->mu);
1506}
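/* Adding an fd to a pollset_set therefore fans out immediately: the fd is
   attached to every pollset and every nested pollset_set currently in the
   set. Pollsets added to the set later pick up the retained fds in
   pollset_set_add_pollset() below. */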
1507
1508static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1509 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1510 size_t i;
1511 gpr_mu_lock(&pollset_set->mu);
1512 for (i = 0; i < pollset_set->fd_count; i++) {
1513 if (pollset_set->fds[i] == fd) {
1514 pollset_set->fd_count--;
1515 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1516 pollset_set->fds[pollset_set->fd_count]);
1517 GRPC_FD_UNREF(fd, "pollset_set");
1518 break;
1519 }
1520 }
1521 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1522 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1523 }
1524 gpr_mu_unlock(&pollset_set->mu);
1525}
1526
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001527static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1528 grpc_pollset_set *pollset_set,
1529 grpc_pollset *pollset) {
1530 size_t i, j;
1531 gpr_mu_lock(&pollset_set->mu);
1532 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1533 pollset_set->pollset_capacity =
1534 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1535 pollset_set->pollsets =
1536 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1537 sizeof(*pollset_set->pollsets));
1538 }
1539 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1540 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1541 if (fd_is_orphaned(pollset_set->fds[i])) {
1542 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1543 } else {
1544 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1545 pollset_set->fds[j++] = pollset_set->fds[i];
1546 }
1547 }
1548 pollset_set->fd_count = j;
1549 gpr_mu_unlock(&pollset_set->mu);
1550}
1551
1552static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1553 grpc_pollset_set *pollset_set,
1554 grpc_pollset *pollset) {
1555 size_t i;
1556 gpr_mu_lock(&pollset_set->mu);
1557 for (i = 0; i < pollset_set->pollset_count; i++) {
1558 if (pollset_set->pollsets[i] == pollset) {
1559 pollset_set->pollset_count--;
1560 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1561 pollset_set->pollsets[pollset_set->pollset_count]);
1562 break;
1563 }
1564 }
1565 gpr_mu_unlock(&pollset_set->mu);
1566}
1567
1568static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1569 grpc_pollset_set *bag,
1570 grpc_pollset_set *item) {
1571 size_t i, j;
1572 gpr_mu_lock(&bag->mu);
1573 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1574 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1575 bag->pollset_sets =
1576 gpr_realloc(bag->pollset_sets,
1577 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1578 }
1579 bag->pollset_sets[bag->pollset_set_count++] = item;
1580 for (i = 0, j = 0; i < bag->fd_count; i++) {
1581 if (fd_is_orphaned(bag->fds[i])) {
1582 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1583 } else {
1584 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1585 bag->fds[j++] = bag->fds[i];
1586 }
1587 }
1588 bag->fd_count = j;
1589 gpr_mu_unlock(&bag->mu);
1590}
1591
1592static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1593 grpc_pollset_set *bag,
1594 grpc_pollset_set *item) {
1595 size_t i;
1596 gpr_mu_lock(&bag->mu);
1597 for (i = 0; i < bag->pollset_set_count; i++) {
1598 if (bag->pollset_sets[i] == item) {
1599 bag->pollset_set_count--;
1600 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1601 bag->pollset_sets[bag->pollset_set_count]);
1602 break;
1603 }
1604 }
1605 gpr_mu_unlock(&bag->mu);
1606}
1607
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001608/* Test helper functions */
1610void *grpc_fd_get_polling_island(grpc_fd *fd) {
1611 polling_island *pi;
1612
1613 gpr_mu_lock(&fd->pi_mu);
1614 pi = fd->polling_island;
1615 gpr_mu_unlock(&fd->pi_mu);
1616
1617 return pi;
1618}
1619
1620void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1621 polling_island *pi;
1622
1623 gpr_mu_lock(&ps->pi_mu);
1624 pi = ps->polling_island;
1625 gpr_mu_unlock(&ps->pi_mu);
1626
1627 return pi;
1628}
1629
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001630bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001631 polling_island *p1 = p;
1632 polling_island *p2 = q;
1633
1634 polling_island_lock_pair(&p1, &p2);
1635 if (p1 == p2) {
1636 gpr_mu_unlock(&p1->mu);
1637 } else {
1638 gpr_mu_unlock(&p1->mu);
1639 gpr_mu_unlock(&p2->mu);
1640 }
1641
1642 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001643}
1644
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001645/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001646 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001647 */
1648
1649static void shutdown_engine(void) {
1650 fd_global_shutdown();
1651 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001652 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001653}
1654
1655static const grpc_event_engine_vtable vtable = {
1656 .pollset_size = sizeof(grpc_pollset),
1657
1658 .fd_create = fd_create,
1659 .fd_wrapped_fd = fd_wrapped_fd,
1660 .fd_orphan = fd_orphan,
1661 .fd_shutdown = fd_shutdown,
1662 .fd_notify_on_read = fd_notify_on_read,
1663 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001664 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001665
1666 .pollset_init = pollset_init,
1667 .pollset_shutdown = pollset_shutdown,
1668 .pollset_reset = pollset_reset,
1669 .pollset_destroy = pollset_destroy,
1670 .pollset_work = pollset_work,
1671 .pollset_kick = pollset_kick,
1672 .pollset_add_fd = pollset_add_fd,
1673
1674 .pollset_set_create = pollset_set_create,
1675 .pollset_set_destroy = pollset_set_destroy,
1676 .pollset_set_add_pollset = pollset_set_add_pollset,
1677 .pollset_set_del_pollset = pollset_set_del_pollset,
1678 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1679 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1680 .pollset_set_add_fd = pollset_set_add_fd,
1681 .pollset_set_del_fd = pollset_set_del_fd,
1682
1683 .kick_poller = kick_poller,
1684
1685 .shutdown_engine = shutdown_engine,
1686};
1687
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001688/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1689 * Create a dummy epoll_fd to make sure epoll support is available */
1690static bool is_epoll_available() {
1691 int fd = epoll_create1(EPOLL_CLOEXEC);
1692 if (fd < 0) {
1693 gpr_log(
1694 GPR_ERROR,
1695 "epoll_create1 failed with error: %d. Not using epoll polling engine",
 1696 errno);
1697 return false;
1698 }
1699 close(fd);
1700 return true;
1701}
1702
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001703const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001704 /* If the use of signals is disabled, we cannot use the epoll engine */
1705 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1706 return NULL;
1707 }
1708
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001709 if (!is_epoll_available()) {
1710 return NULL;
1711 }
1712
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001713 if (!is_grpc_wakeup_signal_initialized) {
1714 grpc_use_signal(SIGRTMIN + 2);
1715 }
1716
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001717 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001718
1719 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1720 return NULL;
1721 }
1722
1723 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1724 polling_island_global_init())) {
1725 return NULL;
1726 }
1727
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001728 return &vtable;
1729}
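/* A NULL return from grpc_init_epoll_linux() (signals disabled, epoll not
   actually available, or a global-init failure above) simply means this
   engine cannot be used; the engine-selection logic in ev_posix.c is then
   expected to fall back to a different polling strategy. */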
1730
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001731#else /* defined(GPR_LINUX_EPOLL) */
1732#if defined(GPR_POSIX_SOCKET)
1733#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001734/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1735 * NULL */
1736const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001737#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001738
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001739void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001740#endif /* !defined(GPR_LINUX_EPOLL) */