/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called before grpc_init() to select a different signal to use. If
 * signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
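
/* Illustrative sketch (not part of this file): an application that wants the
   epoll engine to use a different wakeup signal would call grpc_use_signal()
   before grpc_init(). The particular signal chosen below is only a
   hypothetical example.

     #include <grpc/grpc.h>
     #include <grpc/grpc_posix.h>
     #include <signal.h>

     int main(void) {
       grpc_use_signal(SIGRTMIN + 2);  // hypothetical choice of signal
       grpc_init();
       // ... use gRPC ...
       grpc_shutdown();
       return 0;
     }
*/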

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting the
     field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
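
/* The read_closure/write_closure fields of a grpc_fd hold either one of the
   two sentinel values above or a pointer to an actual pending closure. An
   illustrative summary (see notify_on_locked() and set_ready_locked() further
   below) of how the three states transition:

     current state       on notify_on(closure)            on set_ready()
     -----------------   ------------------------------   ------------------------------
     CLOSURE_NOT_READY   store closure (now waiting)      become CLOSURE_READY
     CLOSURE_READY       schedule closure immediately,    ignore (duplicate ready),
                         back to CLOSURE_NOT_READY        stay CLOSURE_READY
     <closure pending>   error (two notify_on calls)      schedule the pending closure,
                                                          back to CLOSURE_NOT_READY
*/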

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), 1, (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), 1, (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p), 1)
#define PI_UNREF(p, r) pi_unref((p), 1)

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use the PI_ADD_REF() and PI_UNREF() macros to
     increment/decrement the refcount.
     Once the ref count becomes zero, this structure is destroyed, which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this island merged into.
   * merged_to is only set once in a polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use the gpr_atm type here so that we can do atomic access on this and
   * reduce lock contention on the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e. not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Common helpers
 */

static void append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
}

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wake up all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island).

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e. the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(polling_island *pi); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);

void pi_add_ref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                    int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}

void pi_unref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                  int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif

long pi_add_ref(polling_island *pi, int ref_cnt) {
  return gpr_atm_full_fetch_add(&pi->ref_count, ref_cnt);
}

long pi_unref(polling_island *pi, int ref_cnt) {
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  /* If the ref count went to zero, delete the polling island. Note that this
     need not be done under a lock. Once the ref count goes to zero, we are
     guaranteed that no one else holds a reference to the polling island (and
     that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (old_cnt == ref_cnt) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  } else {
    GPR_ASSERT(old_cnt > ref_cnt);
  }

  return old_cnt;
}
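
/* Illustrative sketch (comments only, not part of the implementation) of a
   typical ref_count lifecycle under the scheme above; the owners named here
   are hypothetical:

     polling_island *pi = polling_island_create(some_fd, &error);  // ref_count == 0
     PI_ADD_REF(pi, "fd");   // an fd starts pointing at the island
     PI_ADD_REF(pi, "ps");   // a pollset starts pointing at it too
     ...
     PI_UNREF(pi, "fd");     // the fd drops its reference
     PI_UNREF(pi, "ps");     // last ref dropped: polling_island_delete() runs,
                             // and if pi->merged_to was set, the ref held on
                             // that island is dropped as well (recursively)
*/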

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "polling_island_create";

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_atm_rel_store(&pi->ref_count, (gpr_atm)0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  } else {
    polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
    pi->next_free = NULL;

    if (initial_fd != NULL) {
      /* Lock the polling island here just in case we got this structure from
         the freelist and the polling island lock was not released yet (by the
         code that adds the polling island to the freelist) */
      gpr_mu_lock(&pi->mu);
      polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
      gpr_mu_unlock(&pi->mu);
    }
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e. the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e. without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e. pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed to
   by *p and *q (and also updates *p and *q to point to the latest polling
   islands).

   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking the polling_island with the lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e. move all the fds from p (the one with fewer fds) to
       q. Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false' */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wake up all the pollers (if any) on p so that they pick up this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (note that no merge would have happened
     if p == q, which is ok) */
  return q;
}
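
/* Illustrative sketch (comments only): suppose island A currently polls 2 fds
   and island B polls 10 fds. polling_island_merge(A, B, &error) will:
     1. lock the latest versions of both islands,
     2. move A's fds into B's epoll set (A has fewer fds, so it is the one
        emptied),
     3. add polling_island_wakeup_fd to A's epoll set so that any threads still
        blocked in epoll_wait() on A wake up and start polling the merged
        island,
     4. set A->merged_to = B and take a ref on B on A's behalf,
   and then return B as the merged island. */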

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
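
/* Illustrative sketch (comments only) of the ref-by-two scheme: with the refst
   layout described in struct grpc_fd (bit 0 = active flag, bits 1-n = the
   refcount), each logical ref changes refst by 2 and leaves bit 0 alone:

     refst == 1   // active, refcount 0 (the value stored by fd_create())
     GRPC_FD_REF(fd, "a");    // refst == 3 : active, refcount 1
     GRPC_FD_REF(fd, "b");    // refst == 5 : active, refcount 2
     GRPC_FD_UNREF(fd, "a");  // refst == 3
     GRPC_FD_UNREF(fd, "b");  // refst == 1

   fd_orphan() below adds 1 (instead of 2), which flips the active bit off
   while keeping the fd referenced until orphaning completes. */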

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is not equal to p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
1201
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001202static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1203 grpc_pollset *notifier) {
1204 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001205 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001206 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1207 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001208 gpr_mu_unlock(&fd->mu);
1209}
1210
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001211static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001212 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1213 gpr_mu_lock(&fd->mu);
1214 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1215 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001216}
1217
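/* Drops the pollset's reference to its polling island (if it has one) and
   clears the pointer. Expected to be called with pollset->mu held; 'reason'
   is only used to label the ref-count change */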
static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
  if (ps->polling_island != NULL) {
    PI_UNREF(ps->polling_island, reason);
  }
  ps->polling_island = NULL;
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy, so other
 * than destroying the mutex there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island(pollset, "ps_reset");
}

#define GRPC_EPOLL_MAX_EVENTS 1000
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the latest
     polling island pointed to by pollset->polling_island.

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is, however, a possibility that the polling island
     (from which we got the epoll_fd) gets merged with another island while we
     are in this function. This is still okay because in such a case we will
     wake up right away from epoll_wait() and pick up the latest polling_island
     the next time this function (i.e. pollset_work_and_unlock()) is called */

  if (pollset->polling_island == NULL) {
    pollset->polling_island = polling_island_create(NULL, error);
    if (pollset->polling_island == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->polling_island, "ps");
  }

  pi = polling_island_maybe_get_latest(pollset->polling_island);
  epoll_fd = pi->epoll_fd;

  /* Update pollset->polling_island since the island it currently points to
     may be older than the one pointed to by pi */
  if (pollset->polling_island != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(pollset->polling_island, "ps");
    pollset->polling_island = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->mu);

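  /* Drain ready events. If epoll_pwait() comes back with the event buffer
     completely full, loop and call it again so that a burst of more than
     GRPC_EPOLL_MAX_EVENTS ready fds is not left waiting until the next
     pollset_work() */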
  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an iteration by doing a zero-timeout
           epoll_wait to see if there are any other events of interest */
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_epoll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        /* This means that our polling island has been merged with a different
           island. We do not have to do anything here since the subsequent call
           to pollset_work_and_unlock() will pick up the correct epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e. our old copy of
     pollset->polling_island that we got before releasing the polling island
     lock). This is because the pollset->polling_island pointer might get
     updated in other parts of the code when there is an island merge while we
     are doing epoll_wait() above */
  PI_UNREF(pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}


/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;
  sigset_t orig_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is
       some work that needs attention like an event on the completion queue or
       an alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix signal with number 'grpc_wakeup_signal' for waking up
       (i.e. 'kicking') a worker in the pollset. A 'kick' is a way to inform
       that worker that there is some pending work that needs immediate
       attention (like an event on the completion queue, or a polling island
       merge that results in a new epoll-fd to wait on) and that the worker
       should not spend time waiting in epoll_pwait().

       A kick can come at any time (i.e. before/during or after the worker
       calls epoll_pwait()) but in all cases we have to make sure that when a
       worker gets a kick, it does not spend time in epoll_pwait(). In other
       words, one kick should result in skipping/exiting of one epoll_pwait().

       To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    sigemptyset(&new_mask);
    sigaddset(&new_mask, grpc_wakeup_signal);
    pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
    sigdelset(&orig_mask, grpc_wakeup_signal);
    /* new_mask:  the new thread mask which blocks 'grpc_wakeup_signal'. This
                  is the mask used at all times *except during epoll_wait()*
       orig_mask: the thread mask which allows 'grpc_wakeup_signal' and this is
                  the mask to use *during epoll_wait()*

       The new_mask is set on the worker before it is added to the pollset
       (i.e. before it can be kicked) */
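    /* How a kick interacts with these masks (a sketch of the intended
       behaviour): pollset_worker_kick() sends 'grpc_wakeup_signal' to the
       worker's thread (worker.pt_id). While new_mask is installed the signal
       simply stays pending; when epoll_pwait() atomically switches to
       orig_mask, the pending signal is delivered and the wait returns
       immediately with EINTR instead of blocking. A kick that arrives while
       the worker is already inside epoll_pwait() interrupts it the same way */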

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

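/* Ensures that 'fd' and 'pollset' end up on the same polling island, creating
   a new island or merging existing ones as needed; the case analysis inside
   spells out the four possible situations */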
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   *    equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   *    a new polling island (with a refcount of 2) and make the polling_island
   *    fields in both fd and pollset point to the new island.
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   *    the NULL polling_island field to point to the non-NULL polling_island
   *    field (ensure that the refcount on the polling island is incremented by
   *    1 to account for the newly added reference).
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   *    and different, merge both the polling islands and update the
   *    polling_island fields in both fd and pollset to point to the merged
   *    polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd, &error);
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    gpr_mu_unlock(&pi_new->mu);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
                                  &error);
  }

  /* At this point, pi_new is the polling island that both fd->polling_island
     and pollset->polling_island must be pointing to */

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->mu);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

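/* A pollset_set is a bag of fds, pollsets and other pollset_sets. As used by
   the rest of iomgr, an fd added to the set is propagated to every pollset in
   the set, directly or through nested pollset_sets; orphaned fds are pruned
   lazily whenever the containers below are walked */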
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

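/* Adds 'pollset' to the set. While walking the fd array, fds that have been
   orphaned since they were added are dropped (and unref'd); live fds are
   added to the new pollset and compacted back into the array */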
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

/*******************************************************************************
 * Test helper functions
 */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->pi_mu);
  pi = fd->polling_island;
  gpr_mu_unlock(&fd->pi_mu);

  return pi;
}

void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->mu);
  pi = ps->polling_island;
  gpr_mu_unlock(&ps->mu);

  return pi;
}

bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

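/* The vtable below is how the rest of iomgr drives this engine:
   grpc_init_epoll_linux() hands it back, and all fd/pollset/pollset_set
   operations are dispatched through these function pointers (see the
   grpc_event_engine_vtable declaration in ev_posix.h) */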
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        fd);
    return false;
  }
  close(fd);
  return true;
}

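/* Entry point through which the generic ev_posix layer tries to pick this
   engine. Returns NULL (so a different polling engine is chosen) if
   signal-based kicks were explicitly disabled via grpc_use_signal(-1), if
   epoll is not actually usable, or if global initialization fails. A sketch of
   optional application usage, assuming the application prefers a different
   real-time signal than the default chosen below:

     grpc_use_signal(SIGRTMIN + 4);   // before grpc_init(), to take effect
     grpc_init();
*/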
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}

#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */