blob: 02bcbaa10f55478c7853e92a602c7d0fbfc4e449 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Emit a polling-engine trace line (only when grpc_polling_trace is set).
   Wrapped in do/while(0) so the macro expands to exactly one statement and
   is safe inside unbraced if/else bodies (avoids the dangling-else bug the
   previous bare `if {...}` form had). */
#define GRPC_POLLING_TRACE(fmt, ...)         \
  do {                                       \
    if (grpc_polling_trace) {                \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
    }                                        \
  } while (0)
71
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070072static int grpc_wakeup_signal = -1;
73static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070093/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070094 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070095 */
/* A file descriptor tracked by this polling engine. */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Protects the mutable state below (shutdown, orphaned, closures, island) */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  /* Pending read/write notification closures. NOTE(review): presumably these
     use the CLOSURE_NOT_READY/CLOSURE_READY sentinels defined below — confirm
     at the notify/set sites (not visible in this chunk). */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to (protected by mu) */
  struct polling_island *polling_island;

  /* Next entry when this struct sits on the fd freelist */
  struct grpc_fd *freelist_next;
  /* Closure invoked once the fd has been orphaned/released */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
129
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants record the reason and call site of every ref/unref */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd.read_closure/write_closure: 0 and 1 are
   never valid closure pointers, so they can encode the not-ready/ready
   states in the same field that otherwise holds a pending closure. */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
150
/*******************************************************************************
 * Polling island Declarations
 */

//#define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

/* Debug variants log every polling-island ref/unref with reason + call site */
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
168
/* A polling island: a group of fds that are polled together through a single
   epoll set. Islands may be merged; a merged island forwards to its
   successor via 'merged_to'. */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The workqueue associated with this polling island */
  grpc_workqueue *workqueue;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set (dynamic array of fd_cnt entries,
     fd_capacity allocated) */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
199
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700200/*******************************************************************************
201 * Pollset Declarations
202 */
/* A thread that is (or was) polling on behalf of a pollset. Workers form a
   doubly-linked list rooted at grpc_pollset.root_worker. */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  /* Head of the worker list */
  grpc_pollset_worker root_worker;
  /* NOTE(review): presumably set when a kick arrives while no worker is
     polling, so the next worker returns immediately — confirm at kick site */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};
225
226/*******************************************************************************
227 * Pollset-set Declarations
228 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
/* A collection of pollsets, nested pollset_sets and fds that are polled
   together. All three dynamic arrays below are protected by 'mu'. */
struct grpc_pollset_set {
  gpr_mu mu;

  /* Member pollsets (count / allocated capacity) */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Nested pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Member fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
250
251/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700252 * Common helpers
253 */
254
Craig Tillerf975f742016-07-01 14:56:27 -0700255static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700256 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700257 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700258 if (*composite == GRPC_ERROR_NONE) {
259 *composite = GRPC_ERROR_CREATE(desc);
260 }
261 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700262 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700263}
264
265/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700266 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700267 */
268
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700269/* The wakeup fd that is used to wake up all threads in a Polling island. This
270 is useful in the polling island merge operation where we need to wakeup all
271 the threads currently polling the smaller polling island (so that they can
272 start polling the new/merged polling island)
273
274 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
275 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
276static grpc_wakeup_fd polling_island_wakeup_fd;
277
Craig Tillerb39307d2016-06-30 15:39:13 -0700278/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700279static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700280
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700281#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700282/* Currently TSAN may incorrectly flag data races between epoll_ctl and
283 epoll_wait for any grpc_fd structs that are added to the epoll set via
284 epoll_ctl and are returned (within a very short window) via epoll_wait().
285
286 To work-around this race, we establish a happens-before relation between
287 the code just-before epoll_ctl() and the code after epoll_wait() by using
288 this atomic */
289gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700290#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700291
#ifdef GRPC_PI_REF_COUNT_DEBUG
/* Forward declarations of the real ref/unref implementations below */
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

/* Debug wrapper around pi_add_ref(): logs the old/new count, the reason and
   the call site. NOTE: the old count is read before the increment, so under
   contention the logged old/new pair may not be exact. */
static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                           int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper around pi_unref(): logs the old/new count, the reason and
   the call site (same caveat about the count snapshot as above). */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
312
/* Takes a ref on the polling island. Lock-free; callers should use the
   PI_ADD_REF() macro rather than calling this directly. */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700316
/* Drops a ref on the polling island. Callers should use the PI_UNREF()
   macro rather than calling this directly. The workqueue holds one
   "internal" ref on the island and the island holds a ref on the workqueue,
   forming an intentional cycle that is broken here. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to one, we're back to just the workqueue owning a ref.
     Unref the workqueue to break the loop.

     If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    case 2: /* last external ref: the only one now owned is by the workqueue */
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
      break;
    case 1: {
      /* Snapshot merged_to BEFORE deleting pi, then unref the successor */
      polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      polling_island_delete(exec_ctx, pi);
      if (next != NULL) {
        PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
      }
      break;
    }
    case 0:
      /* Unref of an island whose count was already zero: a bug */
      GPR_UNREACHABLE_CODE(return );
  }
}
345
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700346/* The caller is expected to hold pi->mu lock before calling this function */
347static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700348 size_t fd_count, bool add_fd_refs,
349 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700350 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700351 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700352 struct epoll_event ev;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700353 char *err_msg;
354 const char *err_desc = "polling_island_add_fds";
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700355
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700356#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700357 /* See the definition of g_epoll_sync for more context */
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700358 gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700359#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700360
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700361 for (i = 0; i < fd_count; i++) {
362 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
363 ev.data.ptr = fds[i];
364 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700365
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700366 if (err < 0) {
367 if (errno != EEXIST) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700368 gpr_asprintf(
369 &err_msg,
370 "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
371 pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
372 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
373 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700374 }
375
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700376 continue;
377 }
378
379 if (pi->fd_cnt == pi->fd_capacity) {
380 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
381 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
382 }
383
384 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700385 if (add_fd_refs) {
386 GRPC_FD_REF(fds[i], "polling_island");
387 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700388 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700389}
390
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700391/* The caller is expected to hold pi->mu before calling this */
392static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700393 grpc_wakeup_fd *wakeup_fd,
394 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700395 struct epoll_event ev;
396 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700397 char *err_msg;
398 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700399
400 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
401 ev.data.ptr = wakeup_fd;
402 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
403 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700404 if (err < 0 && errno != EEXIST) {
405 gpr_asprintf(&err_msg,
406 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
407 "error: %d (%s)",
408 pi->epoll_fd,
409 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
410 strerror(errno));
411 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
412 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700413 }
414}
415
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700416/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700417static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700418 bool remove_fd_refs,
419 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700420 int err;
421 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700422 char *err_msg;
423 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700424
425 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700426 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700427 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700428 gpr_asprintf(&err_msg,
429 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
430 "error: %d (%s)",
431 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
432 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
433 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700434 }
435
436 if (remove_fd_refs) {
437 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700438 }
439 }
440
441 pi->fd_cnt = 0;
442}
443
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700444/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700445static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700446 bool is_fd_closed,
447 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700448 int err;
449 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700450 char *err_msg;
451 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700452
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700453 /* If fd is already closed, then it would have been automatically been removed
454 from the epoll set */
455 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700456 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
457 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700458 gpr_asprintf(
459 &err_msg,
460 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
461 pi->epoll_fd, fd->fd, errno, strerror(errno));
462 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
463 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700464 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700465 }
466
467 for (i = 0; i < pi->fd_cnt; i++) {
468 if (pi->fds[i] == fd) {
469 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700470 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700471 break;
472 }
473 }
474}
475
/* Creates a new polling island containing the global wakeup fd, the island's
   workqueue wakeup fd and (optionally) 'initial_fd'. Might return NULL in
   case of an error; all failures are accumulated into *error. */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1;
  pi->workqueue = NULL;

  /* The island starts with zero refs; the workqueue hookup below adds the
     first one */
  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

  /* Create the workqueue, poll its wakeup fd from this island, and set up
     the island<->workqueue ref cycle (broken later in pi_unref()) */
  if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
                   err_desc) &&
      *error == GRPC_ERROR_NONE) {
    polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
                                  error);
    GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
    pi->workqueue->wakeup_read_fd->polling_island = pi;
    PI_ADD_REF(pi, "fd");
  }

done:
  /* On any accumulated error, tear down whatever was built and return NULL */
  if (*error != GRPC_ERROR_NONE) {
    if (pi->workqueue != NULL) {
      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
    }
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
529
Craig Tillerb39307d2016-06-30 15:39:13 -0700530static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700531 GPR_ASSERT(pi->fd_cnt == 0);
532
Craig Tiller0a06cd72016-07-14 13:21:24 -0700533 if (pi->epoll_fd >= 0) {
534 close(pi->epoll_fd);
535 }
Craig Tillerb39307d2016-06-30 15:39:13 -0700536 gpr_mu_destroy(&pi->mu);
537 gpr_free(pi->fds);
538 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700539}
540
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700541/* Attempts to gets the last polling island in the linked list (liked by the
542 * 'merged_to' field). Since this does not lock the polling island, there are no
543 * guarantees that the island returned is the last island */
544static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
545 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
546 while (next != NULL) {
547 pi = next;
548 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
549 }
550
551 return pi;
552}
553
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
   polling_island *pi_latest = polling_island_lock(pi);
   ...
   ... critical section ..
   ...
   gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu

   Returns with the tail island's mu HELD. The lock-then-recheck dance below
   is required because an island merge may append to the list at any moment
   while we are walking it without a lock. */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is infact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}
590
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700591/* Gets the lock on the *latest* polling islands in the linked lists pointed by
592 *p and *q (and also updates *p and *q to point to the latest polling islands)
593
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700594 This function is needed because calling the following block of code to obtain
595 locks on polling islands (*p and *q) is prone to deadlocks.
596 {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700597 polling_island_lock(*p, true);
598 polling_island_lock(*q, true);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700599 }
600
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700601 Usage/example:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700602 polling_island *p1;
603 polling_island *p2;
604 ..
605 polling_island_lock_pair(&p1, &p2);
606 ..
607 .. Critical section with both p1 and p2 locked
608 ..
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700609 // Release locks: Always call polling_island_unlock_pair() to release locks
610 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700611*/
/* Locks the *latest* islands in the merge chains starting at *p and *q and
   updates *p and *q to point to those islands. Callers must release with
   polling_island_unlock_pair(). On return, *p and *q may be equal (if both
   chains converged to the same island) — in that case only one lock is held. */
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Walk to the tail of pi_1's merge chain (merged_to == NULL) */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    /* Walk to the tail of pi_2's merge chain */
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains ended at the same island: a single lock suffices.
       polling_island_lock() re-resolves the latest island itself */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Distinct islands: lock in address order to avoid deadlock with a
       concurrent caller locking the same pair in the opposite order */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-check that both islands are still chain tails; a merge may have
       happened between the walk above and acquiring the locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* Stale: drop both locks and retry from the (new) chain tails */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
671
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700672static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
673 if (p == q) {
674 gpr_mu_unlock(&p->mu);
675 } else {
676 gpr_mu_unlock(&p->mu);
677 gpr_mu_unlock(&q->mu);
678 }
679}
680
/* Merges polling island p into polling island q and returns q, the surviving
   island. p and q may be stale heads of merge chains; the lock_pair call
   below resolves them to the latest islands first. Failures from the internal
   steps are accumulated into *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
714
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700715static grpc_error *polling_island_global_init() {
716 grpc_error *error = GRPC_ERROR_NONE;
717
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700718 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
719 if (error == GRPC_ERROR_NONE) {
720 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
721 }
722
723 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700724}
725
/* Tears down the wakeup fd created by polling_island_global_init() */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
729
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700730/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700731 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700732 */
733
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700734/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700735 * but instead so that implementations with multiple threads in (for example)
736 * epoll_wait deal with the race between pollset removal and incoming poll
737 * notifications.
738 *
739 * The problem is that the poller ultimately holds a reference to this
740 * object, so it is very difficult to know when is safe to free it, at least
741 * without some expensive synchronization.
742 *
743 * If we keep the object freelisted, in the worst case losing this race just
744 * becomes a spurious read notification on a reused fd.
745 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700746
747/* The alarm system needs to be able to wakeup 'some poller' sometimes
748 * (specifically when a new alarm needs to be triggered earlier than the next
749 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
750 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700751
752/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
753 * sure to wake up one polling thread (which can wake up other threads if
754 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700755grpc_wakeup_fd grpc_global_wakeup_fd;
756
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700757static grpc_fd *fd_freelist = NULL;
758static gpr_mu fd_freelist_mu;
759
/* Adds n to fd->refst. The debug build additionally logs the transition and
   threads reason/file/line through the REF_BY/UNREF_BY macros; the body after
   the #endif is shared by both variants. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The refcount must already be positive: refs can only be added to a live
     fd */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
775
/* Subtracts n from fd->refst; when this drops the count to zero (old == n),
   the grpc_fd is returned to the freelist rather than freed — see the
   freelist rationale comment above fd_create. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Last reference dropped: add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Unref must never take the count below zero */
    GPR_ASSERT(old > n);
  }
}
800
/* Increment refcount by two to avoid changing the orphan bit
   (bit 0 of refst tracks "not orphaned"; see fd_is_orphaned) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
816
/* One-time initialization of the lock protecting the fd freelist */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
818
/* Drains and frees the fd freelist, then destroys its lock. */
static void fd_global_shutdown(void) {
  /* NOTE(review): the empty lock/unlock pair appears intentional — it makes
     sure any thread inside the freelist critical section has exited before
     the list is torn down. Confirm before removing. */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
830
831static grpc_fd *fd_create(int fd, const char *name) {
832 grpc_fd *new_fd = NULL;
833
834 gpr_mu_lock(&fd_freelist_mu);
835 if (fd_freelist != NULL) {
836 new_fd = fd_freelist;
837 fd_freelist = fd_freelist->freelist_next;
838 }
839 gpr_mu_unlock(&fd_freelist_mu);
840
841 if (new_fd == NULL) {
842 new_fd = gpr_malloc(sizeof(grpc_fd));
843 gpr_mu_init(&new_fd->mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700844 }
845
846 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
847 newly created fd (or an fd we got from the freelist), no one else would be
848 holding a lock to it anyway. */
849 gpr_mu_lock(&new_fd->mu);
850
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700851 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700852 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700853 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700854 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700855 new_fd->read_closure = CLOSURE_NOT_READY;
856 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700857 new_fd->polling_island = NULL;
858 new_fd->freelist_next = NULL;
859 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700860 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700861
862 gpr_mu_unlock(&new_fd->mu);
863
864 char *fd_name;
865 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
866 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700867#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700868 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700869#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700870 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700871 return new_fd;
872}
873
874static bool fd_is_orphaned(grpc_fd *fd) {
875 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
876}
877
878static int fd_wrapped_fd(grpc_fd *fd) {
879 int ret_fd = -1;
880 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700881 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700882 ret_fd = fd->fd;
883 }
884 gpr_mu_unlock(&fd->mu);
885
886 return ret_fd;
887}
888
/* Orphans 'fd': closes the underlying descriptor (or hands it back through
   *release_fd if release_fd != NULL), removes the fd from its polling island
   and schedules 'on_done' with any accumulated error. The grpc_fd struct
   itself stays alive until its refcount drains (see unref_by). */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    /* Defer the actual unref until after fd->mu is released (see below) */
    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
944
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700945static grpc_error *fd_shutdown_error(bool shutdown) {
946 if (!shutdown) {
947 return GRPC_ERROR_NONE;
948 } else {
949 return GRPC_ERROR_CREATE("FD shutdown");
950 }
951}
952
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700953static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
954 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700955 if (fd->shutdown) {
956 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
957 NULL);
958 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700959 /* not ready ==> switch to a waiting state by setting the closure */
960 *st = closure;
961 } else if (*st == CLOSURE_READY) {
962 /* already ready ==> queue the closure to run immediately */
963 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700964 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
965 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700966 } else {
967 /* upcallptr was set to a different closure. This is an error! */
968 gpr_log(GPR_ERROR,
969 "User called a notify_on function with a previous callback still "
970 "pending");
971 abort();
972 }
973}
974
975/* returns 1 if state becomes not ready */
976static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
977 grpc_closure **st) {
978 if (*st == CLOSURE_READY) {
979 /* duplicate ready ==> ignore */
980 return 0;
981 } else if (*st == CLOSURE_NOT_READY) {
982 /* not ready, and not waiting ==> flag ready */
983 *st = CLOSURE_READY;
984 return 0;
985 } else {
986 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700987 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700988 *st = CLOSURE_NOT_READY;
989 return 1;
990 }
991}
992
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700993static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
994 grpc_fd *fd) {
995 grpc_pollset *notifier = NULL;
996
997 gpr_mu_lock(&fd->mu);
998 notifier = fd->read_notifier_pollset;
999 gpr_mu_unlock(&fd->mu);
1000
1001 return notifier;
1002}
1003
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001004static bool fd_is_shutdown(grpc_fd *fd) {
1005 gpr_mu_lock(&fd->mu);
1006 const bool r = fd->shutdown;
1007 gpr_mu_unlock(&fd->mu);
1008 return r;
1009}
1010
/* Might be called multiple times */
/* Shuts down the fd for both reading and writing and fails any pending
   read/write closures. Only the first call does work; the flag must be set
   before flushing so the closures see 'success = false'. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1026
/* Registers 'closure' to run when the fd becomes readable (or runs it
   immediately if already readable/shut down) — see notify_on_locked. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1033
/* Registers 'closure' to run when the fd becomes writable (or runs it
   immediately if already writable/shut down) — see notify_on_locked. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1040
Craig Tillerd6ba6192016-06-30 15:42:41 -07001041static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001042 gpr_mu_lock(&fd->mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001043 grpc_workqueue *workqueue = NULL;
1044 if (fd->polling_island != NULL) {
1045 workqueue =
1046 GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
1047 }
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001048 gpr_mu_unlock(&fd->mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001049 return workqueue;
1050}
Craig Tiller70bd4832016-06-30 14:20:46 -07001051
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001052/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001053 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001054 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001055GPR_TLS_DECL(g_current_thread_pollset);
1056GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001057static __thread bool g_initialized_sigmask;
1058static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001059
/* Handler for grpc_wakeup_signal (delivered via pthread_kill in
   pollset_worker_kick). Deliberately does (almost) nothing: presumably the
   delivery itself is what interrupts the blocked poll call — confirm. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001065
/* Installs sig_handler for grpc_wakeup_signal, the signal used by
   pollset_worker_kick() to kick polling threads */
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001067
/* Global state management */
/* Initializes pollset-wide state: the per-thread TLS slots used to detect
   self-kicks, the wakeup-signal handler, and the global wakeup fd. Returns
   the wakeup fd initialization result. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1075
/* Tears down the state created by pollset_global_init() */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1081
/* Kicks a single worker by sending grpc_wakeup_signal to its thread.
   The CAS on worker->is_kicked makes the kick idempotent: only the first
   kicker sends the signal. (NOTE(review): is_kicked is presumably reset by
   the worker elsewhere — not visible in this chunk.) Returns an OS error if
   pthread_kill fails. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1097
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked). The worker list is circular with root_worker as sentinel, so a
 * non-self next pointer means at least one real worker is present. */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001103
/* Unlinks 'worker' from p's circular doubly-linked worker list (worker must
   currently be on the list; p->mu must be held). */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
1108
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001109static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1110 if (pollset_has_workers(p)) {
1111 grpc_pollset_worker *w = p->root_worker.next;
1112 remove_worker(p, w);
1113 return w;
1114 } else {
1115 return NULL;
1116 }
1117}
1118
1119static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1120 worker->next = &p->root_worker;
1121 worker->prev = worker->next->prev;
1122 worker->prev->next = worker->next->prev = worker;
1123}
1124
1125static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1126 worker->prev = &p->root_worker;
1127 worker->next = worker->prev->next;
1128 worker->prev->next = worker->next->prev = worker;
1129}
1130
/* p->mu must be held before calling this function */
/* Kicks a specific worker, all workers (GRPC_POLLSET_KICK_BROADCAST), or —
   when specific_worker == NULL — "any" worker of p. The TLS comparisons
   avoid signalling the calling thread itself, since it can absorb the kick.
   Errors from individual kicks are accumulated and returned. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        /* Kick every worker except the current thread (if it is one) */
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        /* No workers to broadcast to: remember the kick for the next one */
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      /* Skip the kick if the target is the calling thread */
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate the kicked worker to the back of the list */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1182
/* Wakes up "some" poller by triggering the process-wide global wakeup fd */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001186
/* Initializes a pollset: empty (self-linked) worker list, cleared shutdown
   state, no polling island. Hands the pollset's mutex back via *mu so the
   caller can synchronize on it. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  /* Sentinel points at itself == no workers */
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}
1200
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001201/* Convert a timespec to milliseconds:
1202 - Very small or negative poll times are clamped to zero to do a non-blocking
1203 poll (which becomes spin polling)
1204 - Other small values are rounded up to one millisecond
1205 - Longer than a millisecond polls are rounded up to the next nearest
1206 millisecond to avoid spinning
1207 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001208static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1209 gpr_timespec now) {
1210 gpr_timespec timeout;
1211 static const int64_t max_spin_polling_us = 10;
1212 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1213 return -1;
1214 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001215
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001216 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1217 max_spin_polling_us,
1218 GPR_TIMESPAN))) <= 0) {
1219 return 0;
1220 }
1221 timeout = gpr_time_sub(deadline, now);
1222 return gpr_time_to_millis(gpr_time_add(
1223 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1224}
1225
/* Fires the fd's read closure (if any) and records 'notifier' as the pollset
   that observed the readability (see fd_get_read_notifier_pollset). */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1234
/* Fires the fd's write closure (if any) when the fd becomes writable */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1241
Craig Tillerb39307d2016-06-30 15:39:13 -07001242static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1243 grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001244 if (ps->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001245 PI_UNREF(exec_ctx, ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001246 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001247 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001248}
1249
/* Completes a pollset shutdown: releases the polling island and schedules the
   shutdown_done closure. Must only run once all workers have left (either
   directly from pollset_shutdown, or by the last worker in pollset_work). */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1261
/* pollset->mu lock must be held by the caller before calling this */
/* Begins shutting down the pollset: marks it shutting down, kicks all workers
   out of pollset_work, and — if no workers remain — finishes immediately.
   'closure' runs once shutdown completes. Must not be called twice. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1281
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  /* By this point shutdown has completed, so no worker may still be queued */
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}
1289
Craig Tiller2b49ea92016-07-01 13:21:27 -07001290static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001291 GPR_ASSERT(pollset->shutting_down);
1292 GPR_ASSERT(!pollset_has_workers(pollset));
1293 pollset->shutting_down = false;
1294 pollset->finish_shutdown_called = false;
1295 pollset->kicked_without_pollers = false;
1296 pollset->shutdown_done = NULL;
Craig Tillerb39307d2016-06-30 15:39:13 -07001297 GPR_ASSERT(pollset->polling_island == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001298}
1299
/* Max events fetched per epoll_(p)wait call; the wait loop in
   pollset_work_and_unlock() repeats while exactly this many are returned */
#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001301/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1302static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001303 grpc_pollset *pollset,
1304 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001305 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001306 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001307 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001308 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001309 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001310 char *err_msg;
1311 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001312 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1313
1314 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001315 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001316
1317 Since epoll_fd is immutable, we can read it without obtaining the polling
1318 island lock. There is however a possibility that the polling island (from
1319 which we got the epoll_fd) got merged with another island while we are
1320 in this function. This is still okay because in such a case, we will wakeup
1321 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001322 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001323
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001324 if (pollset->polling_island == NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001325 pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001326 if (pollset->polling_island == NULL) {
1327 GPR_TIMER_END("pollset_work_and_unlock", 0);
1328 return; /* Fatal error. We cannot continue */
1329 }
1330
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001331 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001332 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1333 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001334 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001335
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001336 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001337 epoll_fd = pi->epoll_fd;
1338
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001339 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001340 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001341 if (pollset->polling_island != pi) {
1342 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1343 polling island to be deleted */
1344 PI_ADD_REF(pi, "ps");
Craig Tillerb39307d2016-06-30 15:39:13 -07001345 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001346 pollset->polling_island = pi;
1347 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001348
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001349 /* Add an extra ref so that the island does not get destroyed (which means
1350 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1351 epoll_fd */
1352 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001353 gpr_mu_unlock(&pollset->mu);
1354
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001355 do {
1356 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1357 sig_mask);
1358 if (ep_rv < 0) {
1359 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001360 gpr_asprintf(&err_msg,
1361 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1362 epoll_fd, errno, strerror(errno));
1363 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001364 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001365 /* We were interrupted. Save an interation by doing a zero timeout
1366 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001367 GRPC_POLLING_TRACE(
1368 "pollset_work: pollset: %p, worker: %p received kick",
1369 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001370 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001371 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001372 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001373
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001374#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001375 /* See the definition of g_poll_sync for more details */
1376 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001377#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001378
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001379 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001380 void *data_ptr = ep_ev[i].data.ptr;
1381 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001382 append_error(error,
1383 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1384 err_desc);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001385 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001386 GRPC_POLLING_TRACE(
1387 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1388 "%d) got merged",
1389 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001390 /* This means that our polling island is merged with a different
1391 island. We do not have to do anything here since the subsequent call
1392 to the function pollset_work_and_unlock() will pick up the correct
1393 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001394 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001395 grpc_fd *fd = data_ptr;
1396 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1397 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1398 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001399 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001400 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001401 }
1402 if (write_ev || cancel) {
1403 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001404 }
1405 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001406 }
1407 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001408
1409 GPR_ASSERT(pi != NULL);
1410
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001411 /* Before leaving, release the extra ref we added to the polling island. It
1412 is important to use "pi" here (i.e our old copy of pollset->polling_island
1413 that we got before releasing the polling island lock). This is because
1414 pollset->polling_island pointer might get udpated in other parts of the
1415 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001416 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001417
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001418 GPR_TIMER_END("pollset_work_and_unlock", 0);
1419}
1420
/* Runs one iteration of polling work for this pollset on the calling thread,
   blocking up to 'deadline'. Returns (a new ref to) any accumulated error.
   pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this thread's stack; it is published via *worker_hdl
     and unpublished (set back to NULL) before returning */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    /* One-time, per-process sigmask setup (g_initialized_sigmask is global) */
    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
         This is the mask used at all times *except during
         epoll_wait()*"
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  /* NOTE(review): the extra GRPC_ERROR_REF presumably exists because
     GRPC_LOG_IF_ERROR consumes a ref, while the caller owns the returned
     'error' — confirm against the grpc_error contract */
  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1527
/* Adds 'fd' to 'pollset', unifying their polling islands (creating or merging
   islands as needed). Lock order: pollset->mu is taken before fd->mu. Errors
   are logged (not returned). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&fd->mu);

  polling_island *pi_new = NULL;

retry:
  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   * equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   * a new polling island (with a refcount of 2) and make the polling_island
   * fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   * the NULL polling_island field to point to the non-NULL polling_island
   * field (ensure that the refcount on the polling island is incremented by
   * 1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   * and different, merge both the polling islands and update the
   * polling_island fields in both fd and pollset to point to the merged
   * polling island.
   */

  /* An orphaned fd must not be (re-)added to any island */
  if (fd->orphaned) {
    gpr_mu_unlock(&fd->mu);
    gpr_mu_unlock(&pollset->mu);
    /* early out */
    return;
  }

  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      /* Unlock before creating a new polling island: the polling island will
         create a workqueue which creates a file descriptor, and holding an fd
         lock here can eventually cause a loop to appear to TSAN (making it
         unhappy). We don't think it's a real loop (there's an epoch point where
         that loop possibility disappears), but the advantages of keeping TSAN
         happy outweigh any performance advantage we might have by keeping the
         lock held. */
      gpr_mu_unlock(&fd->mu);
      pi_new = polling_island_create(exec_ctx, fd, &error);
      gpr_mu_lock(&fd->mu);
      /* Need to reverify any assumptions made between the initial lock and
         getting to this branch: if they've changed, we need to throw away our
         work and figure things out again. */
      if (fd->polling_island != NULL) {
        GRPC_POLLING_TRACE(
            "pollset_add_fd: Raced creating new polling island. pi_new: %p "
            "(fd: %d, pollset: %p)",
            (void *)pi_new, fd->fd, (void *)pollset);
        /* Lost the race: discard the island we just built (ref then unref
           destroys it when nobody else holds a ref) and restart the decision */
        PI_ADD_REF(pi_new, "dance_of_destruction");
        PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
        goto retry;
      } else {
        GRPC_POLLING_TRACE(
            "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
            "pollset: %p)",
            (void *)pi_new, fd->fd, (void *)pollset);
      }
    }
  } else if (fd->polling_island == NULL) {
    /* Case 3a: adopt the pollset's island and register fd's descriptor in it */
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
        "pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset,
        (void *)pollset->polling_island);
  } else if (pollset->polling_island == NULL) {
    /* Case 3b: the pollset adopts (the latest version of) fd's island */
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
  } else {
    /* Case 4: both have (different) islands — merge them */
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
                                  &error);
    GRPC_POLLING_TRACE(
        "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p, pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
        (void *)pollset->polling_island);
  }

  /* At this point, pi_new is the polling island that both fd->polling_island
     and pollset->polling_island must be pointing to */

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(exec_ctx, fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(exec_ctx, pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->mu);
  gpr_mu_unlock(&pollset->mu);

  GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
1643
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001644/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001645 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001646 */
1647
1648static grpc_pollset_set *pollset_set_create(void) {
1649 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1650 memset(pollset_set, 0, sizeof(*pollset_set));
1651 gpr_mu_init(&pollset_set->mu);
1652 return pollset_set;
1653}
1654
1655static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1656 size_t i;
1657 gpr_mu_destroy(&pollset_set->mu);
1658 for (i = 0; i < pollset_set->fd_count; i++) {
1659 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1660 }
1661 gpr_free(pollset_set->pollsets);
1662 gpr_free(pollset_set->pollset_sets);
1663 gpr_free(pollset_set->fds);
1664 gpr_free(pollset_set);
1665}
1666
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001667static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1668 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1669 size_t i;
1670 gpr_mu_lock(&pollset_set->mu);
1671 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1672 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1673 pollset_set->fds = gpr_realloc(
1674 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1675 }
1676 GRPC_FD_REF(fd, "pollset_set");
1677 pollset_set->fds[pollset_set->fd_count++] = fd;
1678 for (i = 0; i < pollset_set->pollset_count; i++) {
1679 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1680 }
1681 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1682 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1683 }
1684 gpr_mu_unlock(&pollset_set->mu);
1685}
1686
1687static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1688 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1689 size_t i;
1690 gpr_mu_lock(&pollset_set->mu);
1691 for (i = 0; i < pollset_set->fd_count; i++) {
1692 if (pollset_set->fds[i] == fd) {
1693 pollset_set->fd_count--;
1694 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1695 pollset_set->fds[pollset_set->fd_count]);
1696 GRPC_FD_UNREF(fd, "pollset_set");
1697 break;
1698 }
1699 }
1700 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1701 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1702 }
1703 gpr_mu_unlock(&pollset_set->mu);
1704}
1705
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001706static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1707 grpc_pollset_set *pollset_set,
1708 grpc_pollset *pollset) {
1709 size_t i, j;
1710 gpr_mu_lock(&pollset_set->mu);
1711 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1712 pollset_set->pollset_capacity =
1713 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1714 pollset_set->pollsets =
1715 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1716 sizeof(*pollset_set->pollsets));
1717 }
1718 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1719 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1720 if (fd_is_orphaned(pollset_set->fds[i])) {
1721 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1722 } else {
1723 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1724 pollset_set->fds[j++] = pollset_set->fds[i];
1725 }
1726 }
1727 pollset_set->fd_count = j;
1728 gpr_mu_unlock(&pollset_set->mu);
1729}
1730
1731static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1732 grpc_pollset_set *pollset_set,
1733 grpc_pollset *pollset) {
1734 size_t i;
1735 gpr_mu_lock(&pollset_set->mu);
1736 for (i = 0; i < pollset_set->pollset_count; i++) {
1737 if (pollset_set->pollsets[i] == pollset) {
1738 pollset_set->pollset_count--;
1739 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1740 pollset_set->pollsets[pollset_set->pollset_count]);
1741 break;
1742 }
1743 }
1744 gpr_mu_unlock(&pollset_set->mu);
1745}
1746
1747static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1748 grpc_pollset_set *bag,
1749 grpc_pollset_set *item) {
1750 size_t i, j;
1751 gpr_mu_lock(&bag->mu);
1752 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1753 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1754 bag->pollset_sets =
1755 gpr_realloc(bag->pollset_sets,
1756 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1757 }
1758 bag->pollset_sets[bag->pollset_set_count++] = item;
1759 for (i = 0, j = 0; i < bag->fd_count; i++) {
1760 if (fd_is_orphaned(bag->fds[i])) {
1761 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1762 } else {
1763 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1764 bag->fds[j++] = bag->fds[i];
1765 }
1766 }
1767 bag->fd_count = j;
1768 gpr_mu_unlock(&bag->mu);
1769}
1770
1771static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1772 grpc_pollset_set *bag,
1773 grpc_pollset_set *item) {
1774 size_t i;
1775 gpr_mu_lock(&bag->mu);
1776 for (i = 0; i < bag->pollset_set_count; i++) {
1777 if (bag->pollset_sets[i] == item) {
1778 bag->pollset_set_count--;
1779 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1780 bag->pollset_sets[bag->pollset_set_count]);
1781 break;
1782 }
1783 }
1784 gpr_mu_unlock(&bag->mu);
1785}
1786
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001787/* Test helper functions
1788 * */
1789void *grpc_fd_get_polling_island(grpc_fd *fd) {
1790 polling_island *pi;
1791
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001792 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001793 pi = fd->polling_island;
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001794 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001795
1796 return pi;
1797}
1798
1799void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1800 polling_island *pi;
1801
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001802 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001803 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001804 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001805
1806 return pi;
1807}
1808
/* Test helper: returns true if p and q resolve to the same (latest, possibly
   merged) polling island */
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  /* Pointers are compared by value after the unlock: the locks were only
     needed to resolve p1/p2 to the latest islands */
  return p1 == p2;
}
1820
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001821/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001822 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001823 */
1824
/* Tears down the engine's global state: counterparts of the fd/pollset/
   polling-island *_global_init calls made in grpc_init_epoll_linux() */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1830
/* The grpc_event_engine_vtable for this engine: routes the generic event
   engine operations to the epoll/polling-island implementations above */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1865
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001866/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1867 * Create a dummy epoll_fd to make sure epoll support is available */
1868static bool is_epoll_available() {
1869 int fd = epoll_create1(EPOLL_CLOEXEC);
1870 if (fd < 0) {
1871 gpr_log(
1872 GPR_ERROR,
1873 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1874 fd);
1875 return false;
1876 }
1877 close(fd);
1878 return true;
1879}
1880
/* Engine entry point: returns the epoll engine vtable, or NULL if this
   environment cannot support it (signals disabled, epoll unavailable, or a
   global initialization step failed) */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine*/
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  /* Pick a default wakeup signal if the application did not choose one */
  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
1908
#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

/* Signal-based kicking is only used by the epoll engine, so without epoll
 * this is a no-op */
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */