blob: 6a63c4d1d1868ece89bc75aafc1ba08ee8cde1de [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
45#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070060#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070061#include "src/core/lib/profiling/timers.h"
62#include "src/core/lib/support/block_annotate.h"
63
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Log a polling-engine trace message when tracing is enabled.
   Wrapped in do { } while (0) so the macro behaves as a single statement:
   the previous bare `if { }` expansion would silently bind to a following
   `else` (dangling-else) when used unbraced inside an if-statement. */
#define GRPC_POLLING_TRACE(fmt, ...)           \
  do {                                         \
    if (grpc_polling_trace) {                  \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__);   \
    }                                          \
  } while (0)
70
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070071static int grpc_wakeup_signal = -1;
72static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070073
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070074/* Implements the function defined in grpc_posix.h. This function might be
75 * called before even calling grpc_init() to set either a different signal to
76 * use. If signum == -1, then the use of signals is disabled */
77void grpc_use_signal(int signum) {
78 grpc_wakeup_signal = signum;
79 is_grpc_wakeup_signal_initialized = true;
80
81 if (grpc_wakeup_signal < 0) {
82 gpr_log(GPR_INFO,
83 "Use of signals is disabled. Epoll engine will not be used");
84 } else {
85 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
86 grpc_wakeup_signal);
87 }
88}
89
90struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070091
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070092/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070093 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070094 */
/* File-descriptor wrapper tracked by the epoll polling engine. */
struct grpc_fd {
  int fd; /* The underlying OS file descriptor */
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Protects the mutable fields below */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Pending notification closures; CLOSURE_NOT_READY / CLOSURE_READY sentinels
     or an actual closure to invoke.
     TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to (protected by mu) */
  struct polling_island *polling_island;

  /* Next entry when this struct sits on the global fd freelist */
  struct grpc_fd *freelist_next;
  /* Scheduled once the fd has been fully orphaned/closed */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  /* Registration with iomgr for leak tracking/debugging */
  grpc_iomgr_object iomgr_object;
};
128
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants record the reason and call site of each ref/unref */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

/* One-time setup/teardown of the fd freelist (defined later in this file) */
static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd.read_closure/write_closure to encode
   the "no closure pending" and "event already ready" states */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
149
150/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700151 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700152 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700153
/* Reference counting macros for polling islands. The debug variants log each
   ref/unref with the reason and call site. */
//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
167
/* A polling island: an epoll set shared by a group of fds/pollsets. Islands
   form a linked list via 'merged_to' once merged. */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The workqueue associated with this polling island */
  grpc_workqueue *workqueue;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set (fd_cnt used, fd_capacity
     allocated) */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
198
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700199/*******************************************************************************
200 * Pollset Declarations
201 */
/* A worker thread currently blocked polling on behalf of a pollset. Workers
   are kept in a doubly-linked list rooted at grpc_pollset.root_worker. */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu; /* Protects all fields of this struct */
  grpc_pollset_worker root_worker; /* Sentinel head of the worker list */
  bool kicked_without_pollers; /* A kick arrived while no worker was polling */

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};
224
225/*******************************************************************************
226 * Pollset-set Declarations
227 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu; /* Protects the three dynamic arrays below */

  /* Member pollsets (count used, capacity allocated) */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Nested pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Fds added directly to this set */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
249
250/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700251 * Common helpers
252 */
253
Craig Tillerf975f742016-07-01 14:56:27 -0700254static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700255 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700256 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700257 if (*composite == GRPC_ERROR_NONE) {
258 *composite = GRPC_ERROR_CREATE(desc);
259 }
260 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700261 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700262}
263
264/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700265 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700266 */
267
/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Forward declaration (needed by pi_unref below) */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
#ifdef GRPC_PI_REF_COUNT_DEBUG
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

/* Debug wrapper: performs the ref and logs old->new counts with the reason
   and call site. NOTE(review): the count is sampled before the add, so the
   logged values can be stale under concurrent ref/unref — debug aid only. */
static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                           int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper: performs the unref and logs old->new counts (same staleness
   caveat as pi_add_ref_dbg). */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
311
/* Takes a reference on the polling island. Safe to call concurrently; no
   memory barrier needed for a pure increment. */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700315
/* Drops a reference on the polling island and handles the two special
   transitions of the count. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to one, we're back to just the workqueue owning a ref.
     Unref the workqueue to break the loop.

     If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    case 2: /* last external ref: the only one now owned is by the workqueue */
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
      break;
    case 1: {
      /* Count hit zero: snapshot merged_to before freeing pi, then unref the
         island this one had merged into (if any) */
      polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      polling_island_delete(exec_ctx, pi);
      if (next != NULL) {
        PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
      }
      break;
    }
    case 0:
      /* Unref on an island whose count was already zero is a bug */
      GPR_UNREACHABLE_CODE(return );
  }
}
344
/* Adds fd_count fds to the island's epoll set and fds[] array, accumulating
   any epoll_ctl failures into *error. Fds already present (EEXIST) are
   silently skipped. The caller is expected to hold pi->mu lock before calling
   this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered, interested in both readability and writability */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* Do not track an fd that was not added to the epoll set */
      continue;
    }

    /* Grow fds[] geometrically (at least by 8) when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
389
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700390/* The caller is expected to hold pi->mu before calling this */
391static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700392 grpc_wakeup_fd *wakeup_fd,
393 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700394 struct epoll_event ev;
395 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700396 char *err_msg;
397 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700398
399 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
400 ev.data.ptr = wakeup_fd;
401 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
402 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700403 if (err < 0 && errno != EEXIST) {
404 gpr_asprintf(&err_msg,
405 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
406 "error: %d (%s)",
407 pi->epoll_fd,
408 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
409 strerror(errno));
410 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
411 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700412 }
413}
414
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700415/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700416static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700417 bool remove_fd_refs,
418 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700419 int err;
420 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700421 char *err_msg;
422 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700423
424 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700425 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700426 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700427 gpr_asprintf(&err_msg,
428 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
429 "error: %d (%s)",
430 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
431 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
432 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700433 }
434
435 if (remove_fd_refs) {
436 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700437 }
438 }
439
440 pi->fd_cnt = 0;
441}
442
/* Removes a single fd from the island's epoll set (unless the fd is already
   closed) and from the fds[] array, dropping the island's ref on it. The
   caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* Swap-remove from the (unordered) fds array; unref only if found */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
474
/* Creates a new polling island with its epoll set, global wakeup fd,
   workqueue, and (optionally) an initial fd. Errors are accumulated into
   *error. Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1; /* Sentinel so the error path knows close() isn't needed */
  pi->workqueue = NULL;

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  /* Every island polls the process-wide wakeup fd */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

  /* Create the island's workqueue and poll its wakeup fd too; the workqueue's
     fd holds a ref on the island (see pi_unref's case 2) */
  if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
                   err_desc) &&
      *error == GRPC_ERROR_NONE) {
    polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
                                  error);
    GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
    pi->workqueue->wakeup_read_fd->polling_island = pi;
    PI_ADD_REF(pi, "fd");
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    /* Tear everything down and report failure via NULL */
    if (pi->workqueue != NULL) {
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
    }
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
528
Craig Tillerb39307d2016-06-30 15:39:13 -0700529static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700530 GPR_ASSERT(pi->fd_cnt == 0);
531
Craig Tiller0a06cd72016-07-14 13:21:24 -0700532 if (pi->epoll_fd >= 0) {
533 close(pi->epoll_fd);
534 }
Craig Tillerb39307d2016-06-30 15:39:13 -0700535 gpr_mu_destroy(&pi->mu);
536 gpr_free(pi->fds);
537 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700538}
539
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700540/* Attempts to gets the last polling island in the linked list (liked by the
541 * 'merged_to' field). Since this does not lock the polling island, there are no
542 * guarantees that the island returned is the last island */
543static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
544 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
545 while (next != NULL) {
546 pi = next;
547 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
548 }
549
550 return pi;
551}
552
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}
589
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Chase the 'merged_to' links to the tail of each island's merge chain */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Same island: take the single lock once (locking twice would deadlock) */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address-ordered locking avoids deadlock against concurrent callers */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-check that neither island got merged while we waited for the locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* At least one island is stale: drop both locks and retry from the top */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
670
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700671static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
672 if (p == q) {
673 gpr_mu_unlock(&p->mu);
674 } else {
675 gpr_mu_unlock(&p->mu);
676 gpr_mu_unlock(&q->mu);
677 }
678}
679
/* Merges the polling islands 'p' and 'q' and returns the surviving island
   (the one that absorbed the other's fds). Failures while moving fds are
   accumulated into *error. Both islands are unlocked on return. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q (release store pairs with the
       acquire loads in polling_island_lock_pair / lock traversals) */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
713
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700714static grpc_error *polling_island_global_init() {
715 grpc_error *error = GRPC_ERROR_NONE;
716
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700717 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
718 if (error == GRPC_ERROR_NONE) {
719 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
720 }
721
722 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700723}
724
/* Counterpart of polling_island_global_init(): destroys the shared wakeup fd */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
728
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700729/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700730 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700731 */
732
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700733/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700734 * but instead so that implementations with multiple threads in (for example)
735 * epoll_wait deal with the race between pollset removal and incoming poll
736 * notifications.
737 *
738 * The problem is that the poller ultimately holds a reference to this
739 * object, so it is very difficult to know when is safe to free it, at least
740 * without some expensive synchronization.
741 *
742 * If we keep the object freelisted, in the worst case losing this race just
743 * becomes a spurious read notification on a reused fd.
744 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700745
746/* The alarm system needs to be able to wakeup 'some poller' sometimes
747 * (specifically when a new alarm needs to be triggered earlier than the next
748 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
749 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700750
751/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
752 * sure to wake up one polling thread (which can wake up other threads if
753 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700754grpc_wakeup_fd grpc_global_wakeup_fd;
755
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700756static grpc_fd *fd_freelist = NULL;
757static gpr_mu fd_freelist_mu;
758
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
/* Debug build: same as release ref_by, but logs every refcount change with
   the reason and call site */
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  /* NOTE(review): the two loads below are independent atomics, so the logged
     before/after values may be stale under concurrent ref traffic */
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* A pre-add count of zero would mean reviving an already-freed fd */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
774
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug build: logs every refcount decrement with the reason and call site */
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Count hit zero: recycle the struct onto the freelist rather than
       freeing it (guards against late poll notifications on a stale fd) */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
799
/* Increment refcount by two to avoid changing the orphan bit */
/* (the low bit of fd->refst is the "active" flag read by fd_is_orphaned, so
   all public ref traffic moves in steps of 2) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
815
/* Initializes the mutex guarding the global grpc_fd freelist */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
817
/* Drains and frees the fd freelist, then destroys its mutex */
static void fd_global_shutdown(void) {
  /* Empty lock/unlock pair: presumably acts as a barrier so that any thread
     still inside unref_by's freelist push completes first — TODO confirm */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
829
/* Creates a grpc_fd wrapper for the OS descriptor 'fd', reusing a struct from
   the freelist when one is available. 'name' is used only for iomgr object
   registration / debug logging. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
     newly created fd (or an fd we got from the freelist), no one else would be
     holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  /* refst = 1: the low bit set marks the fd as active (see fd_is_orphaned) */
  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}
872
873static bool fd_is_orphaned(grpc_fd *fd) {
874 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
875}
876
877static int fd_wrapped_fd(grpc_fd *fd) {
878 int ret_fd = -1;
879 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700880 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700881 ret_fd = fd->fd;
882 }
883 gpr_mu_unlock(&fd->mu);
884
885 return ret_fd;
886}
887
/* Orphans the fd: closes (or hands back via *release_fd) the OS descriptor,
   removes the fd from its polling island, and schedules 'on_done'. The net
   refcount effect is -1 (REF +1 then UNREF -2), which clears the low "active"
   bit so fd_is_orphaned() becomes true. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
943
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700944static grpc_error *fd_shutdown_error(bool shutdown) {
945 if (!shutdown) {
946 return GRPC_ERROR_NONE;
947 } else {
948 return GRPC_ERROR_CREATE("FD shutdown");
949 }
950}
951
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700952static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
953 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700954 if (fd->shutdown) {
955 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
956 NULL);
957 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700958 /* not ready ==> switch to a waiting state by setting the closure */
959 *st = closure;
960 } else if (*st == CLOSURE_READY) {
961 /* already ready ==> queue the closure to run immediately */
962 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700963 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
964 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700965 } else {
966 /* upcallptr was set to a different closure. This is an error! */
967 gpr_log(GPR_ERROR,
968 "User called a notify_on function with a previous callback still "
969 "pending");
970 abort();
971 }
972}
973
/* returns 1 if state becomes not ready */
/* Marks the event tracked by *st as ready: either flags readiness for a
   future notify_on call, or schedules the waiting closure. Requires fd->mu. */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure (with a shutdown error if fd was shut down) */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
991
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700992static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
993 grpc_fd *fd) {
994 grpc_pollset *notifier = NULL;
995
996 gpr_mu_lock(&fd->mu);
997 notifier = fd->read_notifier_pollset;
998 gpr_mu_unlock(&fd->mu);
999
1000 return notifier;
1001}
1002
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001003static bool fd_is_shutdown(grpc_fd *fd) {
1004 gpr_mu_lock(&fd->mu);
1005 const bool r = fd->shutdown;
1006 gpr_mu_unlock(&fd->mu);
1007 return r;
1008}
1009
/* Might be called multiple times */
/* Shuts down the fd for both reading and writing and flushes any pending
   read/write closures with a shutdown error. Idempotent. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1025
/* Registers 'closure' to run when the fd becomes readable (lock wrapper
   around notify_on_locked for the read closure slot) */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1032
/* Registers 'closure' to run when the fd becomes writable (lock wrapper
   around notify_on_locked for the write closure slot) */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1039
/* Returns a new ref on the workqueue of the fd's current polling island, or
   NULL if the fd is not (or no longer) attached to an island. Caller owns the
   returned ref. */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  grpc_workqueue *workqueue = NULL;
  if (fd->polling_island != NULL) {
    workqueue =
        GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
  }
  gpr_mu_unlock(&fd->mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001050
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001051/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001052 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001053 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001054GPR_TLS_DECL(g_current_thread_pollset);
1055GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001056static __thread bool g_initialized_sigmask;
1057static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001058
/* Handler for grpc_wakeup_signal (delivered via pthread_kill in
   pollset_worker_kick). Deliberately (almost) a no-op: the signal's delivery
   presumably serves only to interrupt a blocking syscall — confirm against
   pollset_work. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001064
/* Installs sig_handler for grpc_wakeup_signal (used by pollset_worker_kick) */
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001066
/* Global state management */
/* One-time pollset subsystem init: TLS slots, the kick signal handler, and
   the global wakeup fd. Returns any error from wakeup-fd creation. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1074
/* Counterpart of pollset_global_init(): destroys the global wakeup fd and
   the TLS slots */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1080
/* Kicks a single worker by delivering grpc_wakeup_signal to its thread.
   The CAS on worker->is_kicked makes the kick one-shot: only the first
   caller to transition 0 -> 1 sends the signal. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1096
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001097/* Return 1 if the pollset has active threads in pollset_work (pollset must
1098 * be locked) */
1099static int pollset_has_workers(grpc_pollset *p) {
1100 return p->root_worker.next != &p->root_worker;
1101}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001102
1103static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1104 worker->prev->next = worker->next;
1105 worker->next->prev = worker->prev;
1106}
1107
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001108static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1109 if (pollset_has_workers(p)) {
1110 grpc_pollset_worker *w = p->root_worker.next;
1111 remove_worker(p, w);
1112 return w;
1113 } else {
1114 return NULL;
1115 }
1116}
1117
1118static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1119 worker->next = &p->root_worker;
1120 worker->prev = worker->next->prev;
1121 worker->prev->next = worker->next->prev = worker;
1122}
1123
1124static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1125 worker->prev = &p->root_worker;
1126 worker->next = worker->prev->next;
1127 worker->prev->next = worker->next->prev = worker;
1128}
1129
/* p->mu must be held before calling this function */
/* Wakes up worker(s) of pollset 'p':
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: kick every worker except
     the calling thread's own worker
   - specific_worker != NULL: kick exactly that worker (unless it is self)
   - specific_worker == NULL: kick "any" worker — unless the calling thread is
     already polling this pollset, in which case it absorbs the kick itself.
   If no worker is available to kick, sets kicked_without_pollers so the next
   worker to arrive returns immediately. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          /* Skip the calling thread's own worker: it does not need a signal */
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate the kicked worker to the back of the list before kicking it */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1181
/* Wakes up all pollers via the global wakeup fd */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001185
/* Initializes a pollset: empty circular worker list (root as sentinel),
   cleared shutdown state, no polling island yet. *mu is set to the pollset's
   own mutex for the caller to use. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}
1199
/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  /* Deadlines within 10us of 'now' are treated as already-due: return 0 */
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  /* Add (1ms - 1ns) before converting so the division rounds up, not down */
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
1224
/* Signals read-readiness on the fd and records which pollset observed it
   (exposed later via fd_get_read_notifier_pollset) */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1233
/* Signals write-readiness on the fd */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1240
/* Drops the pollset's ref on its polling island (if any) and clears the
   pointer. 'reason' is forwarded to PI_UNREF for debug tracing. */
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *ps, char *reason) {
  if (ps->polling_island != NULL) {
    PI_UNREF(exec_ctx, ps->polling_island, reason);
  }
  ps->polling_island = NULL;
}
1248
/* Completes pollset shutdown: releases the polling island and schedules the
   shutdown_done closure. Must only run once all workers have left. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1260
/* pollset->mu lock must be held by the caller before calling this */
/* Begins pollset shutdown: marks the pollset shutting down, broadcasts a
   kick so active workers exit, and finishes immediately if none are active.
   'closure' runs once shutdown completes. Must not be called twice. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1280
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  /* The polling-island ref (if any) was already released by
     finish_shutdown_locked() during shutdown */
  gpr_mu_destroy(&pollset->mu);
}
1288
Craig Tiller2b49ea92016-07-01 13:21:27 -07001289static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001290 GPR_ASSERT(pollset->shutting_down);
1291 GPR_ASSERT(!pollset_has_workers(pollset));
1292 pollset->shutting_down = false;
1293 pollset->finish_shutdown_called = false;
1294 pollset->kicked_without_pollers = false;
1295 pollset->shutdown_done = NULL;
Craig Tillerb39307d2016-06-30 15:39:13 -07001296 GPR_ASSERT(pollset->polling_island == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001297}
1298
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001299#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001300/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1301static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001302 grpc_pollset *pollset,
1303 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001304 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001305 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001306 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001307 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001308 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001309 char *err_msg;
1310 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001311 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1312
1313 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001314 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001315
1316 Since epoll_fd is immutable, we can read it without obtaining the polling
1317 island lock. There is however a possibility that the polling island (from
1318 which we got the epoll_fd) got merged with another island while we are
1319 in this function. This is still okay because in such a case, we will wakeup
1320 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001321 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001322
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001323 if (pollset->polling_island == NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001324 pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001325 if (pollset->polling_island == NULL) {
1326 GPR_TIMER_END("pollset_work_and_unlock", 0);
1327 return; /* Fatal error. We cannot continue */
1328 }
1329
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001330 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001331 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1332 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001333 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001334
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001335 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001336 epoll_fd = pi->epoll_fd;
1337
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001338 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001339 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001340 if (pollset->polling_island != pi) {
1341 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1342 polling island to be deleted */
1343 PI_ADD_REF(pi, "ps");
Craig Tillerb39307d2016-06-30 15:39:13 -07001344 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001345 pollset->polling_island = pi;
1346 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001347
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001348 /* Add an extra ref so that the island does not get destroyed (which means
1349 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1350 epoll_fd */
1351 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001352 gpr_mu_unlock(&pollset->mu);
1353
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001354 do {
1355 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1356 sig_mask);
1357 if (ep_rv < 0) {
1358 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001359 gpr_asprintf(&err_msg,
1360 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1361 epoll_fd, errno, strerror(errno));
1362 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001363 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001364 /* We were interrupted. Save an interation by doing a zero timeout
1365 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001366 GRPC_POLLING_TRACE(
1367 "pollset_work: pollset: %p, worker: %p received kick",
1368 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001369 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001370 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001371 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001372
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001373#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001374 /* See the definition of g_poll_sync for more details */
1375 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001376#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001377
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001378 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001379 void *data_ptr = ep_ev[i].data.ptr;
1380 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001381 append_error(error,
1382 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1383 err_desc);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001384 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001385 GRPC_POLLING_TRACE(
1386 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1387 "%d) got merged",
1388 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001389 /* This means that our polling island is merged with a different
1390 island. We do not have to do anything here since the subsequent call
1391 to the function pollset_work_and_unlock() will pick up the correct
1392 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001393 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001394 grpc_fd *fd = data_ptr;
1395 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1396 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1397 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001398 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001399 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001400 }
1401 if (write_ev || cancel) {
1402 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001403 }
1404 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001405 }
1406 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001407
1408 GPR_ASSERT(pi != NULL);
1409
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001410 /* Before leaving, release the extra ref we added to the polling island. It
1411 is important to use "pi" here (i.e our old copy of pollset->polling_island
1412 that we got before releasing the polling island lock). This is because
1413 pollset->polling_island pointer might get udpated in other parts of the
1414 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001415 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001416
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001417 GPR_TIMER_END("pollset_work_and_unlock", 0);
1418}
1419
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns.
   Any errors accumulated while polling are logged and returned (caller owns
   the returned grpc_error ref). */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this thread's stack; it is published via *worker_hdl
     and the pollset's worker list, and removed again before returning */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    /* Lazy, once-per-thread setup of the two signal masks (guarded by the
       thread-local g_initialized_sigmask flag) */
    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* Releases pollset->mu while blocked in epoll */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1526
/* Makes 'fd' pollable by 'pollset' by ensuring both end up pointing at the
   same polling island (creating, sharing or merging islands as needed).
   Lock ordering: pollset->mu is acquired before fd->mu. */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&fd->mu);

  polling_island *pi_new = NULL;

retry:
  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   * equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   * the NULL polling_island field to point to the non-NULL polling_island
   * field (ensure that the refcount on the polling island is incremented by
   * 1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   * and different, merge both the polling islands and update the
   * polling_island fields in both fd and pollset to point to the merged
   * polling island.
   */

  /* An orphaned fd will never be polled again; nothing to do */
  if (fd->orphaned) {
    gpr_mu_unlock(&fd->mu);
    gpr_mu_unlock(&pollset->mu);
    /* early out */
    return;
  }

  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      /* Unlock before creating a new polling island: the polling island will
         create a workqueue which creates a file descriptor, and holding an fd
         lock here can eventually cause a loop to appear to TSAN (making it
         unhappy). We don't think it's a real loop (there's an epoch point where
         that loop possibility disappears), but the advantages of keeping TSAN
         happy outweigh any performance advantage we might have by keeping the
         lock held. */
      gpr_mu_unlock(&fd->mu);
      pi_new = polling_island_create(exec_ctx, fd, &error);
      gpr_mu_lock(&fd->mu);
      /* Need to reverify any assumptions made between the initial lock and
         getting to this branch: if they've changed, we need to throw away our
         work and figure things out again. */
      if (fd->polling_island != NULL) {
        GRPC_POLLING_TRACE(
            "pollset_add_fd: Raced creating new polling island. pi_new: %p "
            "(fd: %d, pollset: %p)",
            (void *)pi_new, fd->fd, (void *)pollset);
        /* Discard the island we created (ref+unref so PI_UNREF can reclaim
           it) and restart the decision from scratch */
        PI_ADD_REF(pi_new, "dance_of_destruction");
        PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
        goto retry;
      } else {
        GRPC_POLLING_TRACE(
            "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
            "pollset: %p)",
            (void *)pi_new, fd->fd, (void *)pollset);
      }
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
        "pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset,
        (void *)pollset->polling_island);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
                                  &error);
    GRPC_POLLING_TRACE(
        "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p, pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
        (void *)pollset->polling_island);
  }

  /* At this point, pi_new is the polling island that both fd->polling_island
     and pollset->polling_island must be pointing to */

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(exec_ctx, fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(exec_ctx, pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->mu);
  gpr_mu_unlock(&pollset->mu);

  GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
1642
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001643/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001644 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001645 */
1646
1647static grpc_pollset_set *pollset_set_create(void) {
1648 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1649 memset(pollset_set, 0, sizeof(*pollset_set));
1650 gpr_mu_init(&pollset_set->mu);
1651 return pollset_set;
1652}
1653
1654static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1655 size_t i;
1656 gpr_mu_destroy(&pollset_set->mu);
1657 for (i = 0; i < pollset_set->fd_count; i++) {
1658 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1659 }
1660 gpr_free(pollset_set->pollsets);
1661 gpr_free(pollset_set->pollset_sets);
1662 gpr_free(pollset_set->fds);
1663 gpr_free(pollset_set);
1664}
1665
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001666static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1667 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1668 size_t i;
1669 gpr_mu_lock(&pollset_set->mu);
1670 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1671 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1672 pollset_set->fds = gpr_realloc(
1673 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1674 }
1675 GRPC_FD_REF(fd, "pollset_set");
1676 pollset_set->fds[pollset_set->fd_count++] = fd;
1677 for (i = 0; i < pollset_set->pollset_count; i++) {
1678 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1679 }
1680 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1681 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1682 }
1683 gpr_mu_unlock(&pollset_set->mu);
1684}
1685
1686static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1687 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1688 size_t i;
1689 gpr_mu_lock(&pollset_set->mu);
1690 for (i = 0; i < pollset_set->fd_count; i++) {
1691 if (pollset_set->fds[i] == fd) {
1692 pollset_set->fd_count--;
1693 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1694 pollset_set->fds[pollset_set->fd_count]);
1695 GRPC_FD_UNREF(fd, "pollset_set");
1696 break;
1697 }
1698 }
1699 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1700 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1701 }
1702 gpr_mu_unlock(&pollset_set->mu);
1703}
1704
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001705static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1706 grpc_pollset_set *pollset_set,
1707 grpc_pollset *pollset) {
1708 size_t i, j;
1709 gpr_mu_lock(&pollset_set->mu);
1710 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1711 pollset_set->pollset_capacity =
1712 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1713 pollset_set->pollsets =
1714 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1715 sizeof(*pollset_set->pollsets));
1716 }
1717 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1718 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1719 if (fd_is_orphaned(pollset_set->fds[i])) {
1720 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1721 } else {
1722 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1723 pollset_set->fds[j++] = pollset_set->fds[i];
1724 }
1725 }
1726 pollset_set->fd_count = j;
1727 gpr_mu_unlock(&pollset_set->mu);
1728}
1729
1730static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1731 grpc_pollset_set *pollset_set,
1732 grpc_pollset *pollset) {
1733 size_t i;
1734 gpr_mu_lock(&pollset_set->mu);
1735 for (i = 0; i < pollset_set->pollset_count; i++) {
1736 if (pollset_set->pollsets[i] == pollset) {
1737 pollset_set->pollset_count--;
1738 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1739 pollset_set->pollsets[pollset_set->pollset_count]);
1740 break;
1741 }
1742 }
1743 gpr_mu_unlock(&pollset_set->mu);
1744}
1745
1746static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1747 grpc_pollset_set *bag,
1748 grpc_pollset_set *item) {
1749 size_t i, j;
1750 gpr_mu_lock(&bag->mu);
1751 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1752 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1753 bag->pollset_sets =
1754 gpr_realloc(bag->pollset_sets,
1755 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1756 }
1757 bag->pollset_sets[bag->pollset_set_count++] = item;
1758 for (i = 0, j = 0; i < bag->fd_count; i++) {
1759 if (fd_is_orphaned(bag->fds[i])) {
1760 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1761 } else {
1762 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1763 bag->fds[j++] = bag->fds[i];
1764 }
1765 }
1766 bag->fd_count = j;
1767 gpr_mu_unlock(&bag->mu);
1768}
1769
1770static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1771 grpc_pollset_set *bag,
1772 grpc_pollset_set *item) {
1773 size_t i;
1774 gpr_mu_lock(&bag->mu);
1775 for (i = 0; i < bag->pollset_set_count; i++) {
1776 if (bag->pollset_sets[i] == item) {
1777 bag->pollset_set_count--;
1778 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1779 bag->pollset_sets[bag->pollset_set_count]);
1780 break;
1781 }
1782 }
1783 gpr_mu_unlock(&bag->mu);
1784}
1785
/* Test helper functions */

/* Test-only: returns the fd's current polling island (may be NULL). The
   pointer may be stale as soon as fd->mu is released. */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->mu);
  pi = fd->polling_island;
  gpr_mu_unlock(&fd->mu);

  return pi;
}
1797
1798void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1799 polling_island *pi;
1800
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001801 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001802 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001803 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001804
1805 return pi;
1806}
1807
/* Test-only: returns true if 'p' and 'q' resolve to the same (latest)
   polling island after following any merge chains. */
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}
1819
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001820/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001821 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001822 */
1823
/* Tears down the engine's global state (fd, pollset and polling-island
   subsystems); invoked through the event-engine vtable. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1829
/* Vtable plugging this epoll-based implementation into the generic posix
   event-engine interface. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd operations */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1864
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001865/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1866 * Create a dummy epoll_fd to make sure epoll support is available */
1867static bool is_epoll_available() {
1868 int fd = epoll_create1(EPOLL_CLOEXEC);
1869 if (fd < 0) {
1870 gpr_log(
1871 GPR_ERROR,
1872 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1873 fd);
1874 return false;
1875 }
1876 close(fd);
1877 return true;
1878}
1879
/* Engine entry point: initializes global state and returns the epoll engine
   vtable, or NULL if this environment cannot support it (signal use disabled,
   kernel lacks epoll, or global initialization fails). */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  /* Pick a default wakeup signal if the application did not choose one */
  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
1907
#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

/* No-op stub: this engine is compiled out, so the chosen signal is ignored */
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */