blob: 249bc987356f93dcd969d57a11f47ce3d96801d9 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Trace-logging helper. Wrapped in do { ... } while (0) so the macro expands
   to exactly one statement and is safe in unbraced if/else bodies (the bare
   `if { ... }` form risks the classic dangling-else pitfall). */
#define GRPC_POLLING_TRACE(fmt, ...)         \
  do {                                       \
    if (grpc_polling_trace) {                \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
    }                                        \
  } while (0)
71
/* The signal number the epoll engine uses for its wakeup mechanism; -1 means
   the use of signals is disabled (see grpc_use_signal() below). */
static int grpc_wakeup_signal = -1;
/* Set to true once grpc_use_signal() has configured the signal (it may be
   called before grpc_init(), per the comment on grpc_use_signal). */
static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070093/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070094 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070095 */
struct grpc_fd {
  /* The underlying system file descriptor */
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Guards the mutable state below (shutdown/orphaned flags, closures,
     polling_island) */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Closures to notify on read/write readiness. Presumably these also hold
     the CLOSURE_NOT_READY/CLOSURE_READY sentinels defined below -- the
     consuming code is outside this chunk, confirm there.
     TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to (protected by mu) */
  struct polling_island *polling_island;

  /* Next entry when this struct sits on a freelist (presumably the global fd
     freelist managed by fd_global_init/fd_global_shutdown -- confirm) */
  struct grpc_fd *freelist_next;
  /* Closure scheduled once the fd is done being orphaned/closed (set by code
     outside this chunk) */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  /* iomgr bookkeeping handle for this fd */
  grpc_iomgr_object iomgr_object;
};
129
/* Reference counting for fds. When GRPC_FD_REF_COUNT_DEBUG is defined, every
   ref/unref also records the reason and the call site (__FILE__/__LINE__). */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel grpc_closure* values -- presumably stored in grpc_fd's
   read_closure/write_closure to encode "no closure registered yet" vs "event
   already fired"; the code that interprets them is outside this chunk. */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
150
151/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700152 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700153 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700154
//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

/* Debug variants: log every polling-island ref/unref with the reason and the
   call site (see pi_add_ref_dbg/pi_unref_dbg below) */
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
168
typedef struct polling_island {
  /* Guards the fd list below; also held to prevent island merges while locked
     (see polling_island_lock) */
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The workqueue associated with this polling island */
  grpc_workqueue *workqueue;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set: a dynamic array of grpc_fd*
     (grown geometrically by polling_island_add_fds_locked) */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
199
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700200/*******************************************************************************
201 * Pollset Declarations
202 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;

  /* Doubly-linked list links; workers appear to be chained off a pollset's
     root_worker (see grpc_pollset) -- list maintenance is outside this chunk */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
212
struct grpc_pollset {
  /* Guards this pollset's state */
  gpr_mu mu;
  /* Sentinel head of the list of workers attached to this pollset */
  grpc_pollset_worker root_worker;
  /* Presumably set when the pollset is kicked while no worker is polling, so
     the next worker returns immediately -- confirm against pollset_work */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};
225
226/*******************************************************************************
227 * Pollset-set Declarations
228 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  /* Guards the three dynamic arrays below */
  gpr_mu mu;

  /* Pollsets contained in this pollset_set (dynamic array) */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Child pollset_sets contained in this pollset_set (dynamic array) */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Fds contained in this pollset_set (dynamic array) */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
250
251/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700252 * Common helpers
253 */
254
Craig Tillerf975f742016-07-01 14:56:27 -0700255static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700256 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700257 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700258 if (*composite == GRPC_ERROR_NONE) {
259 *composite = GRPC_ERROR_CREATE(desc);
260 }
261 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700262 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700263}
264
265/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700266 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700267 */
268
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700269/* The wakeup fd that is used to wake up all threads in a Polling island. This
270 is useful in the polling island merge operation where we need to wakeup all
271 the threads currently polling the smaller polling island (so that they can
272 start polling the new/merged polling island)
273
274 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
275 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
276static grpc_wakeup_fd polling_island_wakeup_fd;
277
Craig Tillerb39307d2016-06-30 15:39:13 -0700278/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700279static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700280
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700281#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700282/* Currently TSAN may incorrectly flag data races between epoll_ctl and
283 epoll_wait for any grpc_fd structs that are added to the epoll set via
284 epoll_ctl and are returned (within a very short window) via epoll_wait().
285
286 To work-around this race, we establish a happens-before relation between
287 the code just-before epoll_ctl() and the code after epoll_wait() by using
288 this atomic */
289gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700290#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700291
#ifdef GRPC_PI_REF_COUNT_DEBUG
/* Forward declarations of the real ref/unref implementations defined below */
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

/* Logging wrapper around pi_add_ref(): records the old->new count, the reason
   and the call site. The count is sampled before the increment, so under
   concurrent updates the logged values are approximate. */
static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                           int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Logging wrapper around pi_unref(); same sampling caveat as pi_add_ref_dbg */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
312
/* Takes a reference on the polling island (relaxed atomic increment; see the
   comment on polling_island.ref_count for the no-race-with-zero invariant) */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700316
/* Drops a reference on the polling island, releasing resources as the count
   steps down. Note: gpr_atm_full_fetch_add() returns the value *before* the
   decrement, so case 2 means the count just dropped to 1, case 1 means it
   just dropped to 0, and case 0 would be an underflow (unreachable). */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to one, we're back to just the workqueue owning a ref.
     Unref the workqueue to break the loop.

     If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    case 2: /* last external ref: the only one now owned is by the workqueue */
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
      break;
    case 1: {
      polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      polling_island_delete(exec_ctx, pi);
      if (next != NULL) {
        PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
      }
      break;
    }
    case 0:
      GPR_UNREACHABLE_CODE(return );
  }
}
345
/* Adds 'fd_count' fds to the polling island's epoll set (edge-triggered, both
   read and write interest) and records them in pi->fds. An fd that is already
   in the set (EEXIST) is silently skipped; any other epoll_ctl failure is
   appended to '*error' and that fd is not recorded.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      /* EEXIST (fd already in the set) is benign; everything else is an
         error worth reporting */
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    /* Grow the fd array geometrically (at least 8 extra slots) when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
390
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700391/* The caller is expected to hold pi->mu before calling this */
392static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700393 grpc_wakeup_fd *wakeup_fd,
394 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700395 struct epoll_event ev;
396 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700397 char *err_msg;
398 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700399
400 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
401 ev.data.ptr = wakeup_fd;
402 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
403 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700404 if (err < 0 && errno != EEXIST) {
405 gpr_asprintf(&err_msg,
406 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
407 "error: %d (%s)",
408 pi->epoll_fd,
409 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
410 strerror(errno));
411 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
412 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700413 }
414}
415
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700416/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700417static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700418 bool remove_fd_refs,
419 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700420 int err;
421 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700422 char *err_msg;
423 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700424
425 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700426 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700427 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700428 gpr_asprintf(&err_msg,
429 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
430 "error: %d (%s)",
431 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
432 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
433 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700434 }
435
436 if (remove_fd_refs) {
437 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700438 }
439 }
440
441 pi->fd_cnt = 0;
442}
443
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700444/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700445static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700446 bool is_fd_closed,
447 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700448 int err;
449 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700450 char *err_msg;
451 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700452
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700453 /* If fd is already closed, then it would have been automatically been removed
454 from the epoll set */
455 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700456 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
457 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700458 gpr_asprintf(
459 &err_msg,
460 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
461 pi->epoll_fd, fd->fd, errno, strerror(errno));
462 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
463 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700464 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700465 }
466
467 for (i = 0; i < pi->fd_cnt; i++) {
468 if (pi->fds[i] == fd) {
469 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700470 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700471 break;
472 }
473 }
474}
475
/* Creates a new polling island: a fresh epoll set seeded with the global
   wakeup fd, optionally 'initial_fd', and the island's own workqueue wakeup
   fd. On full success the ref count is 1 (the ref taken on behalf of the
   workqueue's wakeup fd via PI_ADD_REF(pi, "fd")).
   Might return NULL in case of an error (with '*error' describing it). */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1;
  pi->workqueue = NULL;

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

  /* Create the island's workqueue and register its wakeup fd with the epoll
     set; the island takes a ref ("fd") on behalf of that fd */
  if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
                   err_desc) &&
      *error == GRPC_ERROR_NONE) {
    polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
                                  error);
    GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
    pi->workqueue->wakeup_read_fd->polling_island = pi;
    PI_ADD_REF(pi, "fd");
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    if (pi->workqueue != NULL) {
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
    }
    /* NOTE(review): polling_island_delete() asserts pi->fd_cnt == 0; if an
       error occurs after fds were successfully added above (fd_cnt > 0), that
       assert would fire -- confirm whether this path is reachable. */
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
529
/* Destroys a polling island: closes the epoll fd (if it was opened) and frees
   all memory. Must only be called once all fds have been removed (fd_cnt must
   be 0) and nothing holds a reference any more (see pi_unref).
   'exec_ctx' is currently unused in this body. */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  gpr_mu_destroy(&pi->mu);
  gpr_free(pi->fds);
  gpr_free(pi);
}
540
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700541/* Attempts to gets the last polling island in the linked list (liked by the
542 * 'merged_to' field). Since this does not lock the polling island, there are no
543 * guarantees that the island returned is the last island */
544static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
545 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
546 while (next != NULL) {
547 pi = next;
548 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
549 }
550
551 return pi;
552}
553
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
   polling_island *pi_latest = polling_island_lock(pi);
   ...
   ... critical section ..
   ...
   gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we
         check this by holding the pi->mu lock, we cannot be sure (i.e without
         the pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}
590
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700591/* Gets the lock on the *latest* polling islands in the linked lists pointed by
592 *p and *q (and also updates *p and *q to point to the latest polling islands)
593
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700594 This function is needed because calling the following block of code to obtain
595 locks on polling islands (*p and *q) is prone to deadlocks.
596 {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700597 polling_island_lock(*p, true);
598 polling_island_lock(*q, true);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700599 }
600
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700601 Usage/example:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700602 polling_island *p1;
603 polling_island *p2;
604 ..
605 polling_island_lock_pair(&p1, &p2);
606 ..
607 .. Critical section with both p1 and p2 locked
608 ..
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700609 // Release locks: Always call polling_island_unlock_pair() to release locks
610 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700611*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Chase the merged_to links to the tail of each list (acquire loads pair
       with the release store made in polling_island_merge) */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      /* Both lists converged to the same island: a single lock suffices */
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address-ordered locking prevents deadlock against a concurrent caller
       locking the same two islands in the opposite argument order */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-check that neither island was merged while we were acquiring the
       locks; if either was, drop both locks and retry from the top */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
671
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700672static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
673 if (p == q) {
674 gpr_mu_unlock(&p->mu);
675 } else {
676 gpr_mu_unlock(&p->mu);
677 gpr_mu_unlock(&q->mu);
678 }
679}
680
/* Merges the polling islands 'p' and 'q' and returns the resulting (merged)
   island. Any errors encountered while moving fds are appended to *error.
   The fds from the island with fewer fds are moved into the other island, and
   the drained island is published as merged via its 'merged_to' link. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q. The release store pairs with the
       acquire loads in polling_island_lock_pair() and related traversals */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
714
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700715static grpc_error *polling_island_global_init() {
716 grpc_error *error = GRPC_ERROR_NONE;
717
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700718 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
719 if (error == GRPC_ERROR_NONE) {
720 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
721 }
722
723 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700724}
725
/* Tears down the wakeup fd created by polling_island_global_init() */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
729
/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

/* Freelist of grpc_fd structs (see comment above); guarded by fd_freelist_mu */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
759
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
/* Debug variant: logs every refcount change along with the call site */
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The fd must still be alive (refcount > 0) when a new ref is taken */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
775
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variant: logs every refcount change along with the call site */
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Refcount hit zero: return the struct to the freelist (see the freelist
       rationale at the top of this section) */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
800
/* Increment refcount by two to avoid changing the orphan bit (bit 0 of refst
   is the 'active' flag - see fd_is_orphaned(); even deltas leave it intact) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
816
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700817static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
818
static void fd_global_shutdown(void) {
  /* NOTE(review): the lock/unlock pair appears to act as a barrier so that any
     thread still inside a freelist critical section drains out before we walk
     the list unlocked - confirm against callers before changing */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
830
831static grpc_fd *fd_create(int fd, const char *name) {
832 grpc_fd *new_fd = NULL;
833
834 gpr_mu_lock(&fd_freelist_mu);
835 if (fd_freelist != NULL) {
836 new_fd = fd_freelist;
837 fd_freelist = fd_freelist->freelist_next;
838 }
839 gpr_mu_unlock(&fd_freelist_mu);
840
841 if (new_fd == NULL) {
842 new_fd = gpr_malloc(sizeof(grpc_fd));
843 gpr_mu_init(&new_fd->mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700844 }
845
846 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
847 newly created fd (or an fd we got from the freelist), no one else would be
848 holding a lock to it anyway. */
849 gpr_mu_lock(&new_fd->mu);
850
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700851 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700852 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700853 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700854 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700855 new_fd->read_closure = CLOSURE_NOT_READY;
856 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700857 new_fd->polling_island = NULL;
858 new_fd->freelist_next = NULL;
859 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700860 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700861
862 gpr_mu_unlock(&new_fd->mu);
863
864 char *fd_name;
865 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
866 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700867#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700868 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700869#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700870 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700871 return new_fd;
872}
873
874static bool fd_is_orphaned(grpc_fd *fd) {
875 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
876}
877
878static int fd_wrapped_fd(grpc_fd *fd) {
879 int ret_fd = -1;
880 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700881 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700882 ret_fd = fd->fd;
883 }
884 gpr_mu_unlock(&fd->mu);
885
886 return ret_fd;
887}
888
/* Orphans the fd: detaches it from its polling island, optionally closes (or
   relinquishes via *release_fd) the underlying descriptor, and schedules
   'on_done' once the cleanup performed here is complete. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    /* Defer the unref until after fd->mu is released (see comment below) */
    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
946
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700947static grpc_error *fd_shutdown_error(bool shutdown) {
948 if (!shutdown) {
949 return GRPC_ERROR_NONE;
950 } else {
951 return GRPC_ERROR_CREATE("FD shutdown");
952 }
953}
954
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700955static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
956 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700957 if (fd->shutdown) {
958 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
959 NULL);
960 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700961 /* not ready ==> switch to a waiting state by setting the closure */
962 *st = closure;
963 } else if (*st == CLOSURE_READY) {
964 /* already ready ==> queue the closure to run immediately */
965 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700966 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
967 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700968 } else {
969 /* upcallptr was set to a different closure. This is an error! */
970 gpr_log(GPR_ERROR,
971 "User called a notify_on function with a previous callback still "
972 "pending");
973 abort();
974 }
975}
976
977/* returns 1 if state becomes not ready */
978static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
979 grpc_closure **st) {
980 if (*st == CLOSURE_READY) {
981 /* duplicate ready ==> ignore */
982 return 0;
983 } else if (*st == CLOSURE_NOT_READY) {
984 /* not ready, and not waiting ==> flag ready */
985 *st = CLOSURE_READY;
986 return 0;
987 } else {
988 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700989 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700990 *st = CLOSURE_NOT_READY;
991 return 1;
992 }
993}
994
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700995static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
996 grpc_fd *fd) {
997 grpc_pollset *notifier = NULL;
998
999 gpr_mu_lock(&fd->mu);
1000 notifier = fd->read_notifier_pollset;
1001 gpr_mu_unlock(&fd->mu);
1002
1003 return notifier;
1004}
1005
/* Returns true iff fd_shutdown() has been called on this fd */
static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}
1012
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001013/* Might be called multiple times */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001014static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
1015 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001016 /* Do the actual shutdown only once */
1017 if (!fd->shutdown) {
1018 fd->shutdown = true;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001019
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001020 shutdown(fd->fd, SHUT_RDWR);
1021 /* Flush any pending read and write closures. Since fd->shutdown is 'true'
1022 at this point, the closures would be called with 'success = false' */
1023 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1024 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1025 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001026 gpr_mu_unlock(&fd->mu);
1027}
1028
/* Registers 'closure' to run when the fd becomes readable */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1035
/* Registers 'closure' to run when the fd becomes writable */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1042
/* Returns a new ref to the workqueue of the fd's current polling island, or
   NULL if the fd is not attached to a polling island */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  grpc_workqueue *workqueue = NULL;
  if (fd->polling_island != NULL) {
    workqueue =
        GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
  }
  gpr_mu_unlock(&fd->mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001053
/*******************************************************************************
 * Pollset Definitions
 */
/* Thread-local records of which pollset/worker the current thread is polling
   for; consulted by pollset_kick() to avoid kicking ourselves */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
/* Per-thread signal-mask state for the signal-based kick mechanism
   (NOTE(review): initialized/used in pollset_work, outside this chunk) */
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001061
/* Handler for grpc_wakeup_signal. Intentionally (almost) a no-op: delivering
   the signal via pthread_kill (see pollset_worker_kick) is only meant to
   interrupt the target thread's blocking call. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001067
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001068static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001069
/* Global state management */
/* One-time pollset subsystem setup: thread-local slots, the kick signal
   handler, and the global wakeup fd */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1077
/* Tears down the state created by pollset_global_init() */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1083
/* Kicks a single worker by delivering grpc_wakeup_signal to its thread.
   The CAS on worker->is_kicked makes the kick idempotent: only the first
   concurrent kicker sends the signal. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1099
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked). The worker list is circular, so a root pointing at itself
 * means the list is empty. */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001105
/* Unlinks 'worker' from p's circular doubly-linked worker list (p->mu held) */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
1110
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001111static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1112 if (pollset_has_workers(p)) {
1113 grpc_pollset_worker *w = p->root_worker.next;
1114 remove_worker(p, w);
1115 return w;
1116 } else {
1117 return NULL;
1118 }
1119}
1120
/* Inserts 'worker' at the tail of p's circular worker list (p->mu held).
   The final multi-assignment splices the new node into both neighbours. */
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}
1126
/* Inserts 'worker' at the head of p's circular worker list (p->mu held) */
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
1132
/* p->mu must be held before calling this function.
   Kicks either a specific worker, all workers (GRPC_POLLSET_KICK_BROADCAST),
   or - when specific_worker is NULL - some worker on 'p'. If no worker is
   available to absorb the kick, records it in p->kicked_without_pollers so a
   future pollset_work() call returns immediately. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        /* Kick every worker except the current thread's own worker (it can
           absorb the kick itself) */
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      /* Skip the kick if the target worker is this very thread */
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate the kicked worker to the back so kicks are spread around */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1184
/* Wakes up 'some' poller by signalling the global wakeup fd */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001188
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001189static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
1190 gpr_mu_init(&pollset->mu);
1191 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001192
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001193 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001194 pollset->kicked_without_pollers = false;
1195
1196 pollset->shutting_down = false;
1197 pollset->finish_shutdown_called = false;
1198 pollset->shutdown_done = NULL;
1199
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001200 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001201}
1202
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001203/* Convert a timespec to milliseconds:
1204 - Very small or negative poll times are clamped to zero to do a non-blocking
1205 poll (which becomes spin polling)
1206 - Other small values are rounded up to one millisecond
1207 - Longer than a millisecond polls are rounded up to the next nearest
1208 millisecond to avoid spinning
1209 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001210static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1211 gpr_timespec now) {
1212 gpr_timespec timeout;
1213 static const int64_t max_spin_polling_us = 10;
1214 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1215 return -1;
1216 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001217
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001218 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1219 max_spin_polling_us,
1220 GPR_TIMESPAN))) <= 0) {
1221 return 0;
1222 }
1223 timeout = gpr_time_sub(deadline, now);
1224 return gpr_time_to_millis(gpr_time_add(
1225 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1226}
1227
/* Marks the fd readable (running any pending read closure) and records which
   pollset noticed the readability */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1236
/* Marks the fd writable, running any pending write closure */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1243
Craig Tillerb39307d2016-06-30 15:39:13 -07001244static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1245 grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001246 if (ps->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001247 PI_UNREF(exec_ctx, ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001248 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001249 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001250}
1251
/* Completes pollset shutdown: releases the polling island ref and schedules
   the user's shutdown_done closure. Caller holds pollset->mu; must only run
   once no workers remain. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1263
/* pollset->mu lock must be held by the caller before calling this.
   Starts the shutdown sequence: records the shutdown_done closure, kicks all
   workers out of epoll_pwait(), and completes immediately if no workers are
   currently polling (otherwise the last worker completes it in
   pollset_work()). */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Broadcast so that every worker wakes up and observes shutting_down */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1283
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutex, there is nothing special that needs to be done
 * here (the polling island ref was already released during shutdown) */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}
1291
/* Re-arms a fully shut-down pollset for reuse: clears the shutdown-related
   flags. The polling island must already have been released (and NULLed) by
   finish_shutdown_locked(), which the trailing assert verifies. */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  GPR_ASSERT(pollset->polling_island == NULL);
}
1301
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001302#define GRPC_EPOLL_MAX_EVENTS 1000
/* Core polling loop for one worker. Called with pollset->mu held; releases
   pollset->mu before blocking in epoll_pwait() and does NOT re-acquire it —
   the caller (pollset_work()) re-locks after this returns.
   Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     latest polling island pointed by pollset->polling_island.

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wakeup
     right-away from epoll_wait() (via polling_island_wakeup_fd below) and pick
     up the latest polling_island the next time this function
     (i.e pollset_work_and_unlock()) is called */

  if (pollset->polling_island == NULL) {
    /* Lazily create a polling island for a pollset that has never had fds
       added to it */
    pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
    if (pollset->polling_island == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->polling_island, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void *)pollset, (void *)pollset->polling_island);
  }

  pi = polling_island_maybe_get_latest(pollset->polling_island);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->polling_island since the island being pointed by
     pollset->polling_island may be older than the one pointed by pi) */
  if (pollset->polling_island != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(exec_ctx, pollset->polling_island, "ps");
    pollset->polling_island = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->mu);

  /* Loop while epoll returns a full event buffer: there may be more events
     pending than GRPC_EPOLL_MAX_EVENTS */
  do {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    GRPC_SCHEDULING_END_BLOCKING_REGION;
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted (i.e kicked via grpc_wakeup_signal). Save an
           iteration by doing a zero timeout epoll_wait to see if there are
           any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_poll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        /* A regular fd event: EPOLLERR/EPOLLHUP count as both readable and
           writable so that pending closures get a chance to observe the
           error */
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e our old copy of pollset->polling_island
     that we got before releasing the polling island lock). This is because
     pollset->polling_island pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(exec_ctx, pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
1424
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker record lives on this thread's stack; it is published through
     *worker_hdl (so kicks can target this thread) and removed again before
     this function returns */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      /* One-time (per thread, guarded by the flag) setup of the two masks */
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* Releases pollset->mu before blocking; does not re-acquire it */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  /* Log (but still return) any accumulated polling error */
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1531
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001532static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1533 grpc_fd *fd) {
Craig Tiller57726ca2016-09-12 11:59:45 -07001534 GPR_TIMER_BEGIN("pollset_add_fd", 0);
1535
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001536 grpc_error *error = GRPC_ERROR_NONE;
1537
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001538 gpr_mu_lock(&pollset->mu);
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001539 gpr_mu_lock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001540
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001541 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001542
Craig Tiller7212c232016-07-06 13:11:09 -07001543retry:
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001544 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1545 * equal, do nothing.
1546 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1547 * a new polling island (with a refcount of 2) and make the polling_island
1548 * fields in both fd and pollset to point to the new island
1549 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1550 * the NULL polling_island field to point to the non-NULL polling_island
1551 * field (ensure that the refcount on the polling island is incremented by
1552 * 1 to account for the newly added reference)
1553 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1554 * and different, merge both the polling islands and update the
1555 * polling_island fields in both fd and pollset to point to the merged
1556 * polling island.
1557 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001558
Craig Tiller42ac6db2016-07-06 17:13:56 -07001559 if (fd->orphaned) {
1560 gpr_mu_unlock(&fd->mu);
1561 gpr_mu_unlock(&pollset->mu);
1562 /* early out */
1563 return;
1564 }
1565
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001566 if (fd->polling_island == pollset->polling_island) {
1567 pi_new = fd->polling_island;
1568 if (pi_new == NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001569 /* Unlock before creating a new polling island: the polling island will
1570 create a workqueue which creates a file descriptor, and holding an fd
1571 lock here can eventually cause a loop to appear to TSAN (making it
1572 unhappy). We don't think it's a real loop (there's an epoch point where
1573 that loop possibility disappears), but the advantages of keeping TSAN
1574 happy outweigh any performance advantage we might have by keeping the
1575 lock held. */
Craig Tiller7212c232016-07-06 13:11:09 -07001576 gpr_mu_unlock(&fd->mu);
Craig Tillerb39307d2016-06-30 15:39:13 -07001577 pi_new = polling_island_create(exec_ctx, fd, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001578 gpr_mu_lock(&fd->mu);
Craig Tiller0a06cd72016-07-14 13:21:24 -07001579 /* Need to reverify any assumptions made between the initial lock and
1580 getting to this branch: if they've changed, we need to throw away our
1581 work and figure things out again. */
Craig Tiller7212c232016-07-06 13:11:09 -07001582 if (fd->polling_island != NULL) {
Craig Tiller27da6422016-07-06 13:14:46 -07001583 GRPC_POLLING_TRACE(
1584 "pollset_add_fd: Raced creating new polling island. pi_new: %p "
1585 "(fd: %d, pollset: %p)",
1586 (void *)pi_new, fd->fd, (void *)pollset);
1587 PI_ADD_REF(pi_new, "dance_of_destruction");
1588 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
Craig Tiller7212c232016-07-06 13:11:09 -07001589 goto retry;
Craig Tiller27da6422016-07-06 13:14:46 -07001590 } else {
1591 GRPC_POLLING_TRACE(
1592 "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
1593 "pollset: %p)",
1594 (void *)pi_new, fd->fd, (void *)pollset);
Craig Tiller7212c232016-07-06 13:11:09 -07001595 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001596 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001597 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001598 pi_new = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001599 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001600 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001601
1602 GRPC_POLLING_TRACE(
1603 "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
1604 "pollset->pi: %p)",
1605 (void *)pi_new, fd->fd, (void *)pollset,
1606 (void *)pollset->polling_island);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001607 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001608 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001609 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001610
1611 GRPC_POLLING_TRACE(
1612 "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
1613 "%p, fd->pi: %p",
1614 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001615 } else {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001616 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
1617 &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001618 GRPC_POLLING_TRACE(
1619 "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
1620 "%p, fd->pi: %p, pollset->pi: %p)",
1621 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
1622 (void *)pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001623 }
1624
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001625 /* At this point, pi_new is the polling island that both fd->polling_island
1626 and pollset->polling_island must be pointing to */
1627
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001628 if (fd->polling_island != pi_new) {
1629 PI_ADD_REF(pi_new, "fd");
1630 if (fd->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001631 PI_UNREF(exec_ctx, fd->polling_island, "fd");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001632 }
1633 fd->polling_island = pi_new;
1634 }
1635
1636 if (pollset->polling_island != pi_new) {
1637 PI_ADD_REF(pi_new, "ps");
1638 if (pollset->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001639 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001640 }
1641 pollset->polling_island = pi_new;
1642 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001643
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001644 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001645 gpr_mu_unlock(&pollset->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001646
1647 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tiller57726ca2016-09-12 11:59:45 -07001648
1649 GPR_TIMER_END("pollset_add_fd", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001650}
1651
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001652/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001653 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001654 */
1655
1656static grpc_pollset_set *pollset_set_create(void) {
1657 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1658 memset(pollset_set, 0, sizeof(*pollset_set));
1659 gpr_mu_init(&pollset_set->mu);
1660 return pollset_set;
1661}
1662
1663static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1664 size_t i;
1665 gpr_mu_destroy(&pollset_set->mu);
1666 for (i = 0; i < pollset_set->fd_count; i++) {
1667 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1668 }
1669 gpr_free(pollset_set->pollsets);
1670 gpr_free(pollset_set->pollset_sets);
1671 gpr_free(pollset_set->fds);
1672 gpr_free(pollset_set);
1673}
1674
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001675static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1676 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1677 size_t i;
1678 gpr_mu_lock(&pollset_set->mu);
1679 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1680 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1681 pollset_set->fds = gpr_realloc(
1682 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1683 }
1684 GRPC_FD_REF(fd, "pollset_set");
1685 pollset_set->fds[pollset_set->fd_count++] = fd;
1686 for (i = 0; i < pollset_set->pollset_count; i++) {
1687 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1688 }
1689 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1690 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1691 }
1692 gpr_mu_unlock(&pollset_set->mu);
1693}
1694
1695static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1696 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1697 size_t i;
1698 gpr_mu_lock(&pollset_set->mu);
1699 for (i = 0; i < pollset_set->fd_count; i++) {
1700 if (pollset_set->fds[i] == fd) {
1701 pollset_set->fd_count--;
1702 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1703 pollset_set->fds[pollset_set->fd_count]);
1704 GRPC_FD_UNREF(fd, "pollset_set");
1705 break;
1706 }
1707 }
1708 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1709 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1710 }
1711 gpr_mu_unlock(&pollset_set->mu);
1712}
1713
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001714static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1715 grpc_pollset_set *pollset_set,
1716 grpc_pollset *pollset) {
1717 size_t i, j;
1718 gpr_mu_lock(&pollset_set->mu);
1719 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1720 pollset_set->pollset_capacity =
1721 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1722 pollset_set->pollsets =
1723 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1724 sizeof(*pollset_set->pollsets));
1725 }
1726 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1727 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1728 if (fd_is_orphaned(pollset_set->fds[i])) {
1729 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1730 } else {
1731 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1732 pollset_set->fds[j++] = pollset_set->fds[i];
1733 }
1734 }
1735 pollset_set->fd_count = j;
1736 gpr_mu_unlock(&pollset_set->mu);
1737}
1738
1739static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1740 grpc_pollset_set *pollset_set,
1741 grpc_pollset *pollset) {
1742 size_t i;
1743 gpr_mu_lock(&pollset_set->mu);
1744 for (i = 0; i < pollset_set->pollset_count; i++) {
1745 if (pollset_set->pollsets[i] == pollset) {
1746 pollset_set->pollset_count--;
1747 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1748 pollset_set->pollsets[pollset_set->pollset_count]);
1749 break;
1750 }
1751 }
1752 gpr_mu_unlock(&pollset_set->mu);
1753}
1754
1755static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1756 grpc_pollset_set *bag,
1757 grpc_pollset_set *item) {
1758 size_t i, j;
1759 gpr_mu_lock(&bag->mu);
1760 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1761 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1762 bag->pollset_sets =
1763 gpr_realloc(bag->pollset_sets,
1764 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1765 }
1766 bag->pollset_sets[bag->pollset_set_count++] = item;
1767 for (i = 0, j = 0; i < bag->fd_count; i++) {
1768 if (fd_is_orphaned(bag->fds[i])) {
1769 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1770 } else {
1771 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1772 bag->fds[j++] = bag->fds[i];
1773 }
1774 }
1775 bag->fd_count = j;
1776 gpr_mu_unlock(&bag->mu);
1777}
1778
1779static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1780 grpc_pollset_set *bag,
1781 grpc_pollset_set *item) {
1782 size_t i;
1783 gpr_mu_lock(&bag->mu);
1784 for (i = 0; i < bag->pollset_set_count; i++) {
1785 if (bag->pollset_sets[i] == item) {
1786 bag->pollset_set_count--;
1787 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1788 bag->pollset_sets[bag->pollset_set_count]);
1789 break;
1790 }
1791 }
1792 gpr_mu_unlock(&bag->mu);
1793}
1794
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001795/* Test helper functions
1796 * */
1797void *grpc_fd_get_polling_island(grpc_fd *fd) {
1798 polling_island *pi;
1799
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001800 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001801 pi = fd->polling_island;
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001802 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001803
1804 return pi;
1805}
1806
1807void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1808 polling_island *pi;
1809
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001810 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001811 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001812 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001813
1814 return pi;
1815}
1816
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001817bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001818 polling_island *p1 = p;
1819 polling_island *p2 = q;
1820
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001821 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1822 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001823 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001824 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001825
1826 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001827}
1828
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001829/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001830 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001831 */
1832
/* Tears down the engine's global state: fd state, then pollset state, then
   polling-island state (the same order they are initialized in
   grpc_init_epoll_linux()). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1838
/* The event-engine vtable exposing this epoll/signal-based polling engine to
   grpc core; every entry points at a static implementation in this file. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd operations */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1873
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001874/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1875 * Create a dummy epoll_fd to make sure epoll support is available */
1876static bool is_epoll_available() {
1877 int fd = epoll_create1(EPOLL_CLOEXEC);
1878 if (fd < 0) {
1879 gpr_log(
1880 GPR_ERROR,
1881 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1882 fd);
1883 return false;
1884 }
1885 close(fd);
1886 return true;
1887}
1888
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001889const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001890 /* If use of signals is disabled, we cannot use epoll engine*/
1891 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1892 return NULL;
1893 }
1894
Ken Paysonbc544be2016-10-06 19:23:47 -07001895 if (!grpc_has_wakeup_fd) {
1896 return NULL;
1897 }
1898
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001899 if (!is_epoll_available()) {
1900 return NULL;
1901 }
1902
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001903 if (!is_grpc_wakeup_signal_initialized) {
Sree Kuchibhotlabd48c912016-09-27 16:48:25 -07001904 grpc_use_signal(SIGRTMIN + 6);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001905 }
1906
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001907 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001908
1909 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1910 return NULL;
1911 }
1912
1913 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1914 polling_island_global_init())) {
1915 return NULL;
1916 }
1917
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001918 return &vtable;
1919}
1920
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001921#else /* defined(GPR_LINUX_EPOLL) */
1922#if defined(GPR_POSIX_SOCKET)
1923#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001924/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1925 * NULL */
1926const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001927#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001928
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001929void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001930#endif /* !defined(GPR_LINUX_EPOLL) */