/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/* TODO: sreek - Move this to init.c and initialize this like other tracers.
 * Also, enable this trace by default for now. */
static int grpc_polling_trace = 1;
#define GRPC_POLLING_TRACE(fmt, ...)        \
  if (grpc_polling_trace) {                 \
    gpr_log(GPR_INFO, (fmt), __VA_ARGS__);  \
  }
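
/* Usage sketch (illustrative, not from the original file): the macro expands to
   a gpr_log() call guarded by the grpc_polling_trace flag, e.g.

     GRPC_POLLING_TRACE("pollset_kick: pollset: %p, worker: %p", (void *)p,
                        (void *)worker);

   Because the expansion always passes __VA_ARGS__, at least one argument must
   follow the format string. */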

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called (even before grpc_init()) to set a different signal to use. If
 * signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
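
/* Usage sketch (illustrative, application code rather than part of this file);
   the specific signal below is an arbitrary choice for the example:

     grpc_use_signal(SIGRTMIN + 6);  // or grpc_use_signal(-1) to disable
     grpc_init();
 */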

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
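
/* Note (added for clarity): the read_closure/write_closure fields in grpc_fd
   act as a small state machine. The two sentinel values above mean "not ready,
   nobody waiting" and "ready, nobody waiting"; any other value is an actual
   grpc_closure* waiting to be scheduled. See notify_on_locked() and
   set_ready_locked() below. */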

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(p, r) pi_unref((p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_refcount ref_count;

  /* Pointer to the polling_island this island merged into.
   * The merged_to value is only set once in a polling_island's lifetime (and
   * only if the island is merged with another island). Because of this, we can
   * use the gpr_atm type here, do atomic accesses on it and reduce lock
   * contention on the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
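
/* Illustrative note (not in the original source): after repeated merges, a
   grpc_fd or grpc_pollset may still point at an "old" island A whose work has
   moved on, e.g. A->merged_to -> B, B->merged_to -> C. Readers follow the
   merged_to chain (see polling_island_maybe_get_latest() and
   polling_island_lock() below) to reach the island whose epoll_fd is actually
   being polled. */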

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Common helpers
 */

static void append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
}
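
/* Usage sketch (illustrative): append_error() lets several potentially failing
   steps accumulate into one composite grpc_error, which callers then log or
   propagate:

     grpc_error *error = GRPC_ERROR_NONE;
     append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), "some_operation");
     ...
     GRPC_LOG_IF_ERROR("some_operation", GRPC_ERROR_REF(error));

   "some_operation" is just an example description string. */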

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
void pi_add_ref(polling_island *pi);
void pi_unref(polling_island *pi);

void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_unref(pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif

void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }

void pi_unref(polling_island *pi) {
  /* If the ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (gpr_unref(&pi->ref_count)) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    /* Report the fd that was actually being added (the original logged the
       global wakeup fd here regardless of which wakeup fd failed) */
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If the fd is already closed, then it would have been automatically removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "polling_island_create";

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_ref_init(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  } else {
    polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
    pi->next_free = NULL;

    if (initial_fd != NULL) {
      /* Lock the polling island here just in case we got this structure from
         the freelist and the polling island lock was not released yet (by the
         code that adds the polling island to the freelist) */
      gpr_mu_lock(&pi->mu);
      polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
      gpr_mu_unlock(&pi->mu);
    }
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the locks on the *latest* polling islands in the linked lists pointed to
   by *p and *q (and also updates *p and *q to point to the latest polling
   islands)

   This function is needed because obtaining the locks with the following naive
   block of code is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking the polling_island with the lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (the one with fewer fds) to q
       (Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pick up this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
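
/* Refcount bookkeeping during a merge (explanatory note, not original text):
   refs held on p by its fds/pollsets stay on p; the new p->merged_to link adds
   one ref on q via PI_ADD_REF(q, "pi_merge"). When a holder later drops the
   last ref on p, pi_unref() deletes p and releases that chained ref on q, so q
   stays alive as long as anything can still reach it through p. */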

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p   ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
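
/* Worked example of the ref-by-two scheme (explanatory note, not original
   text): fd_create() stores refst = 1 (refcount 0, active bit set).
   GRPC_FD_REF() adds 2 -> 3 (refcount 1, still active). fd_orphan() does
   REF_BY(fd, 1), i.e. 3 -> 4, which bumps the count while clearing the active
   bit, and later UNREF_BY(fd, 2) -> 2. The final GRPC_FD_UNREF() takes the
   count from 2 to 0, at which point unref_by() puts the struct back on the
   freelist. */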

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
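
/* Explanatory note (not original text): kicking a specific worker therefore
   boils down to pthread_kill(worker->pt_id, grpc_wakeup_signal) in
   pollset_worker_kick() below. sig_handler deliberately does almost nothing;
   delivering the signal only serves to interrupt that worker's blocking poll
   call so it re-examines its pollset state. */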

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  GRPC_POLLING_TRACE("pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
                     (void *)worker, worker->pt_id);
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
1110
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001111/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001112static grpc_error *pollset_kick(grpc_pollset *p,
1113 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001114 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001115 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001116 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001117 grpc_pollset_worker *worker = specific_worker;
1118 if (worker != NULL) {
1119 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001120 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001121 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001122 for (worker = p->root_worker.next; worker != &p->root_worker;
1123 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001124 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001125 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001126 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001127 }
Craig Tillera218a062016-06-26 09:58:37 -07001128 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001129 } else {
1130 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001131 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001132 } else {
1133 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001134 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001135 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001136 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001137 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001138 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1139 /* Since worker == NULL, it means that we can kick "any" worker on this
1140 pollset 'p'. If 'p' happens to be the same pollset this thread is
1141 currently polling (i.e in pollset_work() function), then there is no need
1142 to kick any other worker since the current thread can just absorb the
1143 kick. This is the reason why we enter this case only when
1144 g_current_thread_pollset is != p */
1145
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001146 GPR_TIMER_MARK("kick_anonymous", 0);
1147 worker = pop_front_worker(p);
1148 if (worker != NULL) {
1149 GPR_TIMER_MARK("finally_kick", 0);
1150 push_back_worker(p, worker);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001151 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001152 } else {
1153 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001154 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001155 }
1156 }
1157
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001158 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001159 GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
1160 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001161}
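/* Example usage within this file: pollset_shutdown() below kicks every worker
   via pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST) so that each worker
   wakes up and notices that pollset->shutting_down has been set. */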
1162
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001163static grpc_error *kick_poller(void) {
1164 return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
1165}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001166
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001167static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
1168 gpr_mu_init(&pollset->mu);
1169 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001170
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001171 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001172 pollset->kicked_without_pollers = false;
1173
1174 pollset->shutting_down = false;
1175 pollset->finish_shutdown_called = false;
1176 pollset->shutdown_done = NULL;
1177
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001178 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001179}
1180
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001181/* Convert a timespec to milliseconds:
1182 - Very small or negative poll times are clamped to zero to do a non-blocking
1183 poll (which becomes spin polling)
1184 - Other small values are rounded up to one millisecond
1185 - Polls longer than a millisecond are rounded up to the next
1186 millisecond to avoid spinning
1187 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001188static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1189 gpr_timespec now) {
1190 gpr_timespec timeout;
1191 static const int64_t max_spin_polling_us = 10;
1192 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1193 return -1;
1194 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001195
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001196 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1197 max_spin_polling_us,
1198 GPR_TIMESPAN))) <= 0) {
1199 return 0;
1200 }
1201 timeout = gpr_time_sub(deadline, now);
1202 return gpr_time_to_millis(gpr_time_add(
1203 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1204}
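/* Illustrative examples of the conversion above: a deadline 5 microseconds
   from 'now' maps to 0 (a non-blocking/spin poll), a deadline 2.5 milliseconds
   from 'now' rounds up to 3, and an infinite deadline maps to -1 (which epoll
   treats as an infinite timeout). */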
1205
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001206static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1207 grpc_pollset *notifier) {
1208 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001209 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001210 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1211 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001212 gpr_mu_unlock(&fd->mu);
1213}
1214
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001215static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001216 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1217 gpr_mu_lock(&fd->mu);
1218 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1219 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001220}
1221
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001222static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001223 if (ps->polling_island != NULL) {
1224 PI_UNREF(ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001225 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001226 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001227}
1228
1229static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1230 grpc_pollset *pollset) {
1231 /* The pollset cannot have any workers if we are at this stage */
1232 GPR_ASSERT(!pollset_has_workers(pollset));
1233
1234 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001235
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001236 /* Release the ref and set pollset->polling_island to NULL */
1237 pollset_release_polling_island(pollset, "ps_shutdown");
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001238 grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001239}
1240
1241/* pollset->mu lock must be held by the caller before calling this */
1242static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1243 grpc_closure *closure) {
1244 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1245 GPR_ASSERT(!pollset->shutting_down);
1246 pollset->shutting_down = true;
1247 pollset->shutdown_done = closure;
1248 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1249
1250 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1251 because it would release the underlying polling island. In such a case, we
1252 let the last worker call finish_shutdown_locked() from pollset_work() */
1253 if (!pollset_has_workers(pollset)) {
1254 GPR_ASSERT(!pollset->finish_shutdown_called);
1255 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1256 finish_shutdown_locked(exec_ctx, pollset);
1257 }
1258 GPR_TIMER_END("pollset_shutdown", 0);
1259}
1260
1261/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1262 * than destroying the mutex, there is nothing special that needs to be done
1263 * here */
1264static void pollset_destroy(grpc_pollset *pollset) {
1265 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001266 gpr_mu_destroy(&pollset->mu);
1267}
1268
1269static void pollset_reset(grpc_pollset *pollset) {
1270 GPR_ASSERT(pollset->shutting_down);
1271 GPR_ASSERT(!pollset_has_workers(pollset));
1272 pollset->shutting_down = false;
1273 pollset->finish_shutdown_called = false;
1274 pollset->kicked_without_pollers = false;
1275 pollset->shutdown_done = NULL;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001276 pollset_release_polling_island(pollset, "ps_reset");
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001277}
1278
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001279#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001280/* Note: sig_mask contains the signal mask to use *during* epoll_pwait() */
1281static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001282 grpc_pollset *pollset,
1283 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001284 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001285 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001286 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001287 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001288 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001289 char *err_msg;
1290 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001291 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1292
1293 /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001294 latest polling island pointed to by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001295
1296 Since epoll_fd is immutable, we can read it without obtaining the polling
1297 island lock. There is however a possibility that the polling island (from
1298 which we got the epoll_fd) got merged with another island while we are
1299 in this function. This is still okay because in such a case, we will wakeup
1300 right away from epoll_wait() and pick up the latest polling_island the next time
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001301 this function (i.e pollset_work_and_unlock()) is called */
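/* A merge is surfaced through polling_island_wakeup_fd: when that fd becomes
   readable, the event loop below takes it as a sign that this island was
   merged into another one, does nothing for that event, and lets the next
   call to this function pick up the new epoll_fd. */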
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001302
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001303 if (pollset->polling_island == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001304 pollset->polling_island = polling_island_create(NULL, error);
1305 if (pollset->polling_island == NULL) {
1306 GPR_TIMER_END("pollset_work_and_unlock", 0);
1307 return; /* Fatal error. We cannot continue */
1308 }
1309
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001310 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001311 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1312 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001313 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001314
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001315 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001316 epoll_fd = pi->epoll_fd;
1317
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001318 /* Update the pollset->polling_island since the island pointed to by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001319 pollset->polling_island may be older than the one pointed to by pi */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001320 if (pollset->polling_island != pi) {
1321 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1322 polling island to be deleted */
1323 PI_ADD_REF(pi, "ps");
1324 PI_UNREF(pollset->polling_island, "ps");
1325 pollset->polling_island = pi;
1326 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001327
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001328 /* Add an extra ref so that the island does not get destroyed (which means
1329 the epoll_fd won't be closed) while we are doing an epoll_wait() on the
1330 epoll_fd */
1331 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001332 gpr_mu_unlock(&pollset->mu);
1333
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001334 do {
1335 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1336 sig_mask);
1337 if (ep_rv < 0) {
1338 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001339 gpr_asprintf(&err_msg,
1340 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1341 epoll_fd, errno, strerror(errno));
1342 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001343 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001344 /* We were interrupted. Save an iteration by doing a zero timeout
1345 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001346 GRPC_POLLING_TRACE(
1347 "pollset_work: pollset: %p, worker: %p received kick",
1348 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001349 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001350 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001351 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001352
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001353#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001354 /* See the definition of g_poll_sync for more details */
1355 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001356#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001357
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001358 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001359 void *data_ptr = ep_ev[i].data.ptr;
1360 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001361 append_error(error,
1362 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1363 err_desc);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001364 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001365 GRPC_POLLING_TRACE(
1366 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1367 "%d) got merged",
1368 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001369 /* This means that our polling island is merged with a different
1370 island. We do not have to do anything here since the subsequent call
1371 to the function pollset_work_and_unlock() will pick up the correct
1372 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001373 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001374 grpc_fd *fd = data_ptr;
1375 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1376 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1377 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001378 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001379 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001380 }
1381 if (write_ev || cancel) {
1382 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001383 }
1384 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001385 }
1386 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
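/* Note: the loop above repeats whenever epoll returned a completely full
   batch of GRPC_EPOLL_MAX_EVENTS events, since in that case more events may
   still be pending on the epoll fd. */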
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001387
1388 GPR_ASSERT(pi != NULL);
1389
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001390 /* Before leaving, release the extra ref we added to the polling island. It
1391 is important to use "pi" here (i.e our old copy of pollset->polling_island
1392 that we got before releasing the polling island lock). This is because
1393 pollset->polling_island pointer might get updated in other parts of the
1394 code when there is an island merge while we are doing epoll_wait() above */
1395 PI_UNREF(pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001396
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001397 GPR_TIMER_END("pollset_work_and_unlock", 0);
1398}
1399
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001400/* pollset->mu lock must be held by the caller before calling this.
1401 The function pollset_work() may temporarily release the lock (pollset->mu)
1402 during the course of its execution but it will always re-acquire the lock and
1403 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001404static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1405 grpc_pollset_worker **worker_hdl,
1406 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001407 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001408 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001409 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1410
1411 sigset_t new_mask;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001412
1413 grpc_pollset_worker worker;
1414 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001415 worker.pt_id = pthread_self();
1416
1417 *worker_hdl = &worker;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001418
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001419 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1420 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001421
1422 if (pollset->kicked_without_pollers) {
1423 /* If the pollset was kicked without pollers, pretend that the current
1424 worker got the kick and skip polling. A kick indicates that there is some
1425 work that needs attention like an event on the completion queue or an
1426 alarm */
1427 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1428 pollset->kicked_without_pollers = 0;
1429 } else if (!pollset->shutting_down) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001430 /* We use the POSIX signal with number 'grpc_wakeup_signal' for waking up
1431 (i.e 'kicking') a worker in the pollset.
1432 A 'kick' is a way to inform that worker that there is some pending work
1433 that needs immediate attention (like an event on the completion queue,
1434 or a polling island merge that results in a new epoll-fd to wait on) and
1435 that the worker should not spend time waiting in epoll_pwait().
1436
1437 A kick can come at anytime (i.e before/during or after the worker calls
1438 epoll_pwait()) but in all cases we have to make sure that when a worker
1439 gets a kick, it does not spend time in epoll_pwait(). In other words, one
1440 kick should result in skipping/exiting of one epoll_pwait();
1441
1442 To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all
1443 times *except* when it is in epoll_pwait(). This way, the worker never
1444 misses acting on a kick */
1445
Craig Tiller19196992016-06-27 18:45:56 -07001446 if (!g_initialized_sigmask) {
1447 sigemptyset(&new_mask);
1448 sigaddset(&new_mask, grpc_wakeup_signal);
1449 pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
1450 sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
1451 g_initialized_sigmask = true;
1452 /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
1453 This is the mask used at all times *except during
1454 epoll_wait()*.
1455 g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
Craig Tiller510ff692016-06-27 20:31:49 -07001456 this is the mask to use *during epoll_wait()*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001457
Craig Tiller19196992016-06-27 18:45:56 -07001458 The new_mask is set on the worker before it is added to the pollset
Craig Tiller510ff692016-06-27 20:31:49 -07001459 (i.e before it can be kicked) */
Craig Tiller19196992016-06-27 18:45:56 -07001460 }
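/* g_orig_sigmask (which does not block grpc_wakeup_signal) is handed to
   pollset_work_and_unlock() below and from there to epoll_pwait(), so the
   wakeup signal can only be delivered while the worker is actually parked
   inside epoll_pwait(). */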
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001461
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001462 push_front_worker(pollset, &worker); /* Add worker to pollset */
1463
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001464 pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
1465 &g_orig_sigmask, &error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001466 grpc_exec_ctx_flush(exec_ctx);
1467
1468 gpr_mu_lock(&pollset->mu);
1469 remove_worker(pollset, &worker);
1470 }
1471
1472 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1473 false at this point) and the pollset is shutting down, we may have to
1474 finish the shutdown process by calling finish_shutdown_locked().
1475 See pollset_shutdown() for more details.
1476
1477 Note: Continuing to access pollset here is safe; it is the caller's
1478 responsibility to not destroy a pollset when it has outstanding calls to
1479 pollset_work() */
1480 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1481 !pollset->finish_shutdown_called) {
1482 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1483 finish_shutdown_locked(exec_ctx, pollset);
1484
1485 gpr_mu_unlock(&pollset->mu);
1486 grpc_exec_ctx_flush(exec_ctx);
1487 gpr_mu_lock(&pollset->mu);
1488 }
1489
1490 *worker_hdl = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001491
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001492 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1493 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001494
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001495 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001496
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001497 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1498 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001499}
1500
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001501static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1502 grpc_fd *fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001503 grpc_error *error = GRPC_ERROR_NONE;
1504
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001505 gpr_mu_lock(&pollset->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001506 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001507
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001508 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001509
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001510 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1511 * equal, do nothing.
1512 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1513 * a new polling island (with a refcount of 2) and make the polling_island
1514 * fields in both fd and pollset point to the new island
1515 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1516 * the NULL polling_island field to point to the non-NULL polling_island
1517 * field (ensure that the refcount on the polling island is incremented by
1518 * 1 to account for the newly added reference)
1519 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1520 * and different, merge both the polling islands and update the
1521 * polling_island fields in both fd and pollset to point to the merged
1522 * polling island.
1523 */
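/* For example (illustrative): if the fd currently belongs to island I1 and the
   pollset belongs to a different island I2, case 4 applies: polling_island_merge()
   below combines I1 and I2, and both the fd and the pollset then take a ref on
   the merged island and drop their refs on the old ones. */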
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001524 if (fd->polling_island == pollset->polling_island) {
1525 pi_new = fd->polling_island;
1526 if (pi_new == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001527 pi_new = polling_island_create(fd, &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001528
1529 GRPC_POLLING_TRACE(
1530 "pollset_add_fd: Created new polling island. pi_new:%p (fd: %d, "
1531 "pollset: %p)",
1532 (void *)pi_new, fd->fd, (void *)pollset);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001533 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001534 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001535 pi_new = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001536 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001537 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001538
1539 GRPC_POLLING_TRACE(
1540 "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
1541 "pollset->pi: %p)",
1542 (void *)pi_new, fd->fd, (void *)pollset,
1543 (void *)pollset->polling_island);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001544 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001545 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001546 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001547
1548 GRPC_POLLING_TRACE(
1549 "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
1550 "%p, fd->pi: %p",
1551 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001552 } else {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001553 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
1554 &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001555 GRPC_POLLING_TRACE(
1556 "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
1557 "%p, fd->pi: %p, pollset->pi: %p)",
1558 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
1559 (void *)pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001560 }
1561
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001562 /* At this point, pi_new is the polling island that both fd->polling_island
1563 and pollset->polling_island must be pointing to */
1564
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001565 if (fd->polling_island != pi_new) {
1566 PI_ADD_REF(pi_new, "fd");
1567 if (fd->polling_island != NULL) {
1568 PI_UNREF(fd->polling_island, "fd");
1569 }
1570 fd->polling_island = pi_new;
1571 }
1572
1573 if (pollset->polling_island != pi_new) {
1574 PI_ADD_REF(pi_new, "ps");
1575 if (pollset->polling_island != NULL) {
1576 PI_UNREF(pollset->polling_island, "ps");
1577 }
1578 pollset->polling_island = pi_new;
1579 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001580
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001581 gpr_mu_unlock(&fd->pi_mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001582 gpr_mu_unlock(&pollset->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001583}
1584
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001585/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001586 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001587 */
1588
1589static grpc_pollset_set *pollset_set_create(void) {
1590 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1591 memset(pollset_set, 0, sizeof(*pollset_set));
1592 gpr_mu_init(&pollset_set->mu);
1593 return pollset_set;
1594}
1595
1596static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1597 size_t i;
1598 gpr_mu_destroy(&pollset_set->mu);
1599 for (i = 0; i < pollset_set->fd_count; i++) {
1600 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1601 }
1602 gpr_free(pollset_set->pollsets);
1603 gpr_free(pollset_set->pollset_sets);
1604 gpr_free(pollset_set->fds);
1605 gpr_free(pollset_set);
1606}
1607
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001608static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1609 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1610 size_t i;
1611 gpr_mu_lock(&pollset_set->mu);
1612 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1613 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1614 pollset_set->fds = gpr_realloc(
1615 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1616 }
1617 GRPC_FD_REF(fd, "pollset_set");
1618 pollset_set->fds[pollset_set->fd_count++] = fd;
1619 for (i = 0; i < pollset_set->pollset_count; i++) {
1620 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1621 }
1622 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1623 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1624 }
1625 gpr_mu_unlock(&pollset_set->mu);
1626}
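/* Note: pollset_set_add_fd() above propagates the fd to every pollset directly
   contained in the set and recurses into nested pollset_sets, so the fd ends up
   being polled by every pollset reachable from this set. */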
1627
1628static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1629 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1630 size_t i;
1631 gpr_mu_lock(&pollset_set->mu);
1632 for (i = 0; i < pollset_set->fd_count; i++) {
1633 if (pollset_set->fds[i] == fd) {
1634 pollset_set->fd_count--;
1635 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1636 pollset_set->fds[pollset_set->fd_count]);
1637 GRPC_FD_UNREF(fd, "pollset_set");
1638 break;
1639 }
1640 }
1641 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1642 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1643 }
1644 gpr_mu_unlock(&pollset_set->mu);
1645}
1646
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001647static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1648 grpc_pollset_set *pollset_set,
1649 grpc_pollset *pollset) {
1650 size_t i, j;
1651 gpr_mu_lock(&pollset_set->mu);
1652 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1653 pollset_set->pollset_capacity =
1654 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1655 pollset_set->pollsets =
1656 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1657 sizeof(*pollset_set->pollsets));
1658 }
1659 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1660 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1661 if (fd_is_orphaned(pollset_set->fds[i])) {
1662 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1663 } else {
1664 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1665 pollset_set->fds[j++] = pollset_set->fds[i];
1666 }
1667 }
1668 pollset_set->fd_count = j;
1669 gpr_mu_unlock(&pollset_set->mu);
1670}
1671
1672static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1673 grpc_pollset_set *pollset_set,
1674 grpc_pollset *pollset) {
1675 size_t i;
1676 gpr_mu_lock(&pollset_set->mu);
1677 for (i = 0; i < pollset_set->pollset_count; i++) {
1678 if (pollset_set->pollsets[i] == pollset) {
1679 pollset_set->pollset_count--;
1680 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1681 pollset_set->pollsets[pollset_set->pollset_count]);
1682 break;
1683 }
1684 }
1685 gpr_mu_unlock(&pollset_set->mu);
1686}
1687
1688static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1689 grpc_pollset_set *bag,
1690 grpc_pollset_set *item) {
1691 size_t i, j;
1692 gpr_mu_lock(&bag->mu);
1693 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1694 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1695 bag->pollset_sets =
1696 gpr_realloc(bag->pollset_sets,
1697 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1698 }
1699 bag->pollset_sets[bag->pollset_set_count++] = item;
1700 for (i = 0, j = 0; i < bag->fd_count; i++) {
1701 if (fd_is_orphaned(bag->fds[i])) {
1702 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1703 } else {
1704 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1705 bag->fds[j++] = bag->fds[i];
1706 }
1707 }
1708 bag->fd_count = j;
1709 gpr_mu_unlock(&bag->mu);
1710}
1711
1712static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1713 grpc_pollset_set *bag,
1714 grpc_pollset_set *item) {
1715 size_t i;
1716 gpr_mu_lock(&bag->mu);
1717 for (i = 0; i < bag->pollset_set_count; i++) {
1718 if (bag->pollset_sets[i] == item) {
1719 bag->pollset_set_count--;
1720 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1721 bag->pollset_sets[bag->pollset_set_count]);
1722 break;
1723 }
1724 }
1725 gpr_mu_unlock(&bag->mu);
1726}
1727
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001728/* Test helper functions */
1730void *grpc_fd_get_polling_island(grpc_fd *fd) {
1731 polling_island *pi;
1732
1733 gpr_mu_lock(&fd->pi_mu);
1734 pi = fd->polling_island;
1735 gpr_mu_unlock(&fd->pi_mu);
1736
1737 return pi;
1738}
1739
1740void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1741 polling_island *pi;
1742
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001743 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001744 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001745 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001746
1747 return pi;
1748}
1749
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001750bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001751 polling_island *p1 = p;
1752 polling_island *p2 = q;
1753
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001754 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1755 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001756 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001757 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001758
1759 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001760}
1761
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001762/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001763 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001764 */
1765
1766static void shutdown_engine(void) {
1767 fd_global_shutdown();
1768 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001769 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001770}
1771
1772static const grpc_event_engine_vtable vtable = {
1773 .pollset_size = sizeof(grpc_pollset),
1774
1775 .fd_create = fd_create,
1776 .fd_wrapped_fd = fd_wrapped_fd,
1777 .fd_orphan = fd_orphan,
1778 .fd_shutdown = fd_shutdown,
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001779 .fd_is_shutdown = fd_is_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001780 .fd_notify_on_read = fd_notify_on_read,
1781 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001782 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001783
1784 .pollset_init = pollset_init,
1785 .pollset_shutdown = pollset_shutdown,
1786 .pollset_reset = pollset_reset,
1787 .pollset_destroy = pollset_destroy,
1788 .pollset_work = pollset_work,
1789 .pollset_kick = pollset_kick,
1790 .pollset_add_fd = pollset_add_fd,
1791
1792 .pollset_set_create = pollset_set_create,
1793 .pollset_set_destroy = pollset_set_destroy,
1794 .pollset_set_add_pollset = pollset_set_add_pollset,
1795 .pollset_set_del_pollset = pollset_set_del_pollset,
1796 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1797 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1798 .pollset_set_add_fd = pollset_set_add_fd,
1799 .pollset_set_del_fd = pollset_set_del_fd,
1800
1801 .kick_poller = kick_poller,
1802
1803 .shutdown_engine = shutdown_engine,
1804};
1805
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001806/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1807 * Create a dummy epoll_fd to make sure epoll support is available */
1808static bool is_epoll_available() {
1809 int fd = epoll_create1(EPOLL_CLOEXEC);
1810 if (fd < 0) {
1811 gpr_log(
1812 GPR_ERROR,
1813 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1814 fd);
1815 return false;
1816 }
1817 close(fd);
1818 return true;
1819}
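/* Illustrative note: if epoll_create1() fails here (e.g. on a kernel without
   epoll support), grpc_init_epoll_linux() below returns NULL so that another
   polling engine can be picked instead. */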
1820
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001821const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001822 /* If use of signals is disabled, we cannot use the epoll engine */
1823 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1824 return NULL;
1825 }
1826
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001827 if (!is_epoll_available()) {
1828 return NULL;
1829 }
1830
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001831 if (!is_grpc_wakeup_signal_initialized) {
1832 grpc_use_signal(SIGRTMIN + 2);
1833 }
1834
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001835 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001836
1837 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1838 return NULL;
1839 }
1840
1841 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1842 polling_island_global_init())) {
1843 return NULL;
1844 }
1845
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001846 return &vtable;
1847}
1848
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001849#else /* defined(GPR_LINUX_EPOLL) */
1850#if defined(GPR_POSIX_SOCKET)
1851#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001852/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1853 * NULL */
1854const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001855#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001856
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001857void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001858#endif /* !defined(GPR_LINUX_EPOLL) */