blob: 98fe2defea82ef09b134a35b3c8ae289d09cf051 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Logs a polling-engine trace message when 'grpc_polling_trace' is non-zero.
   Wrapped in do { } while (0) so the macro expands to exactly one statement:
   the original bare-if form changed meaning when used in an unbraced
   if/else at a call site (dangling-else hazard) and left a stray ';'. */
#define GRPC_POLLING_TRACE(fmt, ...)         \
  do {                                       \
    if (grpc_polling_trace) {                \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
    }                                        \
  } while (0)
71
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070072static int grpc_wakeup_signal = -1;
73static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070093/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070094 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070095 */
struct grpc_fd {
  /* The underlying file descriptor */
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Guards the mutable state below (the polling_island field is explicitly
     documented as protected by this mutex) */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  /* Either CLOSURE_NOT_READY, CLOSURE_READY, or a closure waiting for the
     corresponding event (see the CLOSURE_* #defines above) —
     NOTE(review): exact state machine inferred from the sentinels; confirm
     against the fd notification code. */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to (protected by mu) */
  struct polling_island *polling_island;

  /* Link used when this struct sits on the global fd freelist */
  struct grpc_fd *freelist_next;
  /* Closure scheduled once the fd has been orphaned */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
129
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants record the call site and a reason string for every
   ref-count change */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd::read_closure / write_closure.
   Any other value is a real closure pointer waiting on the event.
   NOTE(review): precise semantics of NOT_READY vs READY inferred from the
   names; confirm against the notify/set-ready implementation. */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
150
151/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700152 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700153 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700154
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG

/* Debug variants: forward the call site (__FILE__/__LINE__) and a reason
   string so every polling-island ref-count change can be logged. */
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

/* Release variants: plain ref counting, the reason argument is dropped. */
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
167
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* NOTE(review): presumably the number of threads currently polling this
     island (name-based inference) — confirm against the pollset code. */
  gpr_atm poller_count;

  /* Workqueue state: a multi-producer single-consumer queue of items plus a
     wakeup fd (registered in the epoll set by polling_island_create) used to
     signal pollers when items are enqueued. workqueue_read_mu serializes the
     consumer side. */
  gpr_mu workqueue_read_mu;
  gpr_mpscq workqueue_items;
  gpr_atm workqueue_item_count;
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set (grows geometrically, see
     polling_island_add_fds_locked) */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
201
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700202/*******************************************************************************
203 * Pollset Declarations
204 */
struct grpc_pollset_worker {
  /* Thread id of this worker — presumably used to deliver grpc_wakeup_signal
     to the polling thread; confirm against the kick implementation. */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;

  /* Links in the owning pollset's doubly-linked worker list */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
214
struct grpc_pollset {
  gpr_mu mu;
  /* Sentinel/anchor node of the worker list */
  grpc_pollset_worker root_worker;
  /* Set when a kick arrives while no worker is polling */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};
227
228/*******************************************************************************
229 * Pollset-set Declarations
230 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  /* Guards all of the arrays below */
  gpr_mu mu;

  /* Member pollsets (count/capacity/backing array) */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Nested pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Member fds (see TODO above about removing this) */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
252
253/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700254 * Common helpers
255 */
256
Craig Tillerf975f742016-07-01 14:56:27 -0700257static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700258 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700259 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700260 if (*composite == GRPC_ERROR_NONE) {
261 *composite = GRPC_ERROR_CREATE(desc);
262 }
263 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700264 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700265}
266
267/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700268 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700269 */
270
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700271/* The wakeup fd that is used to wake up all threads in a Polling island. This
272 is useful in the polling island merge operation where we need to wakeup all
273 the threads currently polling the smaller polling island (so that they can
274 start polling the new/merged polling island)
275
276 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
277 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
278static grpc_wakeup_fd polling_island_wakeup_fd;
279
Craig Tiller2e620132016-10-10 15:27:44 -0700280/* The polling island being polled right now.
281 See comments in workqueue_maybe_wakeup for why this is tracked. */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700282static __thread polling_island *g_current_thread_polling_island;
283
Craig Tillerb39307d2016-06-30 15:39:13 -0700284/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700285static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700286
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700287#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700288/* Currently TSAN may incorrectly flag data races between epoll_ctl and
289 epoll_wait for any grpc_fd structs that are added to the epoll set via
290 epoll_ctl and are returned (within a very short window) via epoll_wait().
291
292 To work-around this race, we establish a happens-before relation between
293 the code just-before epoll_ctl() and the code after epoll_wait() by using
294 this atomic */
295gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700296#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700297
Craig Tillerb39307d2016-06-30 15:39:13 -0700298static void pi_add_ref(polling_island *pi);
299static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700300
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Debug variant of pi_add_ref: logs the old/new count, the reason and the
   call site before incrementing. */
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug variant of pi_unref: logs the old/new count, the reason and the
   call site before decrementing. */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

/* A grpc_workqueue handle is just a polling_island (see the casts below), so
   workqueue ref/unref forward to the island ref count. NULL is tolerated as
   a no-op. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
/* Release variants: same forwarding without any logging. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif
348
/* Increments the polling island's ref count (no-barrier atomic add). */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700352
/* Drops one ref on the polling island; destroys it when the count hits 0. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    /* Read merged_to BEFORE deleting pi, since delete frees the struct */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
370
/* Adds fds[0..fd_count) to the island's epoll set and to pi->fds.
   The caller is expected to hold pi->mu lock before calling this function.
   Failures are appended to *error; the remaining fds are still processed. */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered registration for both read and write readiness; the
       grpc_fd pointer rides along in the epoll event payload. */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      /* EEXIST just means the fd is already in this epoll set — not an
         error, but also not something to record again in pi->fds. */
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    /* Grow the tracking array geometrically (minimum step of 8 slots) */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
415
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700416/* The caller is expected to hold pi->mu before calling this */
417static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700418 grpc_wakeup_fd *wakeup_fd,
419 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700420 struct epoll_event ev;
421 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700422 char *err_msg;
423 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700424
425 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
426 ev.data.ptr = wakeup_fd;
427 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
428 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700429 if (err < 0 && errno != EEXIST) {
430 gpr_asprintf(&err_msg,
431 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
432 "error: %d (%s)",
433 pi->epoll_fd,
434 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
435 strerror(errno));
436 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
437 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700438 }
439}
440
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700441/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700442static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700443 bool remove_fd_refs,
444 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700445 int err;
446 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700447 char *err_msg;
448 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700449
450 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700451 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700452 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700453 gpr_asprintf(&err_msg,
454 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
455 "error: %d (%s)",
456 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
457 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
458 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700459 }
460
461 if (remove_fd_refs) {
462 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700463 }
464 }
465
466 pi->fd_cnt = 0;
467}
468
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700469/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700470static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700471 bool is_fd_closed,
472 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700473 int err;
474 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700475 char *err_msg;
476 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700477
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700478 /* If fd is already closed, then it would have been automatically been removed
479 from the epoll set */
480 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700481 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
482 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700483 gpr_asprintf(
484 &err_msg,
485 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
486 pi->epoll_fd, fd->fd, errno, strerror(errno));
487 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
488 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700489 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700490 }
491
492 for (i = 0; i < pi->fd_cnt; i++) {
493 if (pi->fds[i] == fd) {
494 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700495 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700496 break;
497 }
498 }
499}
500
/* Creates a new polling island whose epoll set contains the global wakeup fd,
   the island's own workqueue wakeup fd and (optionally) 'initial_fd'.
   Might return NULL in case of an error, in which case details are appended
   to *error and partially-initialized state is torn down. */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1; /* sentinel: polling_island_delete only closes if >= 0 */

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  /* Register both wakeup fds; any failure lands in *error and is handled
     by the common cleanup below */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  /* goto-style cleanup: on any accumulated error, destroy everything that
     was initialized and report failure via NULL */
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
551
/* Frees a polling island and everything it owns. Reached when the ref count
   drops to zero (see pi_unref) or from polling_island_create's error path.
   The island must hold no fds and no pending workqueue items. */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  /* epoll_fd is -1 when polling_island_create failed before epoll_create1 */
  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}
566
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700567/* Attempts to gets the last polling island in the linked list (liked by the
568 * 'merged_to' field). Since this does not lock the polling island, there are no
569 * guarantees that the island returned is the last island */
570static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
571 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
572 while (next != NULL) {
573 pi = next;
574 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
575 }
576
577 return pi;
578}
579
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
      polling_island *pi_latest = polling_island_lock(pi);
      ...
      ... critical section ..
      ...
      gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock
         (classic double-checked pattern: unlocked fast check, then a locked
         re-check). */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done
           — return with the lock held */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}
616
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700617/* Gets the lock on the *latest* polling islands in the linked lists pointed by
618 *p and *q (and also updates *p and *q to point to the latest polling islands)
619
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700620 This function is needed because calling the following block of code to obtain
621 locks on polling islands (*p and *q) is prone to deadlocks.
622 {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700623 polling_island_lock(*p, true);
624 polling_island_lock(*q, true);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700625 }
626
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700627 Usage/example:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700628 polling_island *p1;
629 polling_island *p2;
630 ..
631 polling_island_lock_pair(&p1, &p2);
632 ..
633 .. Critical section with both p1 and p2 locked
634 ..
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700635 // Release locks: Always call polling_island_unlock_pair() to release locks
636 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700637*/
/* Locks the latest islands reachable from *p and *q (updating *p and *q to
   point to those latest islands). See the block comment above for why a naive
   lock-both approach would deadlock. Always pair with
   polling_island_unlock_pair(). */
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Advance each pointer to the tail of its merge chain */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address order prevents lock-order inversion between two threads locking
       the same pair from opposite ends */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-check that both are still chain tails now that we hold the locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* A merge raced us; drop both locks and retry from the top */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
697
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700698static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
699 if (p == q) {
700 gpr_mu_unlock(&p->mu);
701 } else {
702 gpr_mu_unlock(&p->mu);
703 gpr_mu_unlock(&q->mu);
704 }
705}
706
/* Signals the island's workqueue wakeup fd, but only when some *other* poller
   is currently polling this island and could pick the work up immediately. */
static void workqueue_maybe_wakeup(polling_island *pi) {
  /* If this thread is the current poller, then it may be that it's about to
     decrement the current poller count, so we need to look past this thread */
  bool is_current_poller = (g_current_thread_polling_island == pi);
  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
  /* Only issue a wakeup if it's likely that some poller could come in and take
     it right now. Note that since we do an anticipatory mpscq_pop every poll
     loop, it's ok if we miss the wakeup here, as we'll get the work item when
     the next poller enters anyway. */
  if (current_pollers > min_current_pollers_for_wakeup) {
    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
  }
}
722
723static void workqueue_move_items_to_parent(polling_island *q) {
724 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
725 if (p == NULL) {
726 return;
727 }
728 gpr_mu_lock(&q->workqueue_read_mu);
729 int num_added = 0;
730 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
731 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
732 if (n != NULL) {
733 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
734 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
735 gpr_mpscq_push(&p->workqueue_items, n);
736 num_added++;
737 }
738 }
739 gpr_mu_unlock(&q->workqueue_read_mu);
740 if (num_added > 0) {
741 workqueue_maybe_wakeup(p);
742 }
743 workqueue_move_items_to_parent(p);
744}
745
/* Merges islands p and q (moving all fds from the island with fewer fds into
   the other) and returns the surviving island. If p and q already resolve to
   the same island, this is a no-op. Errors from the fd moves are appended to
   *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Any work items still queued on p must follow the fds to q */
    workqueue_move_items_to_parent(q);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
781
/* Pushes 'closure' (carrying 'error' as its result) onto the workqueue, which
   is backed by a polling_island (see the cast below), and wakes a poller when
   the queue transitions from empty to non-empty. */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
                              grpc_workqueue *workqueue, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    /* queue went empty -> non-empty: a poller may need waking */
    workqueue_maybe_wakeup(pi);
  }
  /* If this island was merged away, forward the item(s) up the chain */
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}
801
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700802static grpc_error *polling_island_global_init() {
803 grpc_error *error = GRPC_ERROR_NONE;
804
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700805 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
806 if (error == GRPC_ERROR_NONE) {
807 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
808 }
809
810 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700811}
812
/* Destroys the wakeup fd created by polling_island_global_init() */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
816
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700817/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700818 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700819 */
820
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700821/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700822 * but instead so that implementations with multiple threads in (for example)
823 * epoll_wait deal with the race between pollset removal and incoming poll
824 * notifications.
825 *
826 * The problem is that the poller ultimately holds a reference to this
827 * object, so it is very difficult to know when is safe to free it, at least
828 * without some expensive synchronization.
829 *
830 * If we keep the object freelisted, in the worst case losing this race just
831 * becomes a spurious read notification on a reused fd.
832 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700833
834/* The alarm system needs to be able to wakeup 'some poller' sometimes
835 * (specifically when a new alarm needs to be triggered earlier than the next
836 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
837 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700838
839/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
840 * sure to wake up one polling thread (which can wake up other threads if
841 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700842grpc_wakeup_fd grpc_global_wakeup_fd;
843
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700844static grpc_fd *fd_freelist = NULL;
845static gpr_mu fd_freelist_mu;
846
/* REF_BY/UNREF_BY adjust an fd's refcount by n; with GRPC_FD_REF_COUNT_DEBUG
   defined, the call site (reason/file/line) and old->new counts are logged.
   Note the #ifdef deliberately switches the signature of ref_by(); both
   variants share the fetch_add below. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* refcount must have been non-zero before this ref was taken */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
862
/* Drops n refs from fd. When the count hits zero, the fd is unregistered from
   the iomgr and parked on the global freelist for reuse (see the freelist
   rationale above fd_freelist). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* count must not go negative */
    GPR_ASSERT(old > n);
  }
}
887
/* Increment refcount by two to avoid changing the orphan bit */
/* (the low bit of refst is reserved; see fd_is_orphaned) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
903
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700904static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
905
/* Frees every grpc_fd parked on the freelist and destroys the freelist lock */
static void fd_global_shutdown(void) {
  /* NOTE(review): this lock/unlock pair appears to act as a barrier against
     any in-flight unref_by() still pushing onto the freelist -- confirm */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
917
918static grpc_fd *fd_create(int fd, const char *name) {
919 grpc_fd *new_fd = NULL;
920
921 gpr_mu_lock(&fd_freelist_mu);
922 if (fd_freelist != NULL) {
923 new_fd = fd_freelist;
924 fd_freelist = fd_freelist->freelist_next;
925 }
926 gpr_mu_unlock(&fd_freelist_mu);
927
928 if (new_fd == NULL) {
929 new_fd = gpr_malloc(sizeof(grpc_fd));
930 gpr_mu_init(&new_fd->mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700931 }
932
933 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
934 newly created fd (or an fd we got from the freelist), no one else would be
935 holding a lock to it anyway. */
936 gpr_mu_lock(&new_fd->mu);
937
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700938 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700939 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700940 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700941 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700942 new_fd->read_closure = CLOSURE_NOT_READY;
943 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700944 new_fd->polling_island = NULL;
945 new_fd->freelist_next = NULL;
946 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700947 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700948
949 gpr_mu_unlock(&new_fd->mu);
950
951 char *fd_name;
952 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
953 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700954#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700955 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700956#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700957 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700958 return new_fd;
959}
960
961static bool fd_is_orphaned(grpc_fd *fd) {
962 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
963}
964
965static int fd_wrapped_fd(grpc_fd *fd) {
966 int ret_fd = -1;
967 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700968 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700969 ret_fd = fd->fd;
970 }
971 gpr_mu_unlock(&fd->mu);
972
973 return ret_fd;
974}
975
/* Releases the fd: closes (or hands back via *release_fd) the underlying
   descriptor, detaches it from its polling island, and schedules 'on_done'.
   The grpc_fd struct itself is kept alive until all refs are dropped. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1033
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001034static grpc_error *fd_shutdown_error(bool shutdown) {
1035 if (!shutdown) {
1036 return GRPC_ERROR_NONE;
1037 } else {
1038 return GRPC_ERROR_CREATE("FD shutdown");
1039 }
1040}
1041
/* Registers 'closure' on the read/write slot *st. Called with fd->mu held
   (see fd_notify_on_read/fd_notify_on_write). If the fd is already shut down
   or the slot is already READY, the closure is scheduled immediately;
   otherwise it is parked in *st. Registering a second closure while one is
   still pending is a caller bug and aborts. */
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}
1063
1064/* returns 1 if state becomes not ready */
1065static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1066 grpc_closure **st) {
1067 if (*st == CLOSURE_READY) {
1068 /* duplicate ready ==> ignore */
1069 return 0;
1070 } else if (*st == CLOSURE_NOT_READY) {
1071 /* not ready, and not waiting ==> flag ready */
1072 *st = CLOSURE_READY;
1073 return 0;
1074 } else {
1075 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001076 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001077 *st = CLOSURE_NOT_READY;
1078 return 1;
1079 }
1080}
1081
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001082static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1083 grpc_fd *fd) {
1084 grpc_pollset *notifier = NULL;
1085
1086 gpr_mu_lock(&fd->mu);
1087 notifier = fd->read_notifier_pollset;
1088 gpr_mu_unlock(&fd->mu);
1089
1090 return notifier;
1091}
1092
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001093static bool fd_is_shutdown(grpc_fd *fd) {
1094 gpr_mu_lock(&fd->mu);
1095 const bool r = fd->shutdown;
1096 gpr_mu_unlock(&fd->mu);
1097 return r;
1098}
1099
/* Might be called multiple times */
/* Shuts down the underlying descriptor for both reading and writing and
   flushes any closures parked on the fd; only the first call has an effect. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1115
/* Registers 'closure' to run when the fd becomes readable (or immediately if
   it already is / is shut down); see notify_on_locked for the state machine. */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1122
/* Registers 'closure' to run when the fd becomes writable (or immediately if
   it already is / is shut down); see notify_on_locked for the state machine. */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1129
/* Takes a ref on (and returns) the workqueue backing this fd's polling
   island; the caller owns the returned ref. */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
      (grpc_workqueue *)fd->polling_island, "fd_get_workqueue");
  gpr_mu_unlock(&fd->mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001137
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001138/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001139 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001140 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001141GPR_TLS_DECL(g_current_thread_pollset);
1142GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001143static __thread bool g_initialized_sigmask;
1144static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001145
/* Handler for grpc_wakeup_signal (installed by poller_kick_init and delivered
   via pthread_kill in pollset_worker_kick). Deliberately a no-op apart from
   optional debug logging. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001151
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001152static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001153
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001154/* Global state management */
/* One-time pollset-subsystem init: thread-local tracking slots, the wakeup
   signal handler, and the global wakeup fd. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1161
/* Tears down the state created by pollset_global_init() */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1167
/* Delivers the wakeup signal to the worker's thread, at most once per kick
   (the is_kicked CAS makes repeat kicks no-ops until the flag is reset). */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1183
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  /* root_worker is a sentinel; the list is non-empty iff it links past itself */
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001189
1190static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1191 worker->prev->next = worker->next;
1192 worker->next->prev = worker->prev;
1193}
1194
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001195static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1196 if (pollset_has_workers(p)) {
1197 grpc_pollset_worker *w = p->root_worker.next;
1198 remove_worker(p, w);
1199 return w;
1200 } else {
1201 return NULL;
1202 }
1203}
1204
/* Splices 'worker' in at the tail of p's list (just before the root_worker
   sentinel). */
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}
1210
/* Splices 'worker' in at the head of p's list (just after the root_worker
   sentinel). */
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
1216
/* p->mu must be held before calling this function */
/* Kicks a specific worker, every worker (GRPC_POLLSET_KICK_BROADCAST), or --
   when specific_worker is NULL -- some worker on p. Workers belonging to the
   calling thread are skipped (the current thread can absorb its own kick).
   When there is nobody to kick, 'kicked_without_pollers' is set so the next
   poller returns immediately. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* rotate the kicked worker to the back for rough fairness */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1268
/* Wake up the global poller by writing to the global wakeup fd.
   Returns any error produced by grpc_wakeup_fd_wakeup(). */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001272
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001273static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
1274 gpr_mu_init(&pollset->mu);
1275 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001276
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001277 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001278 pollset->kicked_without_pollers = false;
1279
1280 pollset->shutting_down = false;
1281 pollset->finish_shutdown_called = false;
1282 pollset->shutdown_done = NULL;
1283
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001284 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001285}
1286
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001287/* Convert a timespec to milliseconds:
1288 - Very small or negative poll times are clamped to zero to do a non-blocking
1289 poll (which becomes spin polling)
1290 - Other small values are rounded up to one millisecond
1291 - Longer than a millisecond polls are rounded up to the next nearest
1292 millisecond to avoid spinning
1293 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001294static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1295 gpr_timespec now) {
1296 gpr_timespec timeout;
1297 static const int64_t max_spin_polling_us = 10;
1298 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1299 return -1;
1300 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001301
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001302 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1303 max_spin_polling_us,
1304 GPR_TIMESPAN))) <= 0) {
1305 return 0;
1306 }
1307 timeout = gpr_time_sub(deadline, now);
1308 return gpr_time_to_millis(gpr_time_add(
1309 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1310}
1311
/* Mark 'fd' as readable: fires (or arms) the read closure and records
   'notifier' as the pollset that observed the read event. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1320
/* Mark 'fd' as writable: fires (or arms) the write closure. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1327
Craig Tillerb39307d2016-06-30 15:39:13 -07001328static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1329 grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001330 if (ps->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001331 PI_UNREF(exec_ctx, ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001332 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001333 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001334}
1335
/* Complete the shutdown of 'pollset': release its polling-island ref and
   schedule the shutdown_done closure. Caller must hold pollset->mu, and the
   pollset must have no workers left (asserted below). */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1347
/* Begin shutting down 'pollset'; 'closure' is scheduled once shutdown has
   fully completed (possibly from the last worker in pollset_work()).
   pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake every worker so each notices shutting_down and exits pollset_work */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1367
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here (the polling-island ref was already released by
 * finish_shutdown_locked() during shutdown). */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}
1375
/* Return a fully shut-down pollset to its freshly-initialized state so it can
   be reused. Requires: shutdown completed, no workers remain, and the
   polling-island ref was already dropped (by finish_shutdown_locked). */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  GPR_ASSERT(pollset->polling_island == NULL);
}
1385
/* Try to drain one closure from the polling island's workqueue.
   Returns true iff a closure was run (the caller should treat this as having
   done its unit of work); returns false when the queue was empty or another
   thread already holds the single-consumer read lock. */
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    /* gpr_mpscq is multi-producer/single-consumer: pops must be serialized
       under workqueue_read_mu */
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* Items remain after taking this one: wake another poller so the
         queue keeps draining in parallel */
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      grpc_closure *c = (grpc_closure *)n;
      grpc_closure_run(exec_ctx, c, c->error_data.error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* The count says items exist but the pop failed — presumably a producer
         is mid-push (mpscq pop can transiently fail); make sure someone
         retries rather than losing the work */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}
1404
Craig Tiller84ea3412016-09-08 14:57:56 -07001405#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001406/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1407static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001408 grpc_pollset *pollset,
1409 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001410 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001411 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001412 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001413 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001414 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001415 char *err_msg;
1416 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001417 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1418
1419 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001420 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001421
1422 Since epoll_fd is immutable, we can read it without obtaining the polling
1423 island lock. There is however a possibility that the polling island (from
1424 which we got the epoll_fd) got merged with another island while we are
1425 in this function. This is still okay because in such a case, we will wakeup
1426 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001427 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001428
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001429 if (pollset->polling_island == NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001430 pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001431 if (pollset->polling_island == NULL) {
1432 GPR_TIMER_END("pollset_work_and_unlock", 0);
1433 return; /* Fatal error. We cannot continue */
1434 }
1435
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001436 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001437 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1438 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001439 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001440
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001441 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001442 epoll_fd = pi->epoll_fd;
1443
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001444 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001445 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001446 if (pollset->polling_island != pi) {
1447 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1448 polling island to be deleted */
1449 PI_ADD_REF(pi, "ps");
Craig Tillerb39307d2016-06-30 15:39:13 -07001450 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001451 pollset->polling_island = pi;
1452 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001453
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001454 /* Add an extra ref so that the island does not get destroyed (which means
1455 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1456 epoll_fd */
1457 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001458 gpr_mu_unlock(&pollset->mu);
1459
Craig Tillerd8a3c042016-09-09 12:42:37 -07001460 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1461 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1462 g_current_thread_polling_island = pi;
1463
Vijay Paicef54012016-08-28 23:05:31 -07001464 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001465 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1466 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001467 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001468 if (ep_rv < 0) {
1469 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001470 gpr_asprintf(&err_msg,
1471 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1472 epoll_fd, errno, strerror(errno));
1473 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001474 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001475 /* We were interrupted. Save an interation by doing a zero timeout
1476 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001477 GRPC_POLLING_TRACE(
1478 "pollset_work: pollset: %p, worker: %p received kick",
1479 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001480 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001481 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001482 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001483
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001484#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001485 /* See the definition of g_poll_sync for more details */
1486 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001487#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001488
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001489 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001490 void *data_ptr = ep_ev[i].data.ptr;
1491 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001492 append_error(error,
1493 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1494 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001495 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
1496 append_error(error,
1497 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1498 err_desc);
1499 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001500 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001501 GRPC_POLLING_TRACE(
1502 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1503 "%d) got merged",
1504 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001505 /* This means that our polling island is merged with a different
1506 island. We do not have to do anything here since the subsequent call
1507 to the function pollset_work_and_unlock() will pick up the correct
1508 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001509 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001510 grpc_fd *fd = data_ptr;
1511 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1512 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1513 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001514 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001515 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001516 }
1517 if (write_ev || cancel) {
1518 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001519 }
1520 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001521 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001522
1523 g_current_thread_polling_island = NULL;
1524 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1525 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001526
1527 GPR_ASSERT(pi != NULL);
1528
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001529 /* Before leaving, release the extra ref we added to the polling island. It
1530 is important to use "pi" here (i.e our old copy of pollset->polling_island
1531 that we got before releasing the polling island lock). This is because
1532 pollset->polling_island pointer might get udpated in other parts of the
1533 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001534 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001535
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001536 GPR_TIMER_END("pollset_work_and_unlock", 0);
1537}
1538
/* Poll (or run work) on behalf of 'pollset' until 'deadline' or until kicked.
   *worker_hdl is set to this thread's stack-allocated worker for the duration
   of the call and reset to NULL before returning.
   pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* Worker lives on this stack frame; it is published via *worker_hdl and
     must be removed from the pollset before this function returns */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    /* NOTE(review): g_initialized_sigmask appears to be a process-global flag
       guarding per-thread pthread_sigmask state — confirm that it is
       thread-local (or that the first pollset_work on each thread runs this
       block) before relying on the mask being set on every worker thread */
    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*"
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1645
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001646static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1647 grpc_fd *fd) {
Craig Tiller9d018482016-07-18 08:53:49 -07001648 GPR_TIMER_BEGIN("pollset_add_fd", 0);
1649
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001650 grpc_error *error = GRPC_ERROR_NONE;
1651
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001652 gpr_mu_lock(&pollset->mu);
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001653 gpr_mu_lock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001654
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001655 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001656
Craig Tiller7212c232016-07-06 13:11:09 -07001657retry:
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001658 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1659 * equal, do nothing.
1660 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1661 * a new polling island (with a refcount of 2) and make the polling_island
1662 * fields in both fd and pollset to point to the new island
1663 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1664 * the NULL polling_island field to point to the non-NULL polling_island
1665 * field (ensure that the refcount on the polling island is incremented by
1666 * 1 to account for the newly added reference)
1667 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1668 * and different, merge both the polling islands and update the
1669 * polling_island fields in both fd and pollset to point to the merged
1670 * polling island.
1671 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001672
Craig Tiller42ac6db2016-07-06 17:13:56 -07001673 if (fd->orphaned) {
1674 gpr_mu_unlock(&fd->mu);
1675 gpr_mu_unlock(&pollset->mu);
1676 /* early out */
1677 return;
1678 }
1679
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001680 if (fd->polling_island == pollset->polling_island) {
1681 pi_new = fd->polling_island;
1682 if (pi_new == NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001683 /* Unlock before creating a new polling island: the polling island will
1684 create a workqueue which creates a file descriptor, and holding an fd
1685 lock here can eventually cause a loop to appear to TSAN (making it
1686 unhappy). We don't think it's a real loop (there's an epoch point where
1687 that loop possibility disappears), but the advantages of keeping TSAN
1688 happy outweigh any performance advantage we might have by keeping the
1689 lock held. */
Craig Tiller7212c232016-07-06 13:11:09 -07001690 gpr_mu_unlock(&fd->mu);
Craig Tillerb39307d2016-06-30 15:39:13 -07001691 pi_new = polling_island_create(exec_ctx, fd, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001692 gpr_mu_lock(&fd->mu);
Craig Tiller0a06cd72016-07-14 13:21:24 -07001693 /* Need to reverify any assumptions made between the initial lock and
1694 getting to this branch: if they've changed, we need to throw away our
1695 work and figure things out again. */
Craig Tiller7212c232016-07-06 13:11:09 -07001696 if (fd->polling_island != NULL) {
Craig Tiller27da6422016-07-06 13:14:46 -07001697 GRPC_POLLING_TRACE(
1698 "pollset_add_fd: Raced creating new polling island. pi_new: %p "
1699 "(fd: %d, pollset: %p)",
1700 (void *)pi_new, fd->fd, (void *)pollset);
1701 PI_ADD_REF(pi_new, "dance_of_destruction");
1702 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
Craig Tiller7212c232016-07-06 13:11:09 -07001703 goto retry;
Craig Tiller27da6422016-07-06 13:14:46 -07001704 } else {
1705 GRPC_POLLING_TRACE(
1706 "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
1707 "pollset: %p)",
1708 (void *)pi_new, fd->fd, (void *)pollset);
Craig Tiller7212c232016-07-06 13:11:09 -07001709 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001710 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001711 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001712 pi_new = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001713 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001714 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001715
1716 GRPC_POLLING_TRACE(
1717 "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
1718 "pollset->pi: %p)",
1719 (void *)pi_new, fd->fd, (void *)pollset,
1720 (void *)pollset->polling_island);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001721 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001722 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001723 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001724
1725 GRPC_POLLING_TRACE(
1726 "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
1727 "%p, fd->pi: %p",
1728 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001729 } else {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001730 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
1731 &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001732 GRPC_POLLING_TRACE(
1733 "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
1734 "%p, fd->pi: %p, pollset->pi: %p)",
1735 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
1736 (void *)pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001737 }
1738
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001739 /* At this point, pi_new is the polling island that both fd->polling_island
1740 and pollset->polling_island must be pointing to */
1741
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001742 if (fd->polling_island != pi_new) {
1743 PI_ADD_REF(pi_new, "fd");
1744 if (fd->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001745 PI_UNREF(exec_ctx, fd->polling_island, "fd");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001746 }
1747 fd->polling_island = pi_new;
1748 }
1749
1750 if (pollset->polling_island != pi_new) {
1751 PI_ADD_REF(pi_new, "ps");
1752 if (pollset->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001753 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001754 }
1755 pollset->polling_island = pi_new;
1756 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001757
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001758 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001759 gpr_mu_unlock(&pollset->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001760
1761 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tiller9d018482016-07-18 08:53:49 -07001762
1763 GPR_TIMER_END("pollset_add_fd", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001764}
1765
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001766/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001767 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001768 */
1769
1770static grpc_pollset_set *pollset_set_create(void) {
1771 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1772 memset(pollset_set, 0, sizeof(*pollset_set));
1773 gpr_mu_init(&pollset_set->mu);
1774 return pollset_set;
1775}
1776
1777static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1778 size_t i;
1779 gpr_mu_destroy(&pollset_set->mu);
1780 for (i = 0; i < pollset_set->fd_count; i++) {
1781 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1782 }
1783 gpr_free(pollset_set->pollsets);
1784 gpr_free(pollset_set->pollset_sets);
1785 gpr_free(pollset_set->fds);
1786 gpr_free(pollset_set);
1787}
1788
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001789static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1790 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1791 size_t i;
1792 gpr_mu_lock(&pollset_set->mu);
1793 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1794 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1795 pollset_set->fds = gpr_realloc(
1796 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1797 }
1798 GRPC_FD_REF(fd, "pollset_set");
1799 pollset_set->fds[pollset_set->fd_count++] = fd;
1800 for (i = 0; i < pollset_set->pollset_count; i++) {
1801 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1802 }
1803 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1804 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1805 }
1806 gpr_mu_unlock(&pollset_set->mu);
1807}
1808
1809static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1810 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1811 size_t i;
1812 gpr_mu_lock(&pollset_set->mu);
1813 for (i = 0; i < pollset_set->fd_count; i++) {
1814 if (pollset_set->fds[i] == fd) {
1815 pollset_set->fd_count--;
1816 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1817 pollset_set->fds[pollset_set->fd_count]);
1818 GRPC_FD_UNREF(fd, "pollset_set");
1819 break;
1820 }
1821 }
1822 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1823 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1824 }
1825 gpr_mu_unlock(&pollset_set->mu);
1826}
1827
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001828static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1829 grpc_pollset_set *pollset_set,
1830 grpc_pollset *pollset) {
1831 size_t i, j;
1832 gpr_mu_lock(&pollset_set->mu);
1833 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1834 pollset_set->pollset_capacity =
1835 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1836 pollset_set->pollsets =
1837 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1838 sizeof(*pollset_set->pollsets));
1839 }
1840 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1841 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1842 if (fd_is_orphaned(pollset_set->fds[i])) {
1843 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1844 } else {
1845 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1846 pollset_set->fds[j++] = pollset_set->fds[i];
1847 }
1848 }
1849 pollset_set->fd_count = j;
1850 gpr_mu_unlock(&pollset_set->mu);
1851}
1852
1853static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1854 grpc_pollset_set *pollset_set,
1855 grpc_pollset *pollset) {
1856 size_t i;
1857 gpr_mu_lock(&pollset_set->mu);
1858 for (i = 0; i < pollset_set->pollset_count; i++) {
1859 if (pollset_set->pollsets[i] == pollset) {
1860 pollset_set->pollset_count--;
1861 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1862 pollset_set->pollsets[pollset_set->pollset_count]);
1863 break;
1864 }
1865 }
1866 gpr_mu_unlock(&pollset_set->mu);
1867}
1868
1869static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1870 grpc_pollset_set *bag,
1871 grpc_pollset_set *item) {
1872 size_t i, j;
1873 gpr_mu_lock(&bag->mu);
1874 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1875 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1876 bag->pollset_sets =
1877 gpr_realloc(bag->pollset_sets,
1878 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1879 }
1880 bag->pollset_sets[bag->pollset_set_count++] = item;
1881 for (i = 0, j = 0; i < bag->fd_count; i++) {
1882 if (fd_is_orphaned(bag->fds[i])) {
1883 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1884 } else {
1885 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1886 bag->fds[j++] = bag->fds[i];
1887 }
1888 }
1889 bag->fd_count = j;
1890 gpr_mu_unlock(&bag->mu);
1891}
1892
1893static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1894 grpc_pollset_set *bag,
1895 grpc_pollset_set *item) {
1896 size_t i;
1897 gpr_mu_lock(&bag->mu);
1898 for (i = 0; i < bag->pollset_set_count; i++) {
1899 if (bag->pollset_sets[i] == item) {
1900 bag->pollset_set_count--;
1901 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1902 bag->pollset_sets[bag->pollset_set_count]);
1903 break;
1904 }
1905 }
1906 gpr_mu_unlock(&bag->mu);
1907}
1908
/* Test helper functions */
1911void *grpc_fd_get_polling_island(grpc_fd *fd) {
1912 polling_island *pi;
1913
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001914 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001915 pi = fd->polling_island;
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001916 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001917
1918 return pi;
1919}
1920
1921void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1922 polling_island *pi;
1923
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001924 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001925 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001926 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001927
1928 return pi;
1929}
1930
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001931bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001932 polling_island *p1 = p;
1933 polling_island *p2 = q;
1934
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001935 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1936 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001937 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001938 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001939
1940 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001941}
1942
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001943/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001944 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001945 */
1946
/* Tear down the engine-wide state set up by grpc_init_epoll_linux():
   fd bookkeeping, pollset globals, then polling-island globals.
   NOTE(review): does not undo grpc_use_signal(); presumably the signal
   registration is process-lifetime -- confirm. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1952
/* The event-engine vtable through which the rest of iomgr drives this
   epoll/polling-island engine (returned by grpc_init_epoll_linux). */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and readiness notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset lifecycle and polling */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set (bags of pollsets/fds) management */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    /* workqueue support */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_enqueue = workqueue_enqueue,

    .shutdown_engine = shutdown_engine,
};
1991
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001992/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1993 * Create a dummy epoll_fd to make sure epoll support is available */
1994static bool is_epoll_available() {
1995 int fd = epoll_create1(EPOLL_CLOEXEC);
1996 if (fd < 0) {
1997 gpr_log(
1998 GPR_ERROR,
1999 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2000 fd);
2001 return false;
2002 }
2003 close(fd);
2004 return true;
2005}
2006
/* Engine entry point: returns the epoll engine vtable if this system can
   run it, or NULL so the caller can fall back to another polling engine.
   Performs one-time global initialization of fd, pollset and polling-island
   state on success. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine*/
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  /* GLIBC may expose epoll on a kernel that lacks it; probe before use */
  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    /* Default wakeup signal when the application didn't pick one via
       grpc_use_signal(). NOTE(review): assumes SIGRTMIN + 6 is not used
       elsewhere in the process -- confirm. */
    grpc_use_signal(SIGRTMIN + 6);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
2034
#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

/* No-op stub: signal-based wakeup is only used by the epoll engine */
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */