blob: c4f3aefb8225e07a8446651e689688b73f04c556 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
murgatroid9954070892016-08-08 17:01:18 -070035#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070036
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070038#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
Sree Kuchibhotla34217242016-06-29 00:19:07 -070065/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
66static int grpc_polling_trace = 0; /* Disabled by default */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -070067#define GRPC_POLLING_TRACE(fmt, ...) \
68 if (grpc_polling_trace) { \
69 gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
70 }
71
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070072static int grpc_wakeup_signal = -1;
73static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -080093typedef enum {
94 POLL_OBJ_FD,
95 POLL_OBJ_POLLSET,
96 POLL_OBJ_POLLSET_SET
97} poll_obj_type;
98
99typedef struct poll_obj {
100 gpr_mu mu;
101 struct polling_island *pi;
102} poll_obj;
103
104const char *poll_obj_string(poll_obj_type po_type) {
105 switch (po_type) {
106 case POLL_OBJ_FD:
107 return "fd";
108 case POLL_OBJ_POLLSET:
109 return "pollset";
110 case POLL_OBJ_POLLSET_SET:
111 return "pollset_set";
112 }
113
114 GPR_UNREACHABLE_CODE(return "UNKNOWN");
115}
116
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700117/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700118 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700119 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800120
121#define FD_FROM_PO(po) ((grpc_fd *)(po))
122
struct grpc_fd {
  /* Common poll-object header (mutex + owning polling island). Kept as the
     first field so a poll_obj* can be cast back to grpc_fd* via FD_FROM_PO */
  poll_obj po;

  /* The underlying OS file descriptor */
  int fd;
  /* refst format:
     bit 0 : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Read/write readiness closures (presumably holding the CLOSURE_NOT_READY /
     CLOSURE_READY sentinels defined above when no closure is pending — confirm
     against the notify/set implementations).
     TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* Link used when this struct is parked on the fd freelist */
  struct grpc_fd *freelist_next;
  /* Closure to run once the fd is fully released (see orphan path) */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  /* Registers this object with iomgr for leak detection at shutdown */
  grpc_iomgr_object iomgr_object;
};
153
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700154/* Reference counting for fds */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700155// #define GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700156#ifdef GRPC_FD_REF_COUNT_DEBUG
157static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
158static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
159 int line);
160#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
161#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
162#else
163static void fd_ref(grpc_fd *fd);
164static void fd_unref(grpc_fd *fd);
165#define GRPC_FD_REF(fd, reason) fd_ref(fd)
166#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
167#endif
168
169static void fd_global_init(void);
170static void fd_global_shutdown(void);
171
172#define CLOSURE_NOT_READY ((grpc_closure *)0)
173#define CLOSURE_READY ((grpc_closure *)1)
174
175/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700176 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700177 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700178
Craig Tillerd8a3c042016-09-09 12:42:37 -0700179#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700180
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700181#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
Craig Tillerb39307d2016-06-30 15:39:13 -0700182#define PI_UNREF(exec_ctx, p, r) \
183 pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700184
Craig Tillerd8a3c042016-09-09 12:42:37 -0700185#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700186
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700187#define PI_ADD_REF(p, r) pi_add_ref((p))
Craig Tillerb39307d2016-06-30 15:39:13 -0700188#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700189
Yuchen Zeng362ac1b2016-09-13 16:01:31 -0700190#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700191
/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  /* Guards the mutable fields below (fds array, epoll membership, merges) */
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed (multi-producer, single-consumer) */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set (tracked so they can be removed or
     re-added on merge); fds holds fd_cnt entries in fd_capacity slots */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
232
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700233/*******************************************************************************
234 * Pollset Declarations
235 */
struct grpc_pollset_worker {
  /* Thread id of this worker (target for signal-based kicks — see
     grpc_wakeup_signal) */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  /* Doubly-linked list of workers rooted at grpc_pollset.root_worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  /* Common poll-object header (mutex + owning polling island) */
  poll_obj po;

  /* Sentinel node of the circular worker list */
  grpc_pollset_worker root_worker;
  /* True if a kick arrived while no worker was polling */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {
  /* Common poll-object header; a pollset_set carries no state of its own */
  poll_obj po;
};
263
264/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700265 * Common helpers
266 */
267
Craig Tillerf975f742016-07-01 14:56:27 -0700268static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700269 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700270 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700271 if (*composite == GRPC_ERROR_NONE) {
272 *composite = GRPC_ERROR_CREATE(desc);
273 }
274 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700275 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700276}
277
278/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700279 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700280 */
281
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700282/* The wakeup fd that is used to wake up all threads in a Polling island. This
283 is useful in the polling island merge operation where we need to wakeup all
284 the threads currently polling the smaller polling island (so that they can
285 start polling the new/merged polling island)
286
287 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
288 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
289static grpc_wakeup_fd polling_island_wakeup_fd;
290
Craig Tiller2e620132016-10-10 15:27:44 -0700291/* The polling island being polled right now.
292 See comments in workqueue_maybe_wakeup for why this is tracked. */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700293static __thread polling_island *g_current_thread_polling_island;
294
Craig Tillerb39307d2016-06-30 15:39:13 -0700295/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700296static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700297
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700298#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700299/* Currently TSAN may incorrectly flag data races between epoll_ctl and
300 epoll_wait for any grpc_fd structs that are added to the epoll set via
301 epoll_ctl and are returned (within a very short window) via epoll_wait().
302
303 To work-around this race, we establish a happens-before relation between
304 the code just-before epoll_ctl() and the code after epoll_wait() by using
305 this atomic */
306gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700307#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700308
Craig Tillerb39307d2016-06-30 15:39:13 -0700309static void pi_add_ref(polling_island *pi);
310static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700311
Craig Tillerd8a3c042016-09-09 12:42:37 -0700312#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Debug wrapper around pi_add_ref(): logs the refcount transition together
   with the caller-supplied reason and source location. NOTE: old_cnt is a
   snapshot taken before the increment, so the logged old->new values may be
   stale under concurrent ref traffic. */
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper around pi_unref(): logs the refcount transition. The snapshot
   is read before the unref; 'pi' may already be freed by the time the log
   line is emitted, so only the (pre-captured) values are used. */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
Craig Tillerd8a3c042016-09-09 12:42:37 -0700328
/* grpc_workqueue ref (debug build): a workqueue is just a polling_island in
   disguise (see the cast), so this forwards to the island's refcounting.
   NULL workqueues are tolerated and passed through unchanged. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

/* grpc_workqueue unref (debug build): forwards to pi_unref_dbg; no-op on
   NULL. */
static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
344#else
/* grpc_workqueue ref (non-debug build): a workqueue is a polling_island, so
   just bump the island's refcount. NULL is tolerated and returned as-is. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

/* grpc_workqueue unref (non-debug build): forwards to pi_unref; no-op on
   NULL. */
static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700358#endif
359
/* Takes a ref on 'pi'. No barrier needed: acquiring a ref never publishes
   state by itself (see the comment on polling_island.ref_count). */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700363
/* Drops a ref on 'pi', destroying the island when the count reaches zero. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    /* Capture merged_to before the delete frees 'pi' */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
381
/* The caller is expected to hold pi->mu lock before calling this function.
   Adds 'fd_count' fds to the island's epoll set (edge-triggered, read+write)
   and to the island's tracking array. fds already present in the epoll set
   (EEXIST) are silently skipped; other epoll_ctl failures are accumulated
   into '*error' but do not stop the loop. If 'add_fd_refs' is true, a
   "polling_island" ref is taken on each successfully added fd. */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* On any add failure (including EEXIST) skip tracking/ref-taking */
      continue;
    }

    /* Grow the tracking array geometrically (at least by 8 slots) */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
426
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700427/* The caller is expected to hold pi->mu before calling this */
428static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700429 grpc_wakeup_fd *wakeup_fd,
430 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700431 struct epoll_event ev;
432 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700433 char *err_msg;
434 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700435
436 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
437 ev.data.ptr = wakeup_fd;
438 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
439 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700440 if (err < 0 && errno != EEXIST) {
441 gpr_asprintf(&err_msg,
442 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
443 "error: %d (%s)",
444 pi->epoll_fd,
445 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
446 strerror(errno));
447 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
448 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700449 }
450}
451
/* The caller is expected to hold pi->mu lock before calling this function.
   Removes every tracked fd from the island's epoll set and resets fd_cnt to
   zero. ENOENT (fd already gone from the set) is ignored; other epoll_ctl
   failures are accumulated into '*error'. If 'remove_fd_refs' is true, the
   island's "polling_island" ref on each fd is dropped (even for fds whose
   epoll removal failed). */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}
479
/* The caller is expected to hold pi->mu lock before calling this function.
   Removes a single fd from the island: deletes it from the epoll set (unless
   the fd was already closed, in which case the kernel removed it for us) and
   swap-removes it from the tracking array, dropping the island's ref. */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* Swap-remove 'fd' from the tracking array and drop the island's ref */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
511
/* Might return NULL in case of an error (in which case '*error' holds the
   accumulated failure details). On success, returns a fresh polling island
   with ref_count 0; the caller is expected to take the first ref.
   The global wakeup fd and the island's own workqueue wakeup fd are always
   added to the new epoll set; 'initial_fd' (if non-NULL) is added too. */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  /* -1 marks "not yet created" so the error path knows not to close it */
  pi->epoll_fd = -1;

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  /* Single cleanup path: any accumulated error tears the island back down */
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
562
/* Frees a polling island and all of its owned resources. Callers must have
   emptied the fd tracking array and the workqueue first (asserted below).
   'exec_ctx' is currently unused here but kept for signature symmetry with
   the other polling-island operations. */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  /* epoll_fd is -1 when polling_island_create() failed before creating it */
  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}
577
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700578/* Attempts to gets the last polling island in the linked list (liked by the
579 * 'merged_to' field). Since this does not lock the polling island, there are no
580 * guarantees that the island returned is the last island */
581static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
582 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
583 while (next != NULL) {
584 pi = next;
585 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
586 }
587
588 return pi;
589}
590
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done
           (loop exits with pi->mu held, as promised to the caller) */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}
627
/* Gets the locks on the *latest* polling islands in the linked lists pointed
   to by *p and *q (and updates *p and *q to point to those latest islands).

   This function is needed because naively locking the two islands like:
   {
     polling_island_lock(*p, true);
     polling_island_lock(*q, true);
   }
   is prone to deadlock (two threads may lock the same pair in opposite
   orders).

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Walk to the last polling islands in the linked lists *pi_1 and *pi_2
       (updating pi_1 and pi_2 as we go)
     - Then obtain the locks following a fixed order rule: lock the
       polling_island with the lower address first (prevents deadlock)
       Special case: before locking, check whether pi_1 and pi_2 ended up
       pointing at the same island; if so a single polling_island_lock()
       suffices
     - After obtaining both locks, double-check that the islands are still
       the last ones in their respective lists (merges may have happened
       before we acquired the locks)
     - If they are still the last islands, we are done. If not, release the
       locks and restart from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address-ordered locking to avoid deadlock with a concurrent caller
       passing the same pair in the opposite order */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* A merge raced us; drop both locks and retry */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
708
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700709static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
710 if (p == q) {
711 gpr_mu_unlock(&p->mu);
712 } else {
713 gpr_mu_unlock(&p->mu);
714 gpr_mu_unlock(&q->mu);
715 }
716}
717
Craig Tillerd8a3c042016-09-09 12:42:37 -0700718static void workqueue_maybe_wakeup(polling_island *pi) {
Craig Tiller2e620132016-10-10 15:27:44 -0700719 /* If this thread is the current poller, then it may be that it's about to
720 decrement the current poller count, so we need to look past this thread */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700721 bool is_current_poller = (g_current_thread_polling_island == pi);
722 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
723 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
Craig Tiller2e620132016-10-10 15:27:44 -0700724 /* Only issue a wakeup if it's likely that some poller could come in and take
725 it right now. Note that since we do an anticipatory mpscq_pop every poll
726 loop, it's ok if we miss the wakeup here, as we'll get the work item when
727 the next poller enters anyway. */
728 if (current_pollers > min_current_pollers_for_wakeup) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700729 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
730 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
731 }
732}
733
/* Drains q's workqueue into the island q was merged into (if any), then
   recurses up the 'merged_to' chain so items always end up on the live,
   top-most island. Recursion depth is bounded by the merge-chain length. */
static void workqueue_move_items_to_parent(polling_island *q) {
  polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
  if (p == NULL) {
    /* q has not been merged away; its items stay where they are */
    return;
  }
  gpr_mu_lock(&q->workqueue_read_mu);
  int num_added = 0;
  while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
    /* pop may transiently return NULL on a contended mpscq; the count check
       keeps us looping until the queue is truly drained */
    if (n != NULL) {
      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
      gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
      gpr_mpscq_push(&p->workqueue_items, n);
      num_added++;
    }
  }
  gpr_mu_unlock(&q->workqueue_read_mu);
  if (num_added > 0) {
    workqueue_maybe_wakeup(p);
  }
  /* p itself may have been merged in the meantime; push further up the chain */
  workqueue_move_items_to_parent(p);
}
756
/* Merges the polling islands p and q into one and returns the surviving
   island (always q's latest island). Takes and releases the locks on both
   islands internally. *error accumulates any epoll/fd errors encountered. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e. move all the fds from p (the one with fewer fds) to
       q. Note that the refcounts on the fds being moved do not change here.
       (This is why the last param in the following two calls is 'false'.) */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wake up all the pollers (if any) on p so that they pick up this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Any work items queued on p must now be serviced via q */
    workqueue_move_items_to_parent(q);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (note that no merge happened if p == q,
     which is fine) */
  return q;
}
792
/* Enqueues 'closure' (carrying 'error') on the workqueue, waking a poller if
   the queue transitioned from empty to non-empty. The workqueue is really a
   polling_island (they share identity in this engine). */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
                              grpc_workqueue *workqueue, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  /* bump the count BEFORE pushing: consumers treat count as an upper bound */
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    /* queue was empty: a poller may be asleep and needs an explicit wakeup */
    workqueue_maybe_wakeup(pi);
  }
  /* if this island has been merged away, forward the item up the chain */
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}
812
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700813static grpc_error *polling_island_global_init() {
814 grpc_error *error = GRPC_ERROR_NONE;
815
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700816 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
817 if (error == GRPC_ERROR_NONE) {
818 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
819 }
820
821 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700822}
823
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700824static void polling_island_global_shutdown() {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700825 grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700826}
827
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700828/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700829 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700830 */
831
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700832/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700833 * but instead so that implementations with multiple threads in (for example)
834 * epoll_wait deal with the race between pollset removal and incoming poll
835 * notifications.
836 *
837 * The problem is that the poller ultimately holds a reference to this
838 * object, so it is very difficult to know when is safe to free it, at least
839 * without some expensive synchronization.
840 *
841 * If we keep the object freelisted, in the worst case losing this race just
842 * becomes a spurious read notification on a reused fd.
843 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700844
845/* The alarm system needs to be able to wakeup 'some poller' sometimes
846 * (specifically when a new alarm needs to be triggered earlier than the next
847 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
848 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700849
850/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
851 * sure to wake up one polling thread (which can wake up other threads if
852 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700853grpc_wakeup_fd grpc_global_wakeup_fd;
854
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700855static grpc_fd *fd_freelist = NULL;
856static gpr_mu fd_freelist_mu;
857
/* Adds n to fd's refcount and asserts the fd was still alive (refcount > 0
   before the add). The debug build logs every transition with call-site info;
   both builds share the trailing assert via the interleaved #ifdef. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
873
/* Subtracts n from fd's refcount; when the count hits zero the grpc_fd is
   returned to the global freelist (not freed — see the freelist rationale
   comment above) and unregistered from the iomgr. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Refcount dropped to zero: add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
898
/* Public ref/unref: increment/decrement the refcount by two so the low
   "orphan" bit of refst stays untouched. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
914
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700915static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
916
/* Frees every grpc_fd parked on the freelist and destroys the freelist mutex. */
static void fd_global_shutdown(void) {
  /* NOTE(review): this empty lock/unlock pair presumably acts as a barrier so
     any thread still inside a freelist critical section has left before we
     start freeing — confirm before removing */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->po.mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
928
929static grpc_fd *fd_create(int fd, const char *name) {
930 grpc_fd *new_fd = NULL;
931
932 gpr_mu_lock(&fd_freelist_mu);
933 if (fd_freelist != NULL) {
934 new_fd = fd_freelist;
935 fd_freelist = fd_freelist->freelist_next;
936 }
937 gpr_mu_unlock(&fd_freelist_mu);
938
939 if (new_fd == NULL) {
940 new_fd = gpr_malloc(sizeof(grpc_fd));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800941 gpr_mu_init(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700942 }
943
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800944 /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
945 * is a newly created fd (or an fd we got from the freelist), no one else
946 * would be holding a lock to it anyway. */
947 gpr_mu_lock(&new_fd->po.mu);
948 new_fd->po.pi = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700949
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700950 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700951 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700952 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700953 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700954 new_fd->read_closure = CLOSURE_NOT_READY;
955 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700956 new_fd->freelist_next = NULL;
957 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700958 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700959
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800960 gpr_mu_unlock(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700961
962 char *fd_name;
963 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
964 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700965#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700966 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700967#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700968 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700969 return new_fd;
970}
971
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -0800972/*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700973static bool fd_is_orphaned(grpc_fd *fd) {
974 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
975}
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -0800976*/
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700977
978static int fd_wrapped_fd(grpc_fd *fd) {
979 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800980 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700981 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700982 ret_fd = fd->fd;
983 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800984 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700985
986 return ret_fd;
987}
988
/* Releases the caller's ownership of 'fd'. If release_fd is non-NULL the raw
   descriptor is handed back through it (not closed); otherwise it is closed.
   The fd is detached from its polling island and 'on_done' is scheduled with
   the accumulated error once the orphan is complete. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to the freelist) until the end of this
     function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref the stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1046
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001047static grpc_error *fd_shutdown_error(bool shutdown) {
1048 if (!shutdown) {
1049 return GRPC_ERROR_NONE;
1050 } else {
1051 return GRPC_ERROR_CREATE("FD shutdown");
1052 }
1053}
1054
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001055static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1056 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001057 if (fd->shutdown) {
1058 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
1059 NULL);
1060 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001061 /* not ready ==> switch to a waiting state by setting the closure */
1062 *st = closure;
1063 } else if (*st == CLOSURE_READY) {
1064 /* already ready ==> queue the closure to run immediately */
1065 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001066 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
1067 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001068 } else {
1069 /* upcallptr was set to a different closure. This is an error! */
1070 gpr_log(GPR_ERROR,
1071 "User called a notify_on function with a previous callback still "
1072 "pending");
1073 abort();
1074 }
1075}
1076
1077/* returns 1 if state becomes not ready */
1078static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1079 grpc_closure **st) {
1080 if (*st == CLOSURE_READY) {
1081 /* duplicate ready ==> ignore */
1082 return 0;
1083 } else if (*st == CLOSURE_NOT_READY) {
1084 /* not ready, and not waiting ==> flag ready */
1085 *st = CLOSURE_READY;
1086 return 0;
1087 } else {
1088 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001089 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001090 *st = CLOSURE_NOT_READY;
1091 return 1;
1092 }
1093}
1094
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001095static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1096 grpc_fd *fd) {
1097 grpc_pollset *notifier = NULL;
1098
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001099 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001100 notifier = fd->read_notifier_pollset;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001101 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001102
1103 return notifier;
1104}
1105
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001106static bool fd_is_shutdown(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001107 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001108 const bool r = fd->shutdown;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001109 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001110 return r;
1111}
1112
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001113/* Might be called multiple times */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001114static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001115 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001116 /* Do the actual shutdown only once */
1117 if (!fd->shutdown) {
1118 fd->shutdown = true;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001119
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001120 shutdown(fd->fd, SHUT_RDWR);
1121 /* Flush any pending read and write closures. Since fd->shutdown is 'true'
1122 at this point, the closures would be called with 'success = false' */
1123 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1124 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1125 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001126 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001127}
1128
/* Arranges for 'closure' to run when 'fd' becomes readable (or immediately
   if a read event is already pending / the fd is shut down). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->po.mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->po.mu);
}
1135
/* Arranges for 'closure' to run when 'fd' becomes writable (or immediately
   if a write event is already pending / the fd is shut down). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->po.mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->po.mu);
}
1142
Craig Tillerd6ba6192016-06-30 15:42:41 -07001143static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001144 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001145 grpc_workqueue *workqueue =
1146 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001147 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001148 return workqueue;
1149}
Craig Tiller70bd4832016-06-30 14:20:46 -07001150
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001151/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001152 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001153 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001154GPR_TLS_DECL(g_current_thread_pollset);
1155GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001156static __thread bool g_initialized_sigmask;
1157static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001158
/* Handler for grpc_wakeup_signal. Its delivery (via pthread_kill in
   pollset_worker_kick) is what interrupts a blocked poller; the handler body
   itself is intentionally (almost) a no-op. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001164
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001165static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001166
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001167/* Global state management */
/* Process-wide pollset setup: thread-local slots (used to detect kicks aimed
   at the calling thread itself), the wakeup-signal handler, and the global
   wakeup fd. Returns the result of creating the global wakeup fd. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1174
/* Tears down the state created by pollset_global_init(). */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1180
/* Wakes the given worker by sending it grpc_wakeup_signal via pthread_kill.
   The CAS on worker->is_kicked makes the kick idempotent: only the first
   kicker actually delivers the signal. Returns an OS error if pthread_kill
   fails, GRPC_ERROR_NONE otherwise. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1196
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001197/* Return 1 if the pollset has active threads in pollset_work (pollset must
1198 * be locked) */
1199static int pollset_has_workers(grpc_pollset *p) {
1200 return p->root_worker.next != &p->root_worker;
1201}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001202
/* Unlinks 'worker' from p's circular doubly-linked worker list.
   p->mu must be held. */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
1207
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001208static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1209 if (pollset_has_workers(p)) {
1210 grpc_pollset_worker *w = p->root_worker.next;
1211 remove_worker(p, w);
1212 return w;
1213 } else {
1214 return NULL;
1215 }
1216}
1217
1218static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1219 worker->next = &p->root_worker;
1220 worker->prev = worker->next->prev;
1221 worker->prev->next = worker->next->prev = worker;
1222}
1223
1224static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1225 worker->prev = &p->root_worker;
1226 worker->next = worker->prev->next;
1227 worker->prev->next = worker->next->prev = worker;
1228}
1229
/* Wake up worker(s) of pollset 'p' so that they re-examine pending work.
   p->mu must be held before calling this function.

   Behavior by 'specific_worker':
   - GRPC_POLLSET_KICK_BROADCAST: kick every worker in the pollset (skipping
     the calling thread's own worker), or latch kicked_without_pollers when
     there are none.
   - a concrete worker: kick just that worker (again skipping self).
   - NULL: kick "any" one worker — the front worker is moved to the back of
     the list before being kicked — unless the calling thread is already
     polling this pollset, in which case it can absorb the kick itself.

   Returns an aggregate error from the individual kick attempts. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        /* Kick every worker except the one running on this thread (it does
           not need a signal to notice the work). */
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        /* Nobody is polling: remember the kick for the next poller. */
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate the kicked worker to the back of the list (presumably for
         round-robin fairness among workers). */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1281
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001282static grpc_error *kick_poller(void) {
1283 return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
1284}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001285
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001286static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001287 gpr_mu_init(&pollset->po.mu);
1288 *mu = &pollset->po.mu;
1289 pollset->po.pi = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001290
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001291 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001292 pollset->kicked_without_pollers = false;
1293
1294 pollset->shutting_down = false;
1295 pollset->finish_shutdown_called = false;
1296 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001297}
1298
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001299/* Convert a timespec to milliseconds:
1300 - Very small or negative poll times are clamped to zero to do a non-blocking
1301 poll (which becomes spin polling)
1302 - Other small values are rounded up to one millisecond
1303 - Longer than a millisecond polls are rounded up to the next nearest
1304 millisecond to avoid spinning
1305 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001306static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1307 gpr_timespec now) {
1308 gpr_timespec timeout;
1309 static const int64_t max_spin_polling_us = 10;
1310 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1311 return -1;
1312 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001313
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001314 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1315 max_spin_polling_us,
1316 GPR_TIMESPAN))) <= 0) {
1317 return 0;
1318 }
1319 timeout = gpr_time_sub(deadline, now);
1320 return gpr_time_to_millis(gpr_time_add(
1321 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1322}
1323
/* Signal that 'fd' became readable: fire (or arm) the read closure and
   record which pollset observed the readability event in
   fd->read_notifier_pollset. Called from the epoll event loop. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->po.mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->po.mu);
}
1332
/* Signal that 'fd' became writable: fire (or arm) the write closure.
   Called from the epoll event loop. */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->po.mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->po.mu);
}
1339
Craig Tillerb39307d2016-06-30 15:39:13 -07001340static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1341 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001342 if (ps->po.pi != NULL) {
1343 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001344 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001345 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001346}
1347
/* Complete pollset shutdown: release the polling island and schedule the
   shutdown_done closure. Must be called with the pollset lock held and only
   once all workers have left pollset_work(). */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1359
/* Begin shutting down 'pollset': record the closure to run when shutdown
   completes, broadcast-kick all workers so they notice the state change, and
   finish immediately when no workers are active.
   pollset->po.mu lock must be held by the caller before calling this. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1379
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  /* The polling island was already released by finish_shutdown_locked();
     only the lock itself remains to be torn down. */
  gpr_mu_destroy(&pollset->po.mu);
}
1387
Craig Tiller2b49ea92016-07-01 13:21:27 -07001388static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001389 GPR_ASSERT(pollset->shutting_down);
1390 GPR_ASSERT(!pollset_has_workers(pollset));
1391 pollset->shutting_down = false;
1392 pollset->finish_shutdown_called = false;
1393 pollset->kicked_without_pollers = false;
1394 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001395 GPR_ASSERT(pollset->po.pi == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001396}
1397
/* Try to pop one closure off the polling island's workqueue and run it.
   Returns true iff a closure was run. Uses trylock on workqueue_read_mu so a
   poller that loses the race simply goes back to polling instead of
   blocking. */
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* More items remain after ours: wake another poller to help drain. */
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      grpc_closure *c = (grpc_closure *)n;
      grpc_closure_run(exec_ctx, c, c->error_data.error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* n == NULL might mean there's work but it's not available to be popped
       * yet - try to ensure another workqueue wakes up to check shortly if so
       */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}
1419
Craig Tiller84ea3412016-09-08 14:57:56 -07001420#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001421/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1422static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001423 grpc_pollset *pollset,
1424 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001425 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001426 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001427 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001428 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001429 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001430 char *err_msg;
1431 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001432 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1433
1434 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001435 latest polling island pointed by pollset->po.pi
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001436
1437 Since epoll_fd is immutable, we can read it without obtaining the polling
1438 island lock. There is however a possibility that the polling island (from
1439 which we got the epoll_fd) got merged with another island while we are
1440 in this function. This is still okay because in such a case, we will wakeup
1441 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001442 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001443
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001444 if (pollset->po.pi == NULL) {
1445 pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
1446 if (pollset->po.pi == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001447 GPR_TIMER_END("pollset_work_and_unlock", 0);
1448 return; /* Fatal error. We cannot continue */
1449 }
1450
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001451 PI_ADD_REF(pollset->po.pi, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001452 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001453 (void *)pollset, (void *)pollset->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001454 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001455
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001456 pi = polling_island_maybe_get_latest(pollset->po.pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001457 epoll_fd = pi->epoll_fd;
1458
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001459 /* Update the pollset->po.pi since the island being pointed by
1460 pollset->po.pi maybe older than the one pointed by pi) */
1461 if (pollset->po.pi != pi) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001462 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1463 polling island to be deleted */
1464 PI_ADD_REF(pi, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001465 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
1466 pollset->po.pi = pi;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001467 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001468
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001469 /* Add an extra ref so that the island does not get destroyed (which means
1470 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1471 epoll_fd */
1472 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001473 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001474
Craig Tiller460502e2016-10-13 10:02:08 -07001475 /* If we get some workqueue work to do, it might end up completing an item on
1476 the completion queue, so there's no need to poll... so we skip that and
1477 redo the complete loop to verify */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001478 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1479 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1480 g_current_thread_polling_island = pi;
1481
Vijay Paicef54012016-08-28 23:05:31 -07001482 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001483 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1484 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001485 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001486 if (ep_rv < 0) {
1487 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001488 gpr_asprintf(&err_msg,
1489 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1490 epoll_fd, errno, strerror(errno));
1491 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001492 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001493 /* We were interrupted. Save an interation by doing a zero timeout
1494 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001495 GRPC_POLLING_TRACE(
1496 "pollset_work: pollset: %p, worker: %p received kick",
1497 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001498 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001499 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001500 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001501
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001502#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001503 /* See the definition of g_poll_sync for more details */
1504 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001505#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001506
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001507 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001508 void *data_ptr = ep_ev[i].data.ptr;
1509 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001510 append_error(error,
1511 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1512 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001513 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
1514 append_error(error,
1515 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1516 err_desc);
1517 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001518 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001519 GRPC_POLLING_TRACE(
1520 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1521 "%d) got merged",
1522 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001523 /* This means that our polling island is merged with a different
1524 island. We do not have to do anything here since the subsequent call
1525 to the function pollset_work_and_unlock() will pick up the correct
1526 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001527 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001528 grpc_fd *fd = data_ptr;
1529 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1530 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1531 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001532 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001533 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001534 }
1535 if (write_ev || cancel) {
1536 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001537 }
1538 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001539 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001540
1541 g_current_thread_polling_island = NULL;
1542 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1543 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001544
1545 GPR_ASSERT(pi != NULL);
1546
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001547 /* Before leaving, release the extra ref we added to the polling island. It
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001548 is important to use "pi" here (i.e our old copy of pollset->po.pi
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001549 that we got before releasing the polling island lock). This is because
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001550 pollset->po.pi pointer might get udpated in other parts of the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001551 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001552 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001553
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001554 GPR_TIMER_END("pollset_work_and_unlock", 0);
1555}
1556
/* Poll for work on 'pollset' until 'deadline' (or until kicked).
   pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns.

   worker_hdl is set to a stack-allocated worker for the duration of the call
   and reset to NULL before returning, so callers can target this worker with
   pollset_kick() while the call is in flight. */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this stack frame; it is only ever reachable through
     the pollset's worker list / *worker_hdl while we are inside this call. */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  /* Publish thread-local identity so pollset_kick() can detect self-kicks. */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* pollset_work_and_unlock releases pollset->po.mu; we re-acquire below. */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->po.mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1663
/* Link poll object 'item' (an fd, pollset or pollset_set — see item_type)
   into 'bag' (a pollset or pollset_set) by making both point at the same
   polling island, creating or merging islands as needed. Takes bag->mu then
   item->mu; both are released before returning. Ref-counts on the islands
   are adjusted so each object holds exactly one ref on its island. */
static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
                            poll_obj *item, poll_obj_type bag_type,
                            poll_obj_type item_type) {
  GPR_TIMER_BEGIN("add_poll_object", 0);

  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *pi_new = NULL;

  gpr_mu_lock(&bag->mu);
  gpr_mu_lock(&item->mu);

retry:
  /*
   * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
   * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
   *    a refcount of 2) and point item->pi and bag->pi to the new island
   * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
   *    the other's non-NULL pi
   * 4) Finally if item->pi and bag->pi are non-NULL and not-equal, merge the
   *    polling islands and update item->pi and bag->pi to point to the new
   *    island
   */

  /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
   * orphaned */
  if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
    gpr_mu_unlock(&item->mu);
    gpr_mu_unlock(&bag->mu);
    return;
  }

  if (item->pi == bag->pi) {
    pi_new = item->pi;
    if (pi_new == NULL) {
      /* GPR_ASSERT(item->pi == bag->pi == NULL) */

      /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
       * we need to do some extra work to make TSAN happy */
      if (item_type == POLL_OBJ_FD) {
        /* Unlock before creating a new polling island: the polling island will
           create a workqueue which creates a file descriptor, and holding an fd
           lock here can eventually cause a loop to appear to TSAN (making it
           unhappy). We don't think it's a real loop (there's an epoch point
           where that loop possibility disappears), but the advantages of
           keeping TSAN happy outweigh any performance advantage we might have
           by keeping the lock held. */
        gpr_mu_unlock(&item->mu);
        pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
        gpr_mu_lock(&item->mu);

        /* Need to reverify any assumptions made between the initial lock and
           getting to this branch: if they've changed, we need to throw away our
           work and figure things out again. */
        if (item->pi != NULL) {
          GRPC_POLLING_TRACE(
              "add_poll_object: Raced creating new polling island. pi_new: %p "
              "(fd: %d, %s: %p)",
              (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
              (void *)bag);
          /* No need to lock 'pi_new' here since this is a new polling island
           * and no one has a reference to it yet */
          polling_island_remove_all_fds_locked(pi_new, true, &error);

          /* Ref and unref so that the polling island gets deleted during unref
           */
          PI_ADD_REF(pi_new, "dance_of_destruction");
          PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
          goto retry;
        }
      } else {
        pi_new = polling_island_create(exec_ctx, NULL, &error);
      }

      GRPC_POLLING_TRACE(
          "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
          "%s: %p)",
          (void *)pi_new, poll_obj_string(item_type), (void *)item,
          poll_obj_string(bag_type), (void *)bag);
    } else {
      GRPC_POLLING_TRACE(
          "add_poll_object: Same polling island. pi: %p (%s, %s)",
          (void *)pi_new, poll_obj_string(item_type),
          poll_obj_string(bag_type));
    }
  } else if (item->pi == NULL) {
    /* GPR_ASSERT(bag->pi != NULL) */
    /* Make pi_new point to latest pi*/
    pi_new = polling_island_lock(bag->pi);

    if (item_type == POLL_OBJ_FD) {
      grpc_fd *fd = FD_FROM_PO(item);
      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    }

    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else if (bag->pi == NULL) {
    /* GPR_ASSERT(item->pi != NULL) */
    /* Make pi_new to point to latest pi */
    pi_new = polling_island_lock(item->pi);
    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else {
    pi_new = polling_island_merge(item->pi, bag->pi, &error);
    GRPC_POLLING_TRACE(
        "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  }

  /* At this point, pi_new is the polling island that both item->pi and bag->pi
     MUST be pointing to */

  if (item->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(item_type));
    if (item->pi != NULL) {
      PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
    }
    item->pi = pi_new;
  }

  if (bag->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(bag_type));
    if (bag->pi != NULL) {
      PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
    }
    bag->pi = pi_new;
  }

  gpr_mu_unlock(&item->mu);
  gpr_mu_unlock(&bag->mu);

  GRPC_LOG_IF_ERROR("add_poll_object", error);
  GPR_TIMER_END("add_poll_object", 0);
}
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001808
/* Add 'fd' to the set of file descriptors polled by 'pollset'.
   Thin wrapper: all the polling-island creation/merge/refcount bookkeeping
   is delegated to add_poll_object(). */
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  add_poll_object(exec_ctx, &pollset->po, &fd->po, POLL_OBJ_POLLSET,
                  POLL_OBJ_FD);
}
1814
/* NOTE(review): A legacy implementation of pollset_add_fd that was disabled
   under '#if 0' previously lived here. It duplicated the polling-island
   create/merge/refcount logic now centralized in add_poll_object() (used by
   the active pollset_add_fd above) and was dead code; it has been deleted.
   Recover it from version control if it is ever needed again. */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001938
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001939/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001940 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001941 */
1942
/* NOTE(review): Legacy '#if 0'-disabled pollset_set implementations
   (create/destroy and add/del for fds, pollsets and pollset_sets, based on
   explicit fd/pollset arrays) previously lived here. They were superseded by
   the add_poll_object()-based implementations below and were dead code; they
   have been deleted. Recover from version control if ever needed. */
2083
/* Create an empty pollset_set. It starts with no polling island; one is
   attached lazily by add_poll_object() when an fd/pollset/pollset_set is
   added. (gpr_malloc aborts on OOM, so the result is not NULL-checked.) */
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
  pss->po.pi = NULL;
  gpr_mu_init(&pss->po.mu);
  return pss;
}
2090
/* Destroy a pollset_set and release its reference on the polling island
   (if any). The mutex is destroyed without being held: by contract no other
   thread may be using 'pss' once destroy is called. */
static void pollset_set_destroy(grpc_pollset_set *pss) {
  gpr_mu_destroy(&pss->po.mu);

  if (pss->po.pi != NULL) {
    /* PI_UNREF needs an exec_ctx and this API does not receive one, so a
       local exec_ctx is created and flushed here. */
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy");
    grpc_exec_ctx_finish(&exec_ctx);
  }

  gpr_free(pss);
}
2102
/* Add 'fd' to pollset_set 'pss': merge/attach their polling islands via
   add_poll_object(). */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  add_poll_object(exec_ctx, &pss->po, &fd->po, POLL_OBJ_POLLSET_SET,
                  POLL_OBJ_FD);
}
2108
/* Deliberate no-op. NOTE(review): in this engine polling islands only ever
   grow/merge; an fd presumably leaves its island when it is orphaned —
   confirm against fd_orphan before relying on this. */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  /* Nothing to do */
}
2113
/* Add pollset 'ps' to pollset_set 'pss': merge/attach their polling islands
   via add_poll_object(). */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  add_poll_object(exec_ctx, &pss->po, &ps->po, POLL_OBJ_POLLSET_SET,
                  POLL_OBJ_POLLSET);
}
2119
/* Deliberate no-op: see the matching comment on pollset_set_del_fd —
   polling islands are merge-only in this engine. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  /* Nothing to do */
}
2124
/* Add pollset_set 'item' to pollset_set 'bag': merge/attach their polling
   islands via add_poll_object(). */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  add_poll_object(exec_ctx, &bag->po, &item->po, POLL_OBJ_POLLSET_SET,
                  POLL_OBJ_POLLSET_SET);
}
2131
/* Deliberate no-op: see the matching comment on pollset_set_del_fd —
   polling islands are merge-only in this engine. */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  /* Nothing to do */
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002137
/* Test-only helper: snapshot of the polling island 'fd' currently belongs
   to (may be stale by the time the caller inspects it). */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->po.mu);
  pi = fd->po.pi;
  gpr_mu_unlock(&fd->po.mu);

  return pi;
}
2149
/* Test-only helper: snapshot of the polling island 'ps' currently belongs
   to (may be stale by the time the caller inspects it). */
void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->po.mu);
  pi = ps->po.pi;
  gpr_mu_unlock(&ps->po.mu);

  return pi;
}
2159
/* Test-only helper: true iff 'p' and 'q' resolve (after following any merge
   chains) to the same live polling island. */
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  /* Compares the resolved pointers captured above; the locks are already
     released, which is fine since this is only a point-in-time check. */
  return p1 == p2;
}
2171
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002172/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002173 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002174 */
2175
/* Tear down all global state owned by this engine (fds, pollsets, polling
   islands). Invoked through the event-engine vtable at shutdown. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
2181
/* Binding of this epoll-based implementation into the generic iomgr
   event-engine interface. Every entry points at a function defined in this
   file; grpc_init_epoll_linux() hands this table to the iomgr core. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_enqueue = workqueue_enqueue,

    .shutdown_engine = shutdown_engine,
};
2220
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002221/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
2222 * Create a dummy epoll_fd to make sure epoll support is available */
2223static bool is_epoll_available() {
2224 int fd = epoll_create1(EPOLL_CLOEXEC);
2225 if (fd < 0) {
2226 gpr_log(
2227 GPR_ERROR,
2228 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2229 fd);
2230 return false;
2231 }
2232 close(fd);
2233 return true;
2234}
2235
/* Engine entry point: initialize global state and return this engine's
   vtable, or NULL if the epoll engine cannot be used in this environment. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  /* A usable wakeup-fd mechanism is a prerequisite for this engine */
  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  /* GLIBC may expose epoll APIs even when the kernel lacks support; probe */
  if (!is_epoll_available()) {
    return NULL;
  }

  /* Pick a default wakeup signal if the application did not choose one */
  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 6);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
2267
#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL so the caller falls back to another polling engine. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GRPC_POSIX_SOCKET) */

/* No-op stub: wakeup signals are only meaningful to the epoll engine. */
void grpc_use_signal(int signum) {}
#endif /* !defined(GRPC_LINUX_EPOLL) */