blob: 70a8ce9d1d24dc2e13f9fb646614d3e1cae48e61 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
45#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070060#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070061#include "src/core/lib/profiling/timers.h"
62#include "src/core/lib/support/block_annotate.h"
63
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Log a polling-engine trace message iff tracing is enabled.
   Wrapped in do { } while (0) so the macro expands to a single statement:
   the previous bare `if { }` form made `if (c) GRPC_POLLING_TRACE(...); else`
   at a call site a syntax error (the semicolon orphaned the `else`). */
#define GRPC_POLLING_TRACE(fmt, ...)         \
  do {                                       \
    if (grpc_polling_trace) {                \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
    }                                        \
  } while (0)
70
/* Signal number used to kick polling threads; -1 means signals are disabled
   and the epoll engine will not be used. */
static int grpc_wakeup_signal = -1;
/* Set once grpc_use_signal() has been called (i.e. the choice is explicit). */
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function might be
 * called before even calling grpc_init() to set either a different signal to
 * use. If signum == -1, then the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
89
90struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070091
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070092/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070093 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070094 */
/* Per-file-descriptor bookkeeping for the epoll engine. */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Protects the mutable fields below (shutdown, closures, polling_island). */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  /* Pending read/write notification closures. Presumably these hold the
     CLOSURE_NOT_READY / CLOSURE_READY sentinels when no real closure is
     registered -- the notify/set-ready code is later in this file; confirm
     there. */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to (protected by mu) */
  struct polling_island *polling_island;

  /* Next entry when this struct sits on the fd freelist (see fd_global_*). */
  struct grpc_fd *freelist_next;
  /* Closure scheduled once orphan/close processing completes. */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
128
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants log the reason and call site of every ref/unref. */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values for fd->read_closure / fd->write_closure. Any other value
   is presumably a real grpc_closure pointer waiting for the event -- the
   code that manipulates these fields is outside this view; confirm there. */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
149
150/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700151 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700152 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700153
//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

/* Debug variants log the refcount transition plus the call site. */
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

/* A polling island: a set of fds sharing one epoll set. Islands can be merged;
   after a merge the old island only forwards to its successor via 'merged_to'. */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The workqueue associated with this polling island */
  grpc_workqueue *workqueue;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
198
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700199/*******************************************************************************
200 * Pollset Declarations
201 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;

  /* Doubly-linked list links; the list is anchored at pollset->root_worker. */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  /* Anchor of the worker list -- presumably a sentinel node rather than a
     real worker; confirm against the worker-list code later in this file. */
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};
224
225/*******************************************************************************
226 * Pollset-set Declarations
227 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
/* A pollset_set tracks its member pollsets, child pollset_sets and fds in
   three parallel dynamic arrays, all guarded by 'mu'. */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
249
250/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700251 * Common helpers
252 */
253
Craig Tillerf975f742016-07-01 14:56:27 -0700254static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700255 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700256 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700257 if (*composite == GRPC_ERROR_NONE) {
258 *composite = GRPC_ERROR_CREATE(desc);
259 }
260 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700261 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700262}
263
264/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700265 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700266 */
267
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700268/* The wakeup fd that is used to wake up all threads in a Polling island. This
269 is useful in the polling island merge operation where we need to wakeup all
270 the threads currently polling the smaller polling island (so that they can
271 start polling the new/merged polling island)
272
273 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
274 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
275static grpc_wakeup_fd polling_island_wakeup_fd;
276
Craig Tillerb39307d2016-06-30 15:39:13 -0700277/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700278static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700279
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700280#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700281/* Currently TSAN may incorrectly flag data races between epoll_ctl and
282 epoll_wait for any grpc_fd structs that are added to the epoll set via
283 epoll_ctl and are returned (within a very short window) via epoll_wait().
284
285 To work-around this race, we establish a happens-before relation between
286 the code just-before epoll_ctl() and the code after epoll_wait() by using
287 this atomic */
288gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700289#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700290
#ifdef GRPC_PI_REF_COUNT_DEBUG
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

/* Debug wrapper for pi_add_ref(): logs the refcount transition along with the
   reason and call site. The count is sampled before the increment, so under
   concurrent ref traffic the logged old/new values may be approximate. */
static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                           int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper for pi_unref(): logs the refcount transition. Note that if
   this was the last unref, 'pi' has already been deleted by the time we log;
   only the pointer *value* is used here, never dereferenced. */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
311
/* Take a ref on the polling island. No barrier needed: a caller may only add
   a ref while it already (transitively) holds one, so no ordering is required
   beyond the atomic increment itself. */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700315
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to one, we're back to just the workqueue owning a ref.
     Unref the workqueue to break the loop.

     If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  /* gpr_atm_full_fetch_add returns the PREVIOUS value, so each case label
     below names the count *before* this unref. */
  switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    case 2: /* last external ref: the only one now owned is by the workqueue */
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
      break;
    case 1: { /* ref count dropped to zero: destroy the island */
      polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      polling_island_delete(exec_ctx, pi);
      if (next != NULL) {
        PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
      }
      break;
    }
    case 0: /* unref of an already-zero refcount: a bug in the caller */
      GPR_UNREACHABLE_CODE(return );
  }
}
344
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700345/* The caller is expected to hold pi->mu lock before calling this function */
346static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700347 size_t fd_count, bool add_fd_refs,
348 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700349 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700350 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700351 struct epoll_event ev;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700352 char *err_msg;
353 const char *err_desc = "polling_island_add_fds";
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700354
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700355#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700356 /* See the definition of g_epoll_sync for more context */
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700357 gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700358#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700359
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700360 for (i = 0; i < fd_count; i++) {
361 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
362 ev.data.ptr = fds[i];
363 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700364
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700365 if (err < 0) {
366 if (errno != EEXIST) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700367 gpr_asprintf(
368 &err_msg,
369 "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
370 pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
371 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
372 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700373 }
374
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700375 continue;
376 }
377
378 if (pi->fd_cnt == pi->fd_capacity) {
379 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
380 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
381 }
382
383 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700384 if (add_fd_refs) {
385 GRPC_FD_REF(fds[i], "polling_island");
386 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700387 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700388}
389
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700390/* The caller is expected to hold pi->mu before calling this */
391static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700392 grpc_wakeup_fd *wakeup_fd,
393 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700394 struct epoll_event ev;
395 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700396 char *err_msg;
397 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700398
399 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
400 ev.data.ptr = wakeup_fd;
401 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
402 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700403 if (err < 0 && errno != EEXIST) {
404 gpr_asprintf(&err_msg,
405 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
406 "error: %d (%s)",
407 pi->epoll_fd,
408 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
409 strerror(errno));
410 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
411 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700412 }
413}
414
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700415/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700416static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700417 bool remove_fd_refs,
418 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700419 int err;
420 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700421 char *err_msg;
422 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700423
424 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700425 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700426 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700427 gpr_asprintf(&err_msg,
428 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
429 "error: %d (%s)",
430 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
431 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
432 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700433 }
434
435 if (remove_fd_refs) {
436 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700437 }
438 }
439
440 pi->fd_cnt = 0;
441}
442
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700443/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700444static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700445 bool is_fd_closed,
446 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700447 int err;
448 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700449 char *err_msg;
450 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700451
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700452 /* If fd is already closed, then it would have been automatically been removed
453 from the epoll set */
454 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700455 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
456 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700457 gpr_asprintf(
458 &err_msg,
459 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
460 pi->epoll_fd, fd->fd, errno, strerror(errno));
461 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
462 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700463 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700464 }
465
466 for (i = 0; i < pi->fd_cnt; i++) {
467 if (pi->fds[i] == fd) {
468 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700469 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700470 break;
471 }
472 }
473}
474
/* Creates a new polling island: epoll set + global wakeup fd + (optionally)
   an initial fd + the island's workqueue wakeup fd.
   Might return NULL in case of an error (with the failure recorded in
   '*error'); on success the island starts with ref_count == 1, held on
   behalf of the workqueue's wakeup fd. */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1; /* sentinel so polling_island_delete can skip close() */
  pi->workqueue = NULL;

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

  /* Create the workqueue and wire its wakeup fd into this island; only done
     if no error occurred so far (append_error returns true on success). */
  if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
                   err_desc) &&
      *error == GRPC_ERROR_NONE) {
    polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
                                  error);
    GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
    pi->workqueue->wakeup_read_fd->polling_island = pi;
    PI_ADD_REF(pi, "fd"); /* ref held on behalf of the workqueue's wakeup fd */
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    /* Cleanup on any failure: release the workqueue (if created) and tear
       the island down; the caller receives NULL plus '*error'. */
    if (pi->workqueue != NULL) {
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
    }
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
528
Craig Tillerb39307d2016-06-30 15:39:13 -0700529static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700530 GPR_ASSERT(pi->fd_cnt == 0);
531
Craig Tiller0a06cd72016-07-14 13:21:24 -0700532 if (pi->epoll_fd >= 0) {
533 close(pi->epoll_fd);
534 }
Craig Tillerb39307d2016-06-30 15:39:13 -0700535 gpr_mu_destroy(&pi->mu);
536 gpr_free(pi->fds);
537 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700538}
539
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700540/* Attempts to gets the last polling island in the linked list (liked by the
541 * 'merged_to' field). Since this does not lock the polling island, there are no
542 * guarantees that the island returned is the last island */
543static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
544 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
545 while (next != NULL) {
546 pi = next;
547 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
548 }
549
550 return pi;
551}
552
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
   polling_island *pi_latest = polling_island_lock(pi);
   ...
   ... critical section ..
   ...
   gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is infact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    /* Follow the merge chain (either from the lock-free peek above, or after
       discovering under the lock that a merge raced with us). */
    pi = next;
  }

  return pi;
}
589
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Walk the merged_to chain to the tail island for pi_1 (acquire loads
       pair with the release store done at merge time). */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    /* Same walk for pi_2. */
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both lists converged to the same island: a single lock suffices.
       polling_island_lock() itself re-resolves any further merges. */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address-ordered locking to avoid deadlock against a concurrent caller
       locking the same pair in the opposite order. */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-check that neither island was merged away while we were blocking on
       the locks; if both are still list tails we are done. */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* Stale islands: drop both locks and retry from the top. */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
670
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700671static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
672 if (p == q) {
673 gpr_mu_unlock(&p->mu);
674 } else {
675 gpr_mu_unlock(&p->mu);
676 gpr_mu_unlock(&q->mu);
677 }
678}
679
/* Merges the polling islands reachable from p and q into a single island and
   returns the surviving (merged) island. The fds from the smaller island are
   moved into the larger one, and the emptied island is linked to the survivor
   via its 'merged_to' pointer. Any errors are accumulated into *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
713
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700714static grpc_error *polling_island_global_init() {
715 grpc_error *error = GRPC_ERROR_NONE;
716
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700717 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
718 if (error == GRPC_ERROR_NONE) {
719 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
720 }
721
722 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700723}
724
/* Tears down the global wakeup fd created by polling_island_global_init(). */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
728
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700729/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700730 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700731 */
732
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700733/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700734 * but instead so that implementations with multiple threads in (for example)
735 * epoll_wait deal with the race between pollset removal and incoming poll
736 * notifications.
737 *
738 * The problem is that the poller ultimately holds a reference to this
739 * object, so it is very difficult to know when is safe to free it, at least
740 * without some expensive synchronization.
741 *
742 * If we keep the object freelisted, in the worst case losing this race just
743 * becomes a spurious read notification on a reused fd.
744 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700745
746/* The alarm system needs to be able to wakeup 'some poller' sometimes
747 * (specifically when a new alarm needs to be triggered earlier than the next
748 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
749 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700750
751/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
752 * sure to wake up one polling thread (which can wake up other threads if
753 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700754grpc_wakeup_fd grpc_global_wakeup_fd;
755
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700756static grpc_fd *fd_freelist = NULL;
757static gpr_mu fd_freelist_mu;
758
/* Adds n to fd->refst. The refcount must already be positive (asserted).
   In debug builds (GRPC_FD_REF_COUNT_DEBUG) the REF_BY/UNREF_BY macros also
   pass a reason string and the call site for tracing. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
774
/* Subtracts n from fd->refst. When the count drops to zero, the grpc_fd is
   returned to the freelist (not freed) and unregistered from the iomgr;
   see the freelist rationale comment above. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Refcount must never go negative. */
    GPR_ASSERT(old > n);
  }
}
799
/* Increment refcount by two to avoid changing the orphan bit */
/* (fd->refst stores the count in the high bits and an "active/not-orphaned"
   flag in bit 0; stepping by 2 leaves that flag untouched.) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
815
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700816static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
817
/* Drains and frees the grpc_fd freelist and destroys its mutex. */
static void fd_global_shutdown(void) {
  /* Lock/unlock acts as a barrier: waits for any in-flight freelist mutation
     to finish before we start tearing the list down. */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
829
/* Wraps the raw file descriptor 'fd' in a grpc_fd, reusing a struct from the
   freelist when available. 'name' is used (with the fd number appended) to
   register the object with the iomgr for debugging. Returns a grpc_fd with
   refcount 1 and the active (not-orphaned) bit set. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* Try to recycle a struct from the freelist first. */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
     newly created fd (or an fd we got from the freelist), no one else would be
     holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  /* refst = 1: one reference, low bit set = not orphaned. */
  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}
872
/* True once the active bit (bit 0 of fd->refst) has been cleared, i.e. the fd
   has been orphaned. */
static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
876
877static int fd_wrapped_fd(grpc_fd *fd) {
878 int ret_fd = -1;
879 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700880 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700881 ret_fd = fd->fd;
882 }
883 gpr_mu_unlock(&fd->mu);
884
885 return ret_fd;
886}
887
/* Orphans the grpc_fd: closes (or releases via *release_fd) the underlying
   descriptor, detaches the fd from its polling island, and schedules 'on_done'
   once the descriptor is no longer being polled. 'reason' is a debug tag for
   the refcount tracing macros. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    /* Defer the unref until after fd->mu is released (see below). */
    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
945
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700946static grpc_error *fd_shutdown_error(bool shutdown) {
947 if (!shutdown) {
948 return GRPC_ERROR_NONE;
949 } else {
950 return GRPC_ERROR_CREATE("FD shutdown");
951 }
952}
953
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700954static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
955 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700956 if (fd->shutdown) {
957 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
958 NULL);
959 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700960 /* not ready ==> switch to a waiting state by setting the closure */
961 *st = closure;
962 } else if (*st == CLOSURE_READY) {
963 /* already ready ==> queue the closure to run immediately */
964 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700965 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
966 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700967 } else {
968 /* upcallptr was set to a different closure. This is an error! */
969 gpr_log(GPR_ERROR,
970 "User called a notify_on function with a previous callback still "
971 "pending");
972 abort();
973 }
974}
975
/* returns 1 if state becomes not ready */
/* Fires the closure slot *st: marks it READY if nothing is waiting, or
   schedules the waiting closure (with a shutdown error if the fd is shut
   down). Caller must hold fd->mu. */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
993
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700994static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
995 grpc_fd *fd) {
996 grpc_pollset *notifier = NULL;
997
998 gpr_mu_lock(&fd->mu);
999 notifier = fd->read_notifier_pollset;
1000 gpr_mu_unlock(&fd->mu);
1001
1002 return notifier;
1003}
1004
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001005static bool fd_is_shutdown(grpc_fd *fd) {
1006 gpr_mu_lock(&fd->mu);
1007 const bool r = fd->shutdown;
1008 gpr_mu_unlock(&fd->mu);
1009 return r;
1010}
1011
/* Might be called multiple times */
/* Shuts down the underlying descriptor (both directions) and flushes any
   pending read/write closures with a shutdown error. Idempotent. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    /* shutdown(2) on the raw fd; distinct from the fd->shutdown flag. */
    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1027
/* Schedules 'closure' to run when the fd becomes readable (or immediately if
   it already is / is shut down). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1034
/* Schedules 'closure' to run when the fd becomes writable (or immediately if
   it already is / is shut down). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1041
/* Returns a new ref to the workqueue of the fd's polling island, or NULL if
   the fd is not attached to one. NOTE(review): this reads fd->polling_island
   directly without following the merged_to chain — presumably the head
   island's workqueue is acceptable here; confirm against callers. */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  grpc_workqueue *workqueue = NULL;
  if (fd->polling_island != NULL) {
    workqueue =
        GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
  }
  gpr_mu_unlock(&fd->mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001052
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001053/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001054 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001055 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001056GPR_TLS_DECL(g_current_thread_pollset);
1057GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001058static __thread bool g_initialized_sigmask;
1059static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001060
/* Handler for grpc_wakeup_signal: deliberately (almost) a no-op — the signal's
   only job is to interrupt a blocking epoll_wait()/poll() in a worker. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001066
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001067static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001068
/* Global state management */
/* Initializes pollset-wide state: per-thread TLS slots, the kick signal
   handler, and the global wakeup fd. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1076
/* Tears down the state created by pollset_global_init(). */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1082
/* Wakes up one worker by delivering grpc_wakeup_signal to its thread.
   Uses a CAS on worker->is_kicked so a worker is signalled at most once per
   kick cycle. Returns an error if pthread_kill fails. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1098
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
/* The worker list is a circular doubly-linked list rooted at root_worker; it
   is empty exactly when the root points back at itself. */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001104
/* Unlinks 'worker' from the pollset's circular worker list (p->mu held). */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
1109
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001110static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1111 if (pollset_has_workers(p)) {
1112 grpc_pollset_worker *w = p->root_worker.next;
1113 remove_worker(p, w);
1114 return w;
1115 } else {
1116 return NULL;
1117 }
1118}
1119
/* Inserts 'worker' at the tail of the pollset's worker list (p->mu held). */
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}
1125
/* Inserts 'worker' at the head of the pollset's worker list (p->mu held). */
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
1131
/* p->mu must be held before calling this function */
/* Kicks workers on pollset 'p':
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: kick every worker (except
     the calling thread's own worker);
   - specific_worker != NULL: kick exactly that worker;
   - specific_worker == NULL: kick any one worker, rotating it to the back of
     the list for fairness.
   If no worker is available to absorb the kick, records it in
   kicked_without_pollers so the next pollset_work() returns immediately.
   Accumulates and returns any pthread_kill failures. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          /* Skip the calling thread's own worker: it absorbs the kick. */
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Re-queue at the back so successive anonymous kicks rotate workers. */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1183
/* Wakes up all pollers watching the global wakeup fd (used by the timer
   system to force a re-evaluation of deadlines). */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001187
/* Initializes 'pollset' to an empty, non-shutdown state and exposes its
   mutex to the caller via *mu. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  /* Empty circular worker list: root points at itself. */
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}
1201
/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1; /* epoll_wait() convention for "block indefinitely" */
  }

  /* Deadlines within max_spin_polling_us of now => non-blocking poll. */
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  /* Round up to the next millisecond so we never wake early and spin. */
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
1226
/* Marks the fd readable (firing any pending read closure) and records which
   pollset observed the readability. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1235
/* Marks the fd writable, firing any pending write closure. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1242
/* Drops the pollset's ref on its polling island (if any) and clears the
   pointer. 'reason' is a debug tag for the PI refcount tracing. */
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
                                           grpc_pollset *ps, char *reason) {
  if (ps->polling_island != NULL) {
    PI_UNREF(exec_ctx, ps->polling_island, reason);
  }
  ps->polling_island = NULL;
}
1250
/* Completes pollset shutdown: releases the polling island and schedules the
   shutdown_done closure. Must be called with pollset->mu held and only once
   all workers have left pollset_work(). */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1262
/* Begin shutting down the pollset: record the shutdown closure, kick all
   workers so they notice the state change, and finish immediately if no
   worker is currently polling.
   pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_pollset_worker *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1282
/* pollset_shutdown is guaranteed to be called before pollset_destroy (so the
 * polling-island ref has already been released by finish_shutdown_locked).
 * Other than destroying the mutex, there is nothing special that needs to be
 * done here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}
1290
Craig Tiller2b49ea92016-07-01 13:21:27 -07001291static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001292 GPR_ASSERT(pollset->shutting_down);
1293 GPR_ASSERT(!pollset_has_workers(pollset));
1294 pollset->shutting_down = false;
1295 pollset->finish_shutdown_called = false;
1296 pollset->kicked_without_pollers = false;
1297 pollset->shutdown_done = NULL;
Craig Tillerb39307d2016-06-30 15:39:13 -07001298 GPR_ASSERT(pollset->polling_island == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001299}
1300
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001301#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001302/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1303static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001304 grpc_pollset *pollset,
1305 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001306 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001307 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001308 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001309 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001310 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001311 char *err_msg;
1312 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001313 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1314
1315 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001316 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001317
1318 Since epoll_fd is immutable, we can read it without obtaining the polling
1319 island lock. There is however a possibility that the polling island (from
1320 which we got the epoll_fd) got merged with another island while we are
1321 in this function. This is still okay because in such a case, we will wakeup
1322 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001323 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001324
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001325 if (pollset->polling_island == NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001326 pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001327 if (pollset->polling_island == NULL) {
1328 GPR_TIMER_END("pollset_work_and_unlock", 0);
1329 return; /* Fatal error. We cannot continue */
1330 }
1331
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001332 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001333 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1334 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001335 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001336
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001337 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001338 epoll_fd = pi->epoll_fd;
1339
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001340 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001341 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001342 if (pollset->polling_island != pi) {
1343 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1344 polling island to be deleted */
1345 PI_ADD_REF(pi, "ps");
Craig Tillerb39307d2016-06-30 15:39:13 -07001346 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001347 pollset->polling_island = pi;
1348 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001349
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001350 /* Add an extra ref so that the island does not get destroyed (which means
1351 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1352 epoll_fd */
1353 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001354 gpr_mu_unlock(&pollset->mu);
1355
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001356 do {
1357 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1358 sig_mask);
1359 if (ep_rv < 0) {
1360 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001361 gpr_asprintf(&err_msg,
1362 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1363 epoll_fd, errno, strerror(errno));
1364 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001365 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001366 /* We were interrupted. Save an interation by doing a zero timeout
1367 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001368 GRPC_POLLING_TRACE(
1369 "pollset_work: pollset: %p, worker: %p received kick",
1370 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001371 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001372 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001373 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001374
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001375#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001376 /* See the definition of g_poll_sync for more details */
1377 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001378#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001379
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001380 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001381 void *data_ptr = ep_ev[i].data.ptr;
1382 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001383 append_error(error,
1384 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1385 err_desc);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001386 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001387 GRPC_POLLING_TRACE(
1388 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1389 "%d) got merged",
1390 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001391 /* This means that our polling island is merged with a different
1392 island. We do not have to do anything here since the subsequent call
1393 to the function pollset_work_and_unlock() will pick up the correct
1394 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001395 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001396 grpc_fd *fd = data_ptr;
1397 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1398 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1399 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001400 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001401 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001402 }
1403 if (write_ev || cancel) {
1404 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001405 }
1406 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001407 }
1408 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001409
1410 GPR_ASSERT(pi != NULL);
1411
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001412 /* Before leaving, release the extra ref we added to the polling island. It
1413 is important to use "pi" here (i.e our old copy of pollset->polling_island
1414 that we got before releasing the polling island lock). This is because
1415 pollset->polling_island pointer might get udpated in other parts of the
1416 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001417 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001418
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001419 GPR_TIMER_END("pollset_work_and_unlock", 0);
1420}
1421
/* Poll (or help shut down) the pollset until 'deadline'. Returns any error
   accumulated while polling (caller owns the returned grpc_error ref).
   pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this thread's stack; it is published to callers via
     *worker_hdl and to kickers via the pollset's worker list, and both
     references are withdrawn before this function returns */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* pollset_work_and_unlock releases pollset->mu before blocking in
       epoll_pwait(); we re-acquire it below */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1528
/* Add 'fd' to 'pollset' by ensuring both end up referencing the same polling
   island. Acquires pollset->mu then fd->mu; may temporarily drop fd->mu while
   creating a new island (see the TSAN note below), retrying if the world
   changed in the meantime. */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&fd->mu);

  polling_island *pi_new = NULL;

retry:
  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   * equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   * the NULL polling_island field to point to the non-NULL polling_island
   * field (ensure that the refcount on the polling island is incremented by
   * 1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   * and different, merge both the polling islands and update the
   * polling_island fields in both fd and pollset to point to the merged
   * polling island.
   */

  /* An orphaned fd is on its way out; there is nothing to add it to */
  if (fd->orphaned) {
    gpr_mu_unlock(&fd->mu);
    gpr_mu_unlock(&pollset->mu);
    /* early out */
    return;
  }

  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      /* Unlock before creating a new polling island: the polling island will
         create a workqueue which creates a file descriptor, and holding an fd
         lock here can eventually cause a loop to appear to TSAN (making it
         unhappy). We don't think it's a real loop (there's an epoch point where
         that loop possibility disappears), but the advantages of keeping TSAN
         happy outweigh any performance advantage we might have by keeping the
         lock held. */
      gpr_mu_unlock(&fd->mu);
      pi_new = polling_island_create(exec_ctx, fd, &error);
      gpr_mu_lock(&fd->mu);
      /* Need to reverify any assumptions made between the initial lock and
         getting to this branch: if they've changed, we need to throw away our
         work and figure things out again. */
      if (fd->polling_island != NULL) {
        GRPC_POLLING_TRACE(
            "pollset_add_fd: Raced creating new polling island. pi_new: %p "
            "(fd: %d, pollset: %p)",
            (void *)pi_new, fd->fd, (void *)pollset);
        /* NOTE(review): the ref/unref pair presumably disposes of the
           now-unneeded freshly-created island — confirm against the
           polling_island refcounting rules */
        PI_ADD_REF(pi_new, "dance_of_destruction");
        PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
        goto retry;
      } else {
        GRPC_POLLING_TRACE(
            "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
            "pollset: %p)",
            (void *)pi_new, fd->fd, (void *)pollset);
      }
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
        "pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset,
        (void *)pollset->polling_island);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
                                  &error);
    GRPC_POLLING_TRACE(
        "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p, pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
        (void *)pollset->polling_island);
  }

  /* At this point, pi_new is the polling island that both fd->polling_island
     and pollset->polling_island must be pointing to */

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(exec_ctx, fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(exec_ctx, pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->mu);
  gpr_mu_unlock(&pollset->mu);

  GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
1644
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001645/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001646 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001647 */
1648
1649static grpc_pollset_set *pollset_set_create(void) {
1650 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1651 memset(pollset_set, 0, sizeof(*pollset_set));
1652 gpr_mu_init(&pollset_set->mu);
1653 return pollset_set;
1654}
1655
1656static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1657 size_t i;
1658 gpr_mu_destroy(&pollset_set->mu);
1659 for (i = 0; i < pollset_set->fd_count; i++) {
1660 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1661 }
1662 gpr_free(pollset_set->pollsets);
1663 gpr_free(pollset_set->pollset_sets);
1664 gpr_free(pollset_set->fds);
1665 gpr_free(pollset_set);
1666}
1667
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001668static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1669 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1670 size_t i;
1671 gpr_mu_lock(&pollset_set->mu);
1672 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1673 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1674 pollset_set->fds = gpr_realloc(
1675 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1676 }
1677 GRPC_FD_REF(fd, "pollset_set");
1678 pollset_set->fds[pollset_set->fd_count++] = fd;
1679 for (i = 0; i < pollset_set->pollset_count; i++) {
1680 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1681 }
1682 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1683 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1684 }
1685 gpr_mu_unlock(&pollset_set->mu);
1686}
1687
1688static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1689 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1690 size_t i;
1691 gpr_mu_lock(&pollset_set->mu);
1692 for (i = 0; i < pollset_set->fd_count; i++) {
1693 if (pollset_set->fds[i] == fd) {
1694 pollset_set->fd_count--;
1695 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1696 pollset_set->fds[pollset_set->fd_count]);
1697 GRPC_FD_UNREF(fd, "pollset_set");
1698 break;
1699 }
1700 }
1701 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1702 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1703 }
1704 gpr_mu_unlock(&pollset_set->mu);
1705}
1706
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001707static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1708 grpc_pollset_set *pollset_set,
1709 grpc_pollset *pollset) {
1710 size_t i, j;
1711 gpr_mu_lock(&pollset_set->mu);
1712 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1713 pollset_set->pollset_capacity =
1714 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1715 pollset_set->pollsets =
1716 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1717 sizeof(*pollset_set->pollsets));
1718 }
1719 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1720 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1721 if (fd_is_orphaned(pollset_set->fds[i])) {
1722 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1723 } else {
1724 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1725 pollset_set->fds[j++] = pollset_set->fds[i];
1726 }
1727 }
1728 pollset_set->fd_count = j;
1729 gpr_mu_unlock(&pollset_set->mu);
1730}
1731
1732static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1733 grpc_pollset_set *pollset_set,
1734 grpc_pollset *pollset) {
1735 size_t i;
1736 gpr_mu_lock(&pollset_set->mu);
1737 for (i = 0; i < pollset_set->pollset_count; i++) {
1738 if (pollset_set->pollsets[i] == pollset) {
1739 pollset_set->pollset_count--;
1740 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1741 pollset_set->pollsets[pollset_set->pollset_count]);
1742 break;
1743 }
1744 }
1745 gpr_mu_unlock(&pollset_set->mu);
1746}
1747
1748static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1749 grpc_pollset_set *bag,
1750 grpc_pollset_set *item) {
1751 size_t i, j;
1752 gpr_mu_lock(&bag->mu);
1753 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1754 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1755 bag->pollset_sets =
1756 gpr_realloc(bag->pollset_sets,
1757 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1758 }
1759 bag->pollset_sets[bag->pollset_set_count++] = item;
1760 for (i = 0, j = 0; i < bag->fd_count; i++) {
1761 if (fd_is_orphaned(bag->fds[i])) {
1762 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1763 } else {
1764 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1765 bag->fds[j++] = bag->fds[i];
1766 }
1767 }
1768 bag->fd_count = j;
1769 gpr_mu_unlock(&bag->mu);
1770}
1771
1772static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1773 grpc_pollset_set *bag,
1774 grpc_pollset_set *item) {
1775 size_t i;
1776 gpr_mu_lock(&bag->mu);
1777 for (i = 0; i < bag->pollset_set_count; i++) {
1778 if (bag->pollset_sets[i] == item) {
1779 bag->pollset_set_count--;
1780 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1781 bag->pollset_sets[bag->pollset_set_count]);
1782 break;
1783 }
1784 }
1785 gpr_mu_unlock(&bag->mu);
1786}
1787
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001788/* Test helper functions
1789 * */
1790void *grpc_fd_get_polling_island(grpc_fd *fd) {
1791 polling_island *pi;
1792
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001793 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001794 pi = fd->polling_island;
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001795 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001796
1797 return pi;
1798}
1799
1800void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1801 polling_island *pi;
1802
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001803 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001804 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001805 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001806
1807 return pi;
1808}
1809
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001810bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001811 polling_island *p1 = p;
1812 polling_island *p2 = q;
1813
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001814 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1815 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001816 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001817 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001818
1819 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001820}
1821
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001822/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001823 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001824 */
1825
/* Tear down all global state owned by this polling engine: fd bookkeeping,
   pollset globals, then the polling-island machinery. Installed as the
   engine's shutdown hook in the vtable below. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1831
/* The event-engine vtable exported by this epoll-based implementation; every
   entry maps a generic ev_posix operation to the corresponding static
   function defined in this file. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1866
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001867/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1868 * Create a dummy epoll_fd to make sure epoll support is available */
1869static bool is_epoll_available() {
1870 int fd = epoll_create1(EPOLL_CLOEXEC);
1871 if (fd < 0) {
1872 gpr_log(
1873 GPR_ERROR,
1874 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1875 fd);
1876 return false;
1877 }
1878 close(fd);
1879 return true;
1880}
1881
/* Entry point for this polling engine: returns the engine vtable, or NULL if
   epoll (or the signal-based kick mechanism) cannot be used on this system,
   in which case the caller falls back to another engine. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine*/
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  /* Default the kick signal to SIGRTMIN + 2 unless the application already
     picked one via grpc_use_signal() */
  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
1909
#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

/* Signal selection is a no-op when the epoll engine is unavailable */
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */