blob: 864fe62cb672d176a4225e7d2509cd70e22883fa [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Log a polling-engine trace message when tracing is enabled.
   Fix: wrapped in do { } while (0) so the macro expands to exactly one
   statement; the previous bare-if form silently captured a following 'else'
   at any unbraced call site (dangling-else hazard). */
#define GRPC_POLLING_TRACE(fmt, ...)          \
  do {                                        \
    if (grpc_polling_trace) {                 \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__);  \
    }                                         \
  } while (0)
71
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070072static int grpc_wakeup_signal = -1;
73static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070093/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070094 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070095 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070096struct grpc_fd {
97 int fd;
98 /* refst format:
Sree Kuchibhotla5098f912016-05-31 10:58:17 -070099 bit 0 : 1=Active / 0=Orphaned
100 bits 1-n : refcount
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700101 Ref/Unref by two to avoid altering the orphaned bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700102 gpr_atm refst;
103
104 gpr_mu mu;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700105
106 /* Indicates that the fd is shutdown and that any pending read/write closures
107 should fail */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700108 bool shutdown;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700109
110 /* The fd is either closed or we relinquished control of it. In either cases,
111 this indicates that the 'fd' on this structure is no longer valid */
112 bool orphaned;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700113
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700114 /* TODO: sreek - Move this to a lockfree implementation */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700115 grpc_closure *read_closure;
116 grpc_closure *write_closure;
117
Craig Tillerf83f8ca2016-07-06 11:34:08 -0700118 /* The polling island to which this fd belongs to (protected by mu) */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700119 struct polling_island *polling_island;
120
121 struct grpc_fd *freelist_next;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700122 grpc_closure *on_done_closure;
123
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700124 /* The pollset that last noticed that the fd is readable */
125 grpc_pollset *read_notifier_pollset;
126
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700127 grpc_iomgr_object iomgr_object;
128};
129
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700130/* Reference counting for fds */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700131// #define GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700132#ifdef GRPC_FD_REF_COUNT_DEBUG
133static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
134static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
135 int line);
136#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
137#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
138#else
139static void fd_ref(grpc_fd *fd);
140static void fd_unref(grpc_fd *fd);
141#define GRPC_FD_REF(fd, reason) fd_ref(fd)
142#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
143#endif
144
145static void fd_global_init(void);
146static void fd_global_shutdown(void);
147
148#define CLOSURE_NOT_READY ((grpc_closure *)0)
149#define CLOSURE_READY ((grpc_closure *)1)
150
151/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700152 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700153 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700154
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

/* Fix: the closing comment previously named a nonexistent macro
   (GPRC_PI_REF_COUNT_DEBUG); it now matches the actual guard above. */
#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
167
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700168typedef struct polling_island {
169 gpr_mu mu;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700170 /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
171 the refcount.
172 Once the ref count becomes zero, this structure is destroyed which means
173 we should ensure that there is never a scenario where a PI_ADD_REF() is
174 racing with a PI_UNREF() that just made the ref_count zero. */
Craig Tiller15007612016-07-06 09:36:16 -0700175 gpr_atm ref_count;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700176
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700177 /* Pointer to the polling_island this merged into.
178 * merged_to value is only set once in polling_island's lifetime (and that too
179 * only if the island is merged with another island). Because of this, we can
180 * use gpr_atm type here so that we can do atomic access on this and reduce
181 * lock contention on 'mu' mutex.
182 *
183 * Note that if this field is not NULL (i.e not 0), all the remaining fields
184 * (except mu and ref_count) are invalid and must be ignored. */
185 gpr_atm merged_to;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700186
Craig Tillerd8a3c042016-09-09 12:42:37 -0700187 gpr_atm poller_count;
188 gpr_mu workqueue_read_mu;
189 gpr_mpscq workqueue_items;
190 gpr_atm workqueue_item_count;
191 grpc_wakeup_fd workqueue_wakeup_fd;
Craig Tillerb39307d2016-06-30 15:39:13 -0700192
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700193 /* The fd of the underlying epoll set */
194 int epoll_fd;
195
196 /* The file descriptors in the epoll set */
197 size_t fd_cnt;
198 size_t fd_capacity;
199 grpc_fd **fds;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700200} polling_island;
201
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700202/*******************************************************************************
203 * Pollset Declarations
204 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700205struct grpc_pollset_worker {
Sree Kuchibhotla34217242016-06-29 00:19:07 -0700206 /* Thread id of this worker */
207 pthread_t pt_id;
208
209 /* Used to prevent a worker from getting kicked multiple times */
210 gpr_atm is_kicked;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700211 struct grpc_pollset_worker *next;
212 struct grpc_pollset_worker *prev;
213};
214
215struct grpc_pollset {
216 gpr_mu mu;
217 grpc_pollset_worker root_worker;
218 bool kicked_without_pollers;
219
220 bool shutting_down; /* Is the pollset shutting down ? */
221 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
222 grpc_closure *shutdown_done; /* Called after after shutdown is complete */
223
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700224 /* The polling island to which this pollset belongs to */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700225 struct polling_island *polling_island;
226};
227
228/*******************************************************************************
229 * Pollset-set Declarations
230 */
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700231/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
232 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
233 * the current pollset_set would result in polling island merges. This would
234 * remove the need to maintain fd_count here. This will also significantly
235 * simplify the grpc_fd structure since we would no longer need to explicitly
236 * maintain the orphaned state */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700237struct grpc_pollset_set {
238 gpr_mu mu;
239
240 size_t pollset_count;
241 size_t pollset_capacity;
242 grpc_pollset **pollsets;
243
244 size_t pollset_set_count;
245 size_t pollset_set_capacity;
246 struct grpc_pollset_set **pollset_sets;
247
248 size_t fd_count;
249 size_t fd_capacity;
250 grpc_fd **fds;
251};
252
253/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700254 * Common helpers
255 */
256
Craig Tillerf975f742016-07-01 14:56:27 -0700257static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700258 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700259 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700260 if (*composite == GRPC_ERROR_NONE) {
261 *composite = GRPC_ERROR_CREATE(desc);
262 }
263 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700264 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700265}
266
267/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700268 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700269 */
270
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700271/* The wakeup fd that is used to wake up all threads in a Polling island. This
272 is useful in the polling island merge operation where we need to wakeup all
273 the threads currently polling the smaller polling island (so that they can
274 start polling the new/merged polling island)
275
276 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
277 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
278static grpc_wakeup_fd polling_island_wakeup_fd;
279
Craig Tillerd8a3c042016-09-09 12:42:37 -0700280static __thread polling_island *g_current_thread_polling_island;
281
Craig Tillerb39307d2016-06-30 15:39:13 -0700282/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700283static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700284
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700285#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700286/* Currently TSAN may incorrectly flag data races between epoll_ctl and
287 epoll_wait for any grpc_fd structs that are added to the epoll set via
288 epoll_ctl and are returned (within a very short window) via epoll_wait().
289
290 To work-around this race, we establish a happens-before relation between
291 the code just-before epoll_ctl() and the code after epoll_wait() by using
292 this atomic */
293gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700294#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700295
Craig Tillerb39307d2016-06-30 15:39:13 -0700296static void pi_add_ref(polling_island *pi);
297static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700298
Craig Tillerd8a3c042016-09-09 12:42:37 -0700299#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Craig Tillerb39307d2016-06-30 15:39:13 -0700300static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
301 int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700302 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700303 pi_add_ref(pi);
304 gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
305 (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700306}
307
Craig Tillerb39307d2016-06-30 15:39:13 -0700308static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
309 char *reason, char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700310 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Craig Tillerb39307d2016-06-30 15:39:13 -0700311 pi_unref(exec_ctx, pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700312 gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700313 (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700314}
Craig Tillerd8a3c042016-09-09 12:42:37 -0700315
316static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
317 const char *file, int line,
318 const char *reason) {
319 if (workqueue != NULL) {
320 pi_add_ref_debug((polling_island *)workqueue, reason, file, line);
321 }
322 return workqueue;
323}
324
325static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
326 const char *file, int line, const char *reason) {
327 if (workqueue != NULL) {
328 pi_unref_dbg((polling_island *)workqueue, reason, file, line);
329 }
330}
331#else
332static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
333 if (workqueue != NULL) {
334 pi_add_ref((polling_island *)workqueue);
335 }
336 return workqueue;
337}
338
339static void workqueue_unref(grpc_exec_ctx *exec_ctx,
340 grpc_workqueue *workqueue) {
341 if (workqueue != NULL) {
342 pi_unref(exec_ctx, (polling_island *)workqueue);
343 }
344}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700345#endif
346
Craig Tiller15007612016-07-06 09:36:16 -0700347static void pi_add_ref(polling_island *pi) {
348 gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
349}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700350
Craig Tillerb39307d2016-06-30 15:39:13 -0700351static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700352 /* If ref count went to zero, delete the polling island.
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700353 Note that this deletion not be done under a lock. Once the ref count goes
354 to zero, we are guaranteed that no one else holds a reference to the
355 polling island (and that there is no racing pi_add_ref() call either).
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700356
357 Also, if we are deleting the polling island and the merged_to field is
358 non-empty, we should remove a ref to the merged_to polling island
359 */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700360 if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
361 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
362 polling_island_delete(exec_ctx, pi);
363 if (next != NULL) {
364 PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700365 }
366 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700367}
368
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700369/* The caller is expected to hold pi->mu lock before calling this function */
370static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700371 size_t fd_count, bool add_fd_refs,
372 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700373 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700374 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700375 struct epoll_event ev;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700376 char *err_msg;
377 const char *err_desc = "polling_island_add_fds";
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700378
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700379#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700380 /* See the definition of g_epoll_sync for more context */
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700381 gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700382#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700383
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700384 for (i = 0; i < fd_count; i++) {
385 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
386 ev.data.ptr = fds[i];
387 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700388
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700389 if (err < 0) {
390 if (errno != EEXIST) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700391 gpr_asprintf(
392 &err_msg,
393 "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
394 pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
395 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
396 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700397 }
398
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700399 continue;
400 }
401
402 if (pi->fd_cnt == pi->fd_capacity) {
403 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
404 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
405 }
406
407 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700408 if (add_fd_refs) {
409 GRPC_FD_REF(fds[i], "polling_island");
410 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700411 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700412}
413
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700414/* The caller is expected to hold pi->mu before calling this */
415static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700416 grpc_wakeup_fd *wakeup_fd,
417 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700418 struct epoll_event ev;
419 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700420 char *err_msg;
421 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700422
423 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
424 ev.data.ptr = wakeup_fd;
425 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
426 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700427 if (err < 0 && errno != EEXIST) {
428 gpr_asprintf(&err_msg,
429 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
430 "error: %d (%s)",
431 pi->epoll_fd,
432 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
433 strerror(errno));
434 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
435 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700436 }
437}
438
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700439/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700440static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700441 bool remove_fd_refs,
442 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700443 int err;
444 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700445 char *err_msg;
446 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700447
448 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700449 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700450 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700451 gpr_asprintf(&err_msg,
452 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
453 "error: %d (%s)",
454 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
455 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
456 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700457 }
458
459 if (remove_fd_refs) {
460 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700461 }
462 }
463
464 pi->fd_cnt = 0;
465}
466
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700467/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700468static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700469 bool is_fd_closed,
470 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700471 int err;
472 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700473 char *err_msg;
474 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700475
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700476 /* If fd is already closed, then it would have been automatically been removed
477 from the epoll set */
478 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700479 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
480 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700481 gpr_asprintf(
482 &err_msg,
483 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
484 pi->epoll_fd, fd->fd, errno, strerror(errno));
485 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
486 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700487 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700488 }
489
490 for (i = 0; i < pi->fd_cnt; i++) {
491 if (pi->fds[i] == fd) {
492 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700493 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700494 break;
495 }
496 }
497}
498
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700499/* Might return NULL in case of an error */
Craig Tillerb39307d2016-06-30 15:39:13 -0700500static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
501 grpc_fd *initial_fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700502 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700503 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700504 const char *err_desc = "polling_island_create";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700505
Craig Tillerb39307d2016-06-30 15:39:13 -0700506 *error = GRPC_ERROR_NONE;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700507
Craig Tillerb39307d2016-06-30 15:39:13 -0700508 pi = gpr_malloc(sizeof(*pi));
509 gpr_mu_init(&pi->mu);
510 pi->fd_cnt = 0;
511 pi->fd_capacity = 0;
512 pi->fds = NULL;
513 pi->epoll_fd = -1;
Craig Tillerd8a3c042016-09-09 12:42:37 -0700514
515 gpr_mu_init(&pi->workqueue_read_mu);
516 gpr_mpscq_init(&pi->workqueue_items);
517 gpr_atm_rel_store(&pi->workqueue_item_count, 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700518
Craig Tiller15007612016-07-06 09:36:16 -0700519 gpr_atm_rel_store(&pi->ref_count, 0);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700520 gpr_atm_rel_store(&pi->poller_count, 0);
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700521 gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700522
Craig Tillerd8a3c042016-09-09 12:42:37 -0700523 if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
524 err_desc)) {
525 goto done;
526 }
527
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700528 pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700529
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700530 if (pi->epoll_fd < 0) {
Craig Tillerb39307d2016-06-30 15:39:13 -0700531 append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
532 goto done;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700533 }
534
Craig Tillerb39307d2016-06-30 15:39:13 -0700535 polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700536 polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
Craig Tillerb39307d2016-06-30 15:39:13 -0700537
538 if (initial_fd != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -0700539 polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
Craig Tillerb39307d2016-06-30 15:39:13 -0700540 }
541
Craig Tillerb39307d2016-06-30 15:39:13 -0700542done:
543 if (*error != GRPC_ERROR_NONE) {
Craig Tiller0a06cd72016-07-14 13:21:24 -0700544 polling_island_delete(exec_ctx, pi);
Craig Tillerb39307d2016-06-30 15:39:13 -0700545 pi = NULL;
546 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700547 return pi;
548}
549
Craig Tillerb39307d2016-06-30 15:39:13 -0700550static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700551 GPR_ASSERT(pi->fd_cnt == 0);
552
Craig Tiller0a06cd72016-07-14 13:21:24 -0700553 if (pi->epoll_fd >= 0) {
554 close(pi->epoll_fd);
555 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700556 GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
557 gpr_mu_destroy(&pi->workqueue_read_mu);
558 gpr_mpscq_destroy(&pi->workqueue_items);
Craig Tillerb39307d2016-06-30 15:39:13 -0700559 gpr_mu_destroy(&pi->mu);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700560 grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
Craig Tillerb39307d2016-06-30 15:39:13 -0700561 gpr_free(pi->fds);
562 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700563}
564
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700565/* Attempts to gets the last polling island in the linked list (liked by the
566 * 'merged_to' field). Since this does not lock the polling island, there are no
567 * guarantees that the island returned is the last island */
568static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
569 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
570 while (next != NULL) {
571 pi = next;
572 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
573 }
574
575 return pi;
576}
577
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700578/* Gets the lock on the *latest* polling island i.e the last polling island in
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700579 the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700580 returned polling island's mu.
581 Usage: To lock/unlock polling island "pi", do the following:
582 polling_island *pi_latest = polling_island_lock(pi);
583 ...
584 ... critical section ..
585 ...
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700586 gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
587static polling_island *polling_island_lock(polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700588 polling_island *next = NULL;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700589
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700590 while (true) {
591 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
592 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700593 /* Looks like 'pi' is the last node in the linked list but unless we check
594 this by holding the pi->mu lock, we cannot be sure (i.e without the
595 pi->mu lock, we don't prevent island merges).
596 To be absolutely sure, check once more by holding the pi->mu lock */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700597 gpr_mu_lock(&pi->mu);
598 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
599 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700600 /* pi is infact the last node and we have the pi->mu lock. we're done */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700601 break;
602 }
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700603
604 /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
605 * isn't the lock we are interested in. Continue traversing the list */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700606 gpr_mu_unlock(&pi->mu);
607 }
608
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700609 pi = next;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700610 }
611
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700612 return pi;
613}
614
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700615/* Gets the lock on the *latest* polling islands in the linked lists pointed by
616 *p and *q (and also updates *p and *q to point to the latest polling islands)
617
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700618 This function is needed because calling the following block of code to obtain
619 locks on polling islands (*p and *q) is prone to deadlocks.
620 {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700621 polling_island_lock(*p, true);
622 polling_island_lock(*q, true);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700623 }
624
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700625 Usage/example:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700626 polling_island *p1;
627 polling_island *p2;
628 ..
629 polling_island_lock_pair(&p1, &p2);
630 ..
631 .. Critical section with both p1 and p2 locked
632 ..
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700633 // Release locks: Always call polling_island_unlock_pair() to release locks
634 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700635*/
/* Locks the tail islands of the chains starting at *p and *q, updating *p and
   *q to point at those tails. Both tails' mutexes are HELD on return (one
   mutex if both chains share a tail); release via polling_island_unlock_pair.
   See the block comment above for why naive locking would deadlock. */
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
      - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
        keep updating pi_1 and pi_2)
      - Then obtain locks on the islands by following a lock order rule of
        locking polling_island with lower address first
        Special case: Before obtaining the locks, check if pi_1 and pi_2 are
        pointing to the same island. If that is the case, we can just call
        polling_island_lock()
      - After obtaining both the locks, double check that the polling islands
        are still the last polling islands in their respective linked lists
        (this is because there might have been polling island merges before
        we got the lock)
      - If the polling islands are the last islands, we are done. If not,
        release the locks and continue the process from the first step */
  while (true) {
    /* Step 1: find current tails without holding any lock */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains already merged into a common tail: one lock suffices */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Step 2: address-ordered locking avoids lock-order deadlocks */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Step 3: re-validate that both are still tails now that we hold locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* A merge raced us; drop both locks and retry from the new tails */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
695
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700696static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
697 if (p == q) {
698 gpr_mu_unlock(&p->mu);
699 } else {
700 gpr_mu_unlock(&p->mu);
701 gpr_mu_unlock(&q->mu);
702 }
703}
704
Craig Tillerd8a3c042016-09-09 12:42:37 -0700705static void workqueue_maybe_wakeup(polling_island *pi) {
706 bool force_wakeup = false;
707 bool is_current_poller = (g_current_thread_polling_island == pi);
708 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
709 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
710 if (force_wakeup || current_pollers > min_current_pollers_for_wakeup) {
711 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
712 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
713 }
714}
715
716static void workqueue_move_items_to_parent(polling_island *q) {
717 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
718 if (p == NULL) {
719 return;
720 }
721 gpr_mu_lock(&q->workqueue_read_mu);
722 int num_added = 0;
723 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
724 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
725 if (n != NULL) {
726 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
727 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
728 gpr_mpscq_push(&p->workqueue_items, n);
729 num_added++;
730 }
731 }
732 gpr_mu_unlock(&q->workqueue_read_mu);
733 if (num_added > 0) {
734 workqueue_maybe_wakeup(p);
735 }
736 workqueue_move_items_to_parent(p);
737}
738
/* Merges the islands containing p and q into a single island and returns it.
   On return no locks are held. The smaller island's fds are moved into the
   larger one, its pollers are woken (via polling_island_wakeup_fd) so they
   re-discover their island, and its merged_to link is pointed at the
   survivor. Errors from the fd-move operations accumulate into *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Any closures still queued on p must follow the fds to q */
    workqueue_move_items_to_parent(q);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
774
/* grpc_workqueue vtable entry: queues 'closure' (with 'error' stashed in its
   error_data) onto the polling island backing 'workqueue'.
   Ordering matters: the item count is incremented BEFORE the push so that a
   concurrent drainer never sees a count that undercounts queued nodes, and a
   wakeup is issued only on the 0 -> 1 transition (the queue was empty, so no
   poller is guaranteed to look at it otherwise). */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
                              grpc_workqueue *workqueue, grpc_closure *closure,
                              grpc_error *error) {
  polling_island *pi = (polling_island *)workqueue;
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    workqueue_maybe_wakeup(pi);
  }
  GPR_TIMER_END("workqueue.enqueue", 0);
  /* If this island was merged, forward the work to the chain's tail */
  workqueue_move_items_to_parent(pi);
}
789
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700790static grpc_error *polling_island_global_init() {
791 grpc_error *error = GRPC_ERROR_NONE;
792
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700793 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
794 if (error == GRPC_ERROR_NONE) {
795 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
796 }
797
798 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700799}
800
/* Tears down the global state set up by polling_island_global_init(). */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
804
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700805/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700806 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700807 */
808
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700809/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700810 * but instead so that implementations with multiple threads in (for example)
811 * epoll_wait deal with the race between pollset removal and incoming poll
812 * notifications.
813 *
814 * The problem is that the poller ultimately holds a reference to this
815 * object, so it is very difficult to know when is safe to free it, at least
816 * without some expensive synchronization.
817 *
818 * If we keep the object freelisted, in the worst case losing this race just
819 * becomes a spurious read notification on a reused fd.
820 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700821
/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700826
827/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
828 * sure to wake up one polling thread (which can wake up other threads if
829 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700830grpc_wakeup_fd grpc_global_wakeup_fd;
831
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700832static grpc_fd *fd_freelist = NULL;
833static gpr_mu fd_freelist_mu;
834
/* Adds 'n' to fd->refst. The #ifdef splits a single function body: in debug
   builds ref_by takes extra (reason/file/line) args and logs each transition;
   the REF_BY/UNREF_BY macros hide the signature difference from callers.
   The assert requires the count to have been > 0 BEFORE the add, i.e. callers
   must already hold a reference. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
850
/* Subtracts 'n' from fd->refst; when the count reaches zero (old == n) the
   grpc_fd is pushed onto the global freelist (not freed - see the freelist
   rationale comment above fd_freelist) and unregistered from the iomgr.
   Same #ifdef-split body pattern as ref_by above. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Refcount hit zero: add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Dropping below zero would indicate a ref/unref imbalance */
    GPR_ASSERT(old > n);
  }
}
875
/* Increment refcount by two to avoid changing the orphan bit:
   bit 0 of fd->refst doubles as the 'not orphaned' flag (see fd_is_orphaned
   and the initial store of 1 in fd_create), so public ref/unref always step
   by 2 to leave that bit untouched. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
891
/* Initializes the mutex guarding the global fd freelist. */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
893
/* Frees every grpc_fd parked on the freelist and destroys the freelist mutex.
   The empty lock/unlock pair up front acts as a barrier: it waits out any
   thread still inside the freelist critical section (e.g. a concurrent
   unref_by pushing an fd) before we start draining. */
static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
905
906static grpc_fd *fd_create(int fd, const char *name) {
907 grpc_fd *new_fd = NULL;
908
909 gpr_mu_lock(&fd_freelist_mu);
910 if (fd_freelist != NULL) {
911 new_fd = fd_freelist;
912 fd_freelist = fd_freelist->freelist_next;
913 }
914 gpr_mu_unlock(&fd_freelist_mu);
915
916 if (new_fd == NULL) {
917 new_fd = gpr_malloc(sizeof(grpc_fd));
918 gpr_mu_init(&new_fd->mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700919 }
920
921 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
922 newly created fd (or an fd we got from the freelist), no one else would be
923 holding a lock to it anyway. */
924 gpr_mu_lock(&new_fd->mu);
925
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700926 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700927 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700928 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700929 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700930 new_fd->read_closure = CLOSURE_NOT_READY;
931 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700932 new_fd->polling_island = NULL;
933 new_fd->freelist_next = NULL;
934 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700935 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700936
937 gpr_mu_unlock(&new_fd->mu);
938
939 char *fd_name;
940 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
941 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700942#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700943 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700944#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700945 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700946 return new_fd;
947}
948
949static bool fd_is_orphaned(grpc_fd *fd) {
950 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
951}
952
953static int fd_wrapped_fd(grpc_fd *fd) {
954 int ret_fd = -1;
955 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700956 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700957 ret_fd = fd->fd;
958 }
959 gpr_mu_unlock(&fd->mu);
960
961 return ret_fd;
962}
963
/* Releases the fd: closes (or hands back via *release_fd) the OS descriptor,
   marks the grpc_fd orphaned, detaches it from its polling island, and
   schedules 'on_done'. The grpc_fd struct itself goes back to the freelist
   once the last reference is dropped (see unref_by). */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function.
     (Adding 1 to the odd refcount clears the low 'active' bit while holding a
     ref; the UNREF_BY(fd, 2) below drops both.) */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1021
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001022static grpc_error *fd_shutdown_error(bool shutdown) {
1023 if (!shutdown) {
1024 return GRPC_ERROR_NONE;
1025 } else {
1026 return GRPC_ERROR_CREATE("FD shutdown");
1027 }
1028}
1029
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001030static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1031 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001032 if (fd->shutdown) {
1033 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
1034 NULL);
1035 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001036 /* not ready ==> switch to a waiting state by setting the closure */
1037 *st = closure;
1038 } else if (*st == CLOSURE_READY) {
1039 /* already ready ==> queue the closure to run immediately */
1040 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001041 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
1042 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001043 } else {
1044 /* upcallptr was set to a different closure. This is an error! */
1045 gpr_log(GPR_ERROR,
1046 "User called a notify_on function with a previous callback still "
1047 "pending");
1048 abort();
1049 }
1050}
1051
1052/* returns 1 if state becomes not ready */
1053static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1054 grpc_closure **st) {
1055 if (*st == CLOSURE_READY) {
1056 /* duplicate ready ==> ignore */
1057 return 0;
1058 } else if (*st == CLOSURE_NOT_READY) {
1059 /* not ready, and not waiting ==> flag ready */
1060 *st = CLOSURE_READY;
1061 return 0;
1062 } else {
1063 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001064 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001065 *st = CLOSURE_NOT_READY;
1066 return 1;
1067 }
1068}
1069
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001070static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1071 grpc_fd *fd) {
1072 grpc_pollset *notifier = NULL;
1073
1074 gpr_mu_lock(&fd->mu);
1075 notifier = fd->read_notifier_pollset;
1076 gpr_mu_unlock(&fd->mu);
1077
1078 return notifier;
1079}
1080
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001081static bool fd_is_shutdown(grpc_fd *fd) {
1082 gpr_mu_lock(&fd->mu);
1083 const bool r = fd->shutdown;
1084 gpr_mu_unlock(&fd->mu);
1085 return r;
1086}
1087
/* Might be called multiple times: shuts down both directions of the socket
   and fails any pending read/write closures with an "FD shutdown" error. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    /* shutdown(2) syscall, not this function; its return value is ignored
       here (NOTE(review): possibly worth logging failures - confirm intent) */
    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1103
/* Registers 'closure' to run when the fd becomes readable (or immediately if
   it already is / is shut down) - see notify_on_locked for the semantics. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1110
/* Registers 'closure' to run when the fd becomes writable (or immediately if
   it already is / is shut down) - see notify_on_locked for the semantics. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1117
/* Returns a new reference to the workqueue backing this fd's polling island
   (the polling_island doubles as the grpc_workqueue - see workqueue_enqueue).
   NOTE(review): fd->polling_island may be NULL here; presumably
   grpc_workqueue_ref tolerates NULL - confirm. */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  grpc_workqueue *workqueue =
      grpc_workqueue_ref((grpc_workqueue *)fd->polling_island);
  gpr_mu_unlock(&fd->mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001125
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001126/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001127 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001128 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001129GPR_TLS_DECL(g_current_thread_pollset);
1130GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001131static __thread bool g_initialized_sigmask;
1132static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001133
/* Handler for grpc_wakeup_signal: intentionally (almost) a no-op - the
   signal's only purpose is to interrupt a blocking epoll_pwait/poll so the
   kicked thread re-examines its state.
   NOTE(review): the debug-only gpr_log call is likely not
   async-signal-safe - acceptable only because it is compiled out unless
   GRPC_EPOLL_DEBUG is defined. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001139
/* Installs sig_handler for grpc_wakeup_signal so pollset_worker_kick's
   pthread_kill can interrupt blocked pollers. */
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001141
/* Global state management: sets up the per-thread pollset/worker TLS slots,
   the kick signal handler, and the global wakeup fd. Returns the wakeup-fd
   init error, if any. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1149
/* Tears down everything pollset_global_init() created. */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1155
/* Signals 'worker' (via pthread_kill with grpc_wakeup_signal) so that its
   blocking poll/epoll call returns. The CAS on is_kicked (0 -> 1) makes the
   kick idempotent: only the first kicker sends the signal; the flag is reset
   elsewhere when the worker processes the kick. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1171
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001172/* Return 1 if the pollset has active threads in pollset_work (pollset must
1173 * be locked) */
1174static int pollset_has_workers(grpc_pollset *p) {
1175 return p->root_worker.next != &p->root_worker;
1176}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001177
1178static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1179 worker->prev->next = worker->next;
1180 worker->next->prev = worker->prev;
1181}
1182
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001183static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1184 if (pollset_has_workers(p)) {
1185 grpc_pollset_worker *w = p->root_worker.next;
1186 remove_worker(p, w);
1187 return w;
1188 } else {
1189 return NULL;
1190 }
1191}
1192
1193static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1194 worker->next = &p->root_worker;
1195 worker->prev = worker->next->prev;
1196 worker->prev->next = worker->next->prev = worker;
1197}
1198
1199static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1200 worker->prev = &p->root_worker;
1201 worker->next = worker->prev->next;
1202 worker->prev->next = worker->next->prev = worker;
1203}
1204
/* p->mu must be held before calling this function.
   Wakes worker(s) on pollset 'p':
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: kick every worker except
     the calling thread's own (it doesn't need a signal to notice).
   - specific_worker != NULL: kick exactly that worker (again skipping self).
   - specific_worker == NULL: kick "any" worker, rotating the chosen one to
     the back of the list; if no worker exists, remember the kick in
     kicked_without_pollers so the next pollset_work absorbs it.
   Accumulated pthread_kill failures are returned (and logged). */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate to the back so successive anonymous kicks spread across
         workers */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1256
/* Wakes up whichever poller(s) are watching the global wakeup fd (used e.g.
   by the alarm system - see the grpc_global_wakeup_fd comment above). */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001260
/* Initializes a pollset: empty circular worker list (root_worker is its own
   neighbor), no pending kicks, not shutting down, no polling island yet.
   Exposes the pollset's mutex to the caller via *mu. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}
1274
/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  /* Infinite deadline: block indefinitely */
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  /* Deadline within the next 10us: spin-poll with a zero timeout */
  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  /* Round UP to the next millisecond (add ms-1 ns before truncating) */
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
1299
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001300static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1301 grpc_pollset *notifier) {
1302 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001303 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001304 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1305 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001306 gpr_mu_unlock(&fd->mu);
1307}
1308
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001309static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001310 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1311 gpr_mu_lock(&fd->mu);
1312 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1313 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001314}
1315
Craig Tillerb39307d2016-06-30 15:39:13 -07001316static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1317 grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001318 if (ps->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001319 PI_UNREF(exec_ctx, ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001320 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001321 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001322}
1323
/* Complete the shutdown of a pollset: release its polling island and schedule
   the shutdown_done closure. Caller must hold pollset->mu, and all workers
   must already have exited (asserted below). */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1335
/* Begin shutting down a pollset: mark it as shutting down, remember the
   closure to run when shutdown completes, and broadcast-kick all workers so
   they notice. If no workers are present the shutdown completes immediately.
   pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake every worker so they observe shutting_down and exit */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1355
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  /* All workers must be gone by destruction time */
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}
1363
Craig Tiller2b49ea92016-07-01 13:21:27 -07001364static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001365 GPR_ASSERT(pollset->shutting_down);
1366 GPR_ASSERT(!pollset_has_workers(pollset));
1367 pollset->shutting_down = false;
1368 pollset->finish_shutdown_called = false;
1369 pollset->kicked_without_pollers = false;
1370 pollset->shutdown_done = NULL;
Craig Tillerb39307d2016-06-30 15:39:13 -07001371 GPR_ASSERT(pollset->polling_island == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001372}
1373
Craig Tillerd8a3c042016-09-09 12:42:37 -07001374static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
1375 polling_island *pi) {
1376 if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
1377 gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
1378 gpr_mu_unlock(&pi->workqueue_read_mu);
1379 if (n != NULL) {
1380 if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
1381 workqueue_maybe_wakeup(pi);
1382 }
1383 grpc_closure *c = (grpc_closure *)n;
1384 grpc_closure_run(exec_ctx, c, c->error_data.error);
1385 return true;
1386 } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
1387 workqueue_maybe_wakeup(pi);
1388 }
1389 }
1390 return false;
1391}
1392
Craig Tiller84ea3412016-09-08 14:57:56 -07001393#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001394/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1395static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001396 grpc_pollset *pollset,
1397 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001398 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001399 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001400 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001401 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001402 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001403 char *err_msg;
1404 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001405 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1406
1407 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001408 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001409
1410 Since epoll_fd is immutable, we can read it without obtaining the polling
1411 island lock. There is however a possibility that the polling island (from
1412 which we got the epoll_fd) got merged with another island while we are
1413 in this function. This is still okay because in such a case, we will wakeup
1414 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001415 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001416
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001417 if (pollset->polling_island == NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001418 pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001419 if (pollset->polling_island == NULL) {
1420 GPR_TIMER_END("pollset_work_and_unlock", 0);
1421 return; /* Fatal error. We cannot continue */
1422 }
1423
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001424 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001425 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1426 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001427 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001428
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001429 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001430 epoll_fd = pi->epoll_fd;
1431
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001432 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001433 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001434 if (pollset->polling_island != pi) {
1435 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1436 polling island to be deleted */
1437 PI_ADD_REF(pi, "ps");
Craig Tillerb39307d2016-06-30 15:39:13 -07001438 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001439 pollset->polling_island = pi;
1440 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001441
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001442 /* Add an extra ref so that the island does not get destroyed (which means
1443 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1444 epoll_fd */
1445 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001446 gpr_mu_unlock(&pollset->mu);
1447
Craig Tillerd8a3c042016-09-09 12:42:37 -07001448 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1449 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1450 g_current_thread_polling_island = pi;
1451
Vijay Paicef54012016-08-28 23:05:31 -07001452 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001453 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1454 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001455 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001456 if (ep_rv < 0) {
1457 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001458 gpr_asprintf(&err_msg,
1459 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1460 epoll_fd, errno, strerror(errno));
1461 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001462 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001463 /* We were interrupted. Save an interation by doing a zero timeout
1464 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001465 GRPC_POLLING_TRACE(
1466 "pollset_work: pollset: %p, worker: %p received kick",
1467 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001468 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001469 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001470 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001471
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001472#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001473 /* See the definition of g_poll_sync for more details */
1474 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001475#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001476
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001477 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001478 void *data_ptr = ep_ev[i].data.ptr;
1479 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001480 append_error(error,
1481 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1482 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001483 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
1484 append_error(error,
1485 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1486 err_desc);
1487 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001488 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001489 GRPC_POLLING_TRACE(
1490 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1491 "%d) got merged",
1492 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001493 /* This means that our polling island is merged with a different
1494 island. We do not have to do anything here since the subsequent call
1495 to the function pollset_work_and_unlock() will pick up the correct
1496 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001497 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001498 grpc_fd *fd = data_ptr;
1499 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1500 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1501 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001502 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001503 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001504 }
1505 if (write_ev || cancel) {
1506 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001507 }
1508 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001509 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001510
1511 g_current_thread_polling_island = NULL;
1512 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1513 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001514
1515 GPR_ASSERT(pi != NULL);
1516
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001517 /* Before leaving, release the extra ref we added to the polling island. It
1518 is important to use "pi" here (i.e our old copy of pollset->polling_island
1519 that we got before releasing the polling island lock). This is because
1520 pollset->polling_island pointer might get udpated in other parts of the
1521 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001522 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001523
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001524 GPR_TIMER_END("pollset_work_and_unlock", 0);
1525}
1526
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001527/* pollset->mu lock must be held by the caller before calling this.
1528 The function pollset_work() may temporarily release the lock (pollset->mu)
1529 during the course of its execution but it will always re-acquire the lock and
1530 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001531static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1532 grpc_pollset_worker **worker_hdl,
1533 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001534 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001535 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001536 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1537
1538 sigset_t new_mask;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001539
1540 grpc_pollset_worker worker;
1541 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001542 worker.pt_id = pthread_self();
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001543 gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001544
1545 *worker_hdl = &worker;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001546
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001547 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1548 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001549
1550 if (pollset->kicked_without_pollers) {
1551 /* If the pollset was kicked without pollers, pretend that the current
1552 worker got the kick and skip polling. A kick indicates that there is some
1553 work that needs attention like an event on the completion queue or an
1554 alarm */
1555 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1556 pollset->kicked_without_pollers = 0;
1557 } else if (!pollset->shutting_down) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001558 /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001559 (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
1560 worker that there is some pending work that needs immediate attention
1561 (like an event on the completion queue, or a polling island merge that
1562 results in a new epoll-fd to wait on) and that the worker should not
1563 spend time waiting in epoll_pwait().
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001564
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001565 A worker can be kicked anytime from the point it is added to the pollset
1566 via push_front_worker() (or push_back_worker()) to the point it is
1567 removed via remove_worker().
1568 If the worker is kicked before/during it calls epoll_pwait(), it should
1569 immediately exit from epoll_wait(). If the worker is kicked after it
1570 returns from epoll_wait(), then nothing really needs to be done.
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001571
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001572 To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001573 times *except* when it is in epoll_pwait(). This way, the worker never
1574 misses acting on a kick */
1575
Craig Tiller19196992016-06-27 18:45:56 -07001576 if (!g_initialized_sigmask) {
1577 sigemptyset(&new_mask);
1578 sigaddset(&new_mask, grpc_wakeup_signal);
1579 pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
1580 sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
1581 g_initialized_sigmask = true;
1582 /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
1583 This is the mask used at all times *except during
1584 epoll_wait()*"
1585 g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
Craig Tiller510ff692016-06-27 20:31:49 -07001586 this is the mask to use *during epoll_wait()*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001587
Craig Tiller19196992016-06-27 18:45:56 -07001588 The new_mask is set on the worker before it is added to the pollset
Craig Tiller510ff692016-06-27 20:31:49 -07001589 (i.e before it can be kicked) */
Craig Tiller19196992016-06-27 18:45:56 -07001590 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001591
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001592 push_front_worker(pollset, &worker); /* Add worker to pollset */
1593
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001594 pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
1595 &g_orig_sigmask, &error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001596 grpc_exec_ctx_flush(exec_ctx);
1597
1598 gpr_mu_lock(&pollset->mu);
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001599
1600 /* Note: There is no need to reset worker.is_kicked to 0 since we are no
1601 longer going to use this worker */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001602 remove_worker(pollset, &worker);
1603 }
1604
1605 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1606 false at this point) and the pollset is shutting down, we may have to
1607 finish the shutdown process by calling finish_shutdown_locked().
1608 See pollset_shutdown() for more details.
1609
1610 Note: Continuing to access pollset here is safe; it is the caller's
1611 responsibility to not destroy a pollset when it has outstanding calls to
1612 pollset_work() */
1613 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1614 !pollset->finish_shutdown_called) {
1615 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1616 finish_shutdown_locked(exec_ctx, pollset);
1617
1618 gpr_mu_unlock(&pollset->mu);
1619 grpc_exec_ctx_flush(exec_ctx);
1620 gpr_mu_lock(&pollset->mu);
1621 }
1622
1623 *worker_hdl = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001624
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001625 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1626 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001627
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001628 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001629
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001630 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1631 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001632}
1633
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001634static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1635 grpc_fd *fd) {
Craig Tiller9d018482016-07-18 08:53:49 -07001636 GPR_TIMER_BEGIN("pollset_add_fd", 0);
1637
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001638 grpc_error *error = GRPC_ERROR_NONE;
1639
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001640 gpr_mu_lock(&pollset->mu);
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001641 gpr_mu_lock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001642
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001643 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001644
Craig Tiller7212c232016-07-06 13:11:09 -07001645retry:
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001646 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1647 * equal, do nothing.
1648 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1649 * a new polling island (with a refcount of 2) and make the polling_island
1650 * fields in both fd and pollset to point to the new island
1651 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1652 * the NULL polling_island field to point to the non-NULL polling_island
1653 * field (ensure that the refcount on the polling island is incremented by
1654 * 1 to account for the newly added reference)
1655 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1656 * and different, merge both the polling islands and update the
1657 * polling_island fields in both fd and pollset to point to the merged
1658 * polling island.
1659 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001660
Craig Tiller42ac6db2016-07-06 17:13:56 -07001661 if (fd->orphaned) {
1662 gpr_mu_unlock(&fd->mu);
1663 gpr_mu_unlock(&pollset->mu);
1664 /* early out */
1665 return;
1666 }
1667
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001668 if (fd->polling_island == pollset->polling_island) {
1669 pi_new = fd->polling_island;
1670 if (pi_new == NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001671 /* Unlock before creating a new polling island: the polling island will
1672 create a workqueue which creates a file descriptor, and holding an fd
1673 lock here can eventually cause a loop to appear to TSAN (making it
1674 unhappy). We don't think it's a real loop (there's an epoch point where
1675 that loop possibility disappears), but the advantages of keeping TSAN
1676 happy outweigh any performance advantage we might have by keeping the
1677 lock held. */
Craig Tiller7212c232016-07-06 13:11:09 -07001678 gpr_mu_unlock(&fd->mu);
Craig Tillerb39307d2016-06-30 15:39:13 -07001679 pi_new = polling_island_create(exec_ctx, fd, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001680 gpr_mu_lock(&fd->mu);
Craig Tiller0a06cd72016-07-14 13:21:24 -07001681 /* Need to reverify any assumptions made between the initial lock and
1682 getting to this branch: if they've changed, we need to throw away our
1683 work and figure things out again. */
Craig Tiller7212c232016-07-06 13:11:09 -07001684 if (fd->polling_island != NULL) {
Craig Tiller27da6422016-07-06 13:14:46 -07001685 GRPC_POLLING_TRACE(
1686 "pollset_add_fd: Raced creating new polling island. pi_new: %p "
1687 "(fd: %d, pollset: %p)",
1688 (void *)pi_new, fd->fd, (void *)pollset);
1689 PI_ADD_REF(pi_new, "dance_of_destruction");
1690 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
Craig Tiller7212c232016-07-06 13:11:09 -07001691 goto retry;
Craig Tiller27da6422016-07-06 13:14:46 -07001692 } else {
1693 GRPC_POLLING_TRACE(
1694 "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
1695 "pollset: %p)",
1696 (void *)pi_new, fd->fd, (void *)pollset);
Craig Tiller7212c232016-07-06 13:11:09 -07001697 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001698 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001699 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001700 pi_new = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001701 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001702 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001703
1704 GRPC_POLLING_TRACE(
1705 "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
1706 "pollset->pi: %p)",
1707 (void *)pi_new, fd->fd, (void *)pollset,
1708 (void *)pollset->polling_island);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001709 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001710 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001711 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001712
1713 GRPC_POLLING_TRACE(
1714 "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
1715 "%p, fd->pi: %p",
1716 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001717 } else {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001718 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
1719 &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001720 GRPC_POLLING_TRACE(
1721 "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
1722 "%p, fd->pi: %p, pollset->pi: %p)",
1723 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
1724 (void *)pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001725 }
1726
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001727 /* At this point, pi_new is the polling island that both fd->polling_island
1728 and pollset->polling_island must be pointing to */
1729
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001730 if (fd->polling_island != pi_new) {
1731 PI_ADD_REF(pi_new, "fd");
1732 if (fd->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001733 PI_UNREF(exec_ctx, fd->polling_island, "fd");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001734 }
1735 fd->polling_island = pi_new;
1736 }
1737
1738 if (pollset->polling_island != pi_new) {
1739 PI_ADD_REF(pi_new, "ps");
1740 if (pollset->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001741 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001742 }
1743 pollset->polling_island = pi_new;
1744 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001745
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001746 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001747 gpr_mu_unlock(&pollset->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001748
1749 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tiller9d018482016-07-18 08:53:49 -07001750
1751 GPR_TIMER_END("pollset_add_fd", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001752}
1753
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001754/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001755 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001756 */
1757
1758static grpc_pollset_set *pollset_set_create(void) {
1759 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1760 memset(pollset_set, 0, sizeof(*pollset_set));
1761 gpr_mu_init(&pollset_set->mu);
1762 return pollset_set;
1763}
1764
1765static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1766 size_t i;
1767 gpr_mu_destroy(&pollset_set->mu);
1768 for (i = 0; i < pollset_set->fd_count; i++) {
1769 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1770 }
1771 gpr_free(pollset_set->pollsets);
1772 gpr_free(pollset_set->pollset_sets);
1773 gpr_free(pollset_set->fds);
1774 gpr_free(pollset_set);
1775}
1776
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001777static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1778 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1779 size_t i;
1780 gpr_mu_lock(&pollset_set->mu);
1781 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1782 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1783 pollset_set->fds = gpr_realloc(
1784 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1785 }
1786 GRPC_FD_REF(fd, "pollset_set");
1787 pollset_set->fds[pollset_set->fd_count++] = fd;
1788 for (i = 0; i < pollset_set->pollset_count; i++) {
1789 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1790 }
1791 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1792 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1793 }
1794 gpr_mu_unlock(&pollset_set->mu);
1795}
1796
1797static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1798 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1799 size_t i;
1800 gpr_mu_lock(&pollset_set->mu);
1801 for (i = 0; i < pollset_set->fd_count; i++) {
1802 if (pollset_set->fds[i] == fd) {
1803 pollset_set->fd_count--;
1804 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1805 pollset_set->fds[pollset_set->fd_count]);
1806 GRPC_FD_UNREF(fd, "pollset_set");
1807 break;
1808 }
1809 }
1810 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1811 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1812 }
1813 gpr_mu_unlock(&pollset_set->mu);
1814}
1815
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001816static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1817 grpc_pollset_set *pollset_set,
1818 grpc_pollset *pollset) {
1819 size_t i, j;
1820 gpr_mu_lock(&pollset_set->mu);
1821 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1822 pollset_set->pollset_capacity =
1823 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1824 pollset_set->pollsets =
1825 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1826 sizeof(*pollset_set->pollsets));
1827 }
1828 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1829 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1830 if (fd_is_orphaned(pollset_set->fds[i])) {
1831 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1832 } else {
1833 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1834 pollset_set->fds[j++] = pollset_set->fds[i];
1835 }
1836 }
1837 pollset_set->fd_count = j;
1838 gpr_mu_unlock(&pollset_set->mu);
1839}
1840
1841static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1842 grpc_pollset_set *pollset_set,
1843 grpc_pollset *pollset) {
1844 size_t i;
1845 gpr_mu_lock(&pollset_set->mu);
1846 for (i = 0; i < pollset_set->pollset_count; i++) {
1847 if (pollset_set->pollsets[i] == pollset) {
1848 pollset_set->pollset_count--;
1849 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1850 pollset_set->pollsets[pollset_set->pollset_count]);
1851 break;
1852 }
1853 }
1854 gpr_mu_unlock(&pollset_set->mu);
1855}
1856
1857static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1858 grpc_pollset_set *bag,
1859 grpc_pollset_set *item) {
1860 size_t i, j;
1861 gpr_mu_lock(&bag->mu);
1862 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1863 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1864 bag->pollset_sets =
1865 gpr_realloc(bag->pollset_sets,
1866 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1867 }
1868 bag->pollset_sets[bag->pollset_set_count++] = item;
1869 for (i = 0, j = 0; i < bag->fd_count; i++) {
1870 if (fd_is_orphaned(bag->fds[i])) {
1871 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1872 } else {
1873 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1874 bag->fds[j++] = bag->fds[i];
1875 }
1876 }
1877 bag->fd_count = j;
1878 gpr_mu_unlock(&bag->mu);
1879}
1880
1881static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1882 grpc_pollset_set *bag,
1883 grpc_pollset_set *item) {
1884 size_t i;
1885 gpr_mu_lock(&bag->mu);
1886 for (i = 0; i < bag->pollset_set_count; i++) {
1887 if (bag->pollset_sets[i] == item) {
1888 bag->pollset_set_count--;
1889 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1890 bag->pollset_sets[bag->pollset_set_count]);
1891 break;
1892 }
1893 }
1894 gpr_mu_unlock(&bag->mu);
1895}
1896
/* Test-only helper functions: these are not part of the event-engine vtable
 * and exist solely so tests can inspect polling-island state. */
1899void *grpc_fd_get_polling_island(grpc_fd *fd) {
1900 polling_island *pi;
1901
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001902 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001903 pi = fd->polling_island;
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001904 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001905
1906 return pi;
1907}
1908
1909void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1910 polling_island *pi;
1911
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001912 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001913 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001914 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001915
1916 return pi;
1917}
1918
/* Test-only: returns true if 'p' and 'q' ultimately refer to the same polling
   island. Islands can be merged, so each pointer must first be resolved to
   the head of its merge chain before comparing. */
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  /* The locks are only needed to resolve the pointers; the comparison itself
     is done on the (now-updated) local pointers after unlocking. */
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}
1930
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001931/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001932 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001933 */
1934
/* Tears down all global state owned by this polling engine (fd freelist,
   pollset globals, polling-island state). Invoked through the event-engine
   vtable when iomgr shuts down. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1940
/* Binding of this epoll-based implementation to the generic event-engine
   interface. Returned by grpc_init_epoll_linux() when epoll is usable. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    /* workqueue operations */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_enqueue = workqueue_enqueue,

    .shutdown_engine = shutdown_engine,
};
1979
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001980/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1981 * Create a dummy epoll_fd to make sure epoll support is available */
1982static bool is_epoll_available() {
1983 int fd = epoll_create1(EPOLL_CLOEXEC);
1984 if (fd < 0) {
1985 gpr_log(
1986 GPR_ERROR,
1987 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1988 fd);
1989 return false;
1990 }
1991 close(fd);
1992 return true;
1993}
1994
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001995const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001996 /* If use of signals is disabled, we cannot use epoll engine*/
1997 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1998 return NULL;
1999 }
2000
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002001 if (!is_epoll_available()) {
2002 return NULL;
2003 }
2004
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002005 if (!is_grpc_wakeup_signal_initialized) {
2006 grpc_use_signal(SIGRTMIN + 2);
2007 }
2008
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002009 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07002010
2011 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
2012 return NULL;
2013 }
2014
2015 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
2016 polling_island_global_init())) {
2017 return NULL;
2018 }
2019
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002020 return &vtable;
2021}
2022
#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

/* No-op stub: wakeup signals are only meaningful to the epoll engine, which
 * is compiled out on this platform, so the requested signum is ignored. */
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */