/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */
#define GRPC_POLLING_TRACE(fmt, ...)       \
  if (grpc_polling_trace) {                \
    gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
  }

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called even before grpc_init() to set a different signal to use. If
 * signum == -1, then the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
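
/* Illustrative usage (an explanatory sketch, not code from this file): an
   application that wants this engine to use a dedicated real-time signal for
   worker kicks could call, before grpc_init():

     grpc_use_signal(SIGRTMIN + 6);   // SIGRTMIN + 6 is an arbitrary example

   Passing -1 instead disables signal-based wakeups, in which case this epoll
   engine is not used and gRPC falls back to another polling engine. */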
89
90struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070091
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070092/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070093 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070094 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
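
/* Illustrative refst values (an explanatory sketch, not used by the code):
     refst = 5 (binary ...101): active fd (bit 0 set) with a ref count of 2
     refst = 4 (binary ...100): orphaned fd (bit 0 clear) with a ref count of 2
   Since the ref/unref helpers below always add or subtract an even number (2),
   bit 0 (the active/orphaned flag) is never disturbed by ref-count traffic. */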

/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_refcount ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that
   * too only if the island is merged with another island). Because of this,
   * we can use gpr_atm type here so that we can do atomic access on this and
   * reduce lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The workqueue associated with this polling island */
  grpc_workqueue *workqueue;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Common helpers
 */

static void append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
}
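
/* A sketch of the intended usage, inferred from the callers later in this
   file (e.g. polling_island_add_fds_locked):

     grpc_error *error = GRPC_ERROR_NONE;
     append_error(&error, do_step_one(), "my_operation");
     append_error(&error, do_step_two(), "my_operation");
     // 'error' stays GRPC_ERROR_NONE if both steps succeeded; otherwise it is
     // a composite error with each failure attached as a child.

   do_step_one()/do_step_two() are hypothetical helpers used only for
   illustration. */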

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx,
                                  polling_island *pi);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                           int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif

static void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }

static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (gpr_unref(&pi->ref_count)) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1;
  pi->workqueue = NULL;

  gpr_ref_init(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);

  if (initial_fd != NULL) {
    /* Lock the polling island here just in case we got this structure from
       the freelist and the polling island lock was not released yet (by the
       code that adds the polling island to the freelist) */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
    gpr_mu_unlock(&pi->mu);
  }

  append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
               err_desc);

done:
  if (*error != GRPC_ERROR_NONE) {
    if (pi->epoll_fd >= 0) {
      close(pi->epoll_fd);
    }
    if (pi->workqueue != NULL) {
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
    }
    gpr_mu_destroy(&pi->mu);
    gpr_free(pi);
    pi = NULL;
  }
  return pi;
}

static void polling_island_delete(grpc_exec_ctx *exec_ctx,
                                  polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
  gpr_mu_destroy(&pi->mu);
  gpr_free(pi->fds);
  gpr_free(pi);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
      polling_island *pi_latest = polling_island_lock(pi);
      ...
      ... critical section ..
      ...
      gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we
         check this by holding the pi->mu lock, we cannot be sure (i.e without
         the pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (the one with fewer fds) to q.
       Note that the refcounts on the fds being moved will not change here.
       (This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
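
/* Illustrative effect of a merge (an explanatory sketch, not additional code):
   if p has fewer fds than q, then after polling_island_merge(p, q, &error)
   all of p's fds live in q's epoll set, p->merged_to points to q, and q holds
   one extra ref taken on behalf of p. Any thread still holding a pointer to p
   follows the merged_to chain (see polling_island_lock() and
   polling_island_maybe_get_latest()) to reach the live island q. */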

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p   ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(exec_ctx, fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in the pollset_work() function), then there is
       no need to kick any other worker since the current thread can just
       absorb the kick. This is the reason we enter this case only when
       g_current_thread_pollset != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

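/* Initialize a pollset: the worker list starts out empty (root_worker points
   to itself) and no polling island is assigned yet; one is created or adopted
   lazily by pollset_add_fd() or pollset_work_and_unlock(). */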
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
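/* Illustrative conversions (assuming now == 0): a deadline 5us away is within
   max_spin_polling_us and yields 0 (a non-blocking poll); a deadline 2.3ms
   away is rounded up and yields 3; gpr_inf_future() yields -1 (block
   indefinitely). */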
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

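/* Drop the pollset's reference on its polling island (if it has one) and clear
   the pointer; 'reason' is simply forwarded to PI_UNREF() */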
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *ps, char *reason) {
  if (ps->polling_island != NULL) {
    PI_UNREF(exec_ctx, ps->polling_island, reason);
  }
  ps->polling_island = NULL;
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  GPR_ASSERT(pollset->polling_island == NULL);
}

#define GRPC_EPOLL_MAX_EVENTS 1000
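/* Do the actual epoll_pwait() for a pollset: resolve the latest polling island
   (following any merges), take a temporary ref on it, drop pollset->mu and
   block on the island's epoll fd. Readiness events are dispatched to
   fd_become_readable()/fd_become_writable(); the two special wakeup fds
   indicate either a global kick or a polling island merge. */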
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the latest
     polling island pointed to by pollset->polling_island.

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wake
     up right away from epoll_wait() and pick up the latest polling_island the
     next time this function (i.e pollset_work_and_unlock()) is called */

  if (pollset->polling_island == NULL) {
    pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
    if (pollset->polling_island == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->polling_island, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void *)pollset, (void *)pollset->polling_island);
  }

  pi = polling_island_maybe_get_latest(pollset->polling_island);
  epoll_fd = pi->epoll_fd;

  /* Update pollset->polling_island since the island it currently points to
     may be older than the one pointed to by pi */
  if (pollset->polling_island != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(exec_ctx, pollset->polling_island, "ps");
    pollset->polling_island = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->mu);

  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_pwait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_epoll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e our old copy of pollset->polling_island
     that we got before releasing the polling island lock). This is because
     pollset->polling_island pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(exec_ctx, pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}

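/* Illustrative caller loop (a sketch only - the real drivers are the
   completion queue and other iomgr code, and error handling is elided):

     gpr_mu *mu;
     pollset_init(pollset, &mu);
     gpr_mu_lock(mu);
     while (!done) {
       grpc_pollset_worker *worker = NULL;
       pollset_work(exec_ctx, pollset, &worker,
                    gpr_now(GPR_CLOCK_MONOTONIC), deadline);
     }
     gpr_mu_unlock(mu);
*/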
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is
       some work that needs attention like an event on the completion queue or
       an alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before or during its call to epoll_pwait(), it
       should immediately exit from epoll_wait(). If the worker is kicked after
       it returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

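/* Add an fd to a pollset, merging their polling islands if necessary. The lock
   ordering here is pollset->mu first, then fd->pi_mu. */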
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   *    equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   *    a new polling island (with a refcount of 2) and make the polling_island
   *    fields in both fd and pollset point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   *    the NULL polling_island field to point to the non-NULL polling_island
   *    field (ensure that the refcount on the polling island is incremented by
   *    1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   *    and different, merge both the polling islands and update the
   *    polling_island fields in both fd and pollset to point to the merged
   *    polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(exec_ctx, fd, &error);

      GRPC_POLLING_TRACE(
          "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
          "pollset: %p)",
          (void *)pi_new, fd->fd, (void *)pollset);
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
        "pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset,
        (void *)pollset->polling_island);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
                                  &error);
    GRPC_POLLING_TRACE(
        "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p, pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
        (void *)pollset->polling_island);
  }

  /* At this point, pi_new is the polling island that both fd->polling_island
     and pollset->polling_island must be pointing to */

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(exec_ctx, fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(exec_ctx, pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->mu);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

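/* A pollset_set is a bag of fds, pollsets and nested pollset_sets guarded by
   its own mutex. Adding an fd propagates it to every contained pollset and,
   recursively, to every nested pollset_set; orphaned fds are pruned lazily as
   the containers are traversed. */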
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

/* Test helper functions */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->pi_mu);
  pi = fd->polling_island;
  gpr_mu_unlock(&fd->pi_mu);

  return pi;
}

void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->mu);
  pi = ps->polling_island;
  gpr_mu_unlock(&ps->mu);

  return pi;
}

bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        fd);
    return false;
  }
  close(fd);
  return true;
}

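/* Entry point: returns the vtable for this (epoll + signal based) event engine
   or NULL if the engine cannot be used, i.e if signals are disabled
   (grpc_wakeup_signal < 0), epoll is not actually available, or global
   initialization fails; a NULL return means a different polling engine has to
   be used. */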
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}

#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */