blob: e9a0c690c0e5a9c6e1539aa8b1c91dc4187f5e45 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
murgatroid9954070892016-08-08 17:01:18 -070035#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070036
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070038#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
Sree Kuchibhotla34217242016-06-29 00:19:07 -070065/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
66static int grpc_polling_trace = 0; /* Disabled by default */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -070067#define GRPC_POLLING_TRACE(fmt, ...) \
68 if (grpc_polling_trace) { \
69 gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
70 }
71
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070072static int grpc_wakeup_signal = -1;
73static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -080093typedef enum {
94 POLL_OBJ_FD,
95 POLL_OBJ_POLLSET,
96 POLL_OBJ_POLLSET_SET
97} poll_obj_type;
98
99typedef struct poll_obj {
100 gpr_mu mu;
101 struct polling_island *pi;
102} poll_obj;
103
104const char *poll_obj_string(poll_obj_type po_type) {
105 switch (po_type) {
106 case POLL_OBJ_FD:
107 return "fd";
108 case POLL_OBJ_POLLSET:
109 return "pollset";
110 case POLL_OBJ_POLLSET_SET:
111 return "pollset_set";
112 }
113
114 GPR_UNREACHABLE_CODE(return "UNKNOWN");
115}
116
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700117/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700118 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700119 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800120
121#define FD_FROM_PO(po) ((grpc_fd *)(po))
122
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700123struct grpc_fd {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800124 poll_obj po;
125
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700126 int fd;
127 /* refst format:
Sree Kuchibhotla5098f912016-05-31 10:58:17 -0700128 bit 0 : 1=Active / 0=Orphaned
129 bits 1-n : refcount
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700130 Ref/Unref by two to avoid altering the orphaned bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700131 gpr_atm refst;
132
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700133 /* Indicates that the fd is shutdown and that any pending read/write closures
134 should fail */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700135 bool shutdown;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700136
137 /* The fd is either closed or we relinquished control of it. In either cases,
138 this indicates that the 'fd' on this structure is no longer valid */
139 bool orphaned;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700140
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700141 /* TODO: sreek - Move this to a lockfree implementation */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700142 grpc_closure *read_closure;
143 grpc_closure *write_closure;
144
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700145 struct grpc_fd *freelist_next;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700146 grpc_closure *on_done_closure;
147
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700148 /* The pollset that last noticed that the fd is readable */
149 grpc_pollset *read_notifier_pollset;
150
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700151 grpc_iomgr_object iomgr_object;
152};
153
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700154/* Reference counting for fds */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700155// #define GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700156#ifdef GRPC_FD_REF_COUNT_DEBUG
157static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
158static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
159 int line);
160#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
161#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
162#else
163static void fd_ref(grpc_fd *fd);
164static void fd_unref(grpc_fd *fd);
165#define GRPC_FD_REF(fd, reason) fd_ref(fd)
166#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
167#endif
168
169static void fd_global_init(void);
170static void fd_global_shutdown(void);
171
172#define CLOSURE_NOT_READY ((grpc_closure *)0)
173#define CLOSURE_READY ((grpc_closure *)1)
174
175/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700176 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700177 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700178
Craig Tillerd8a3c042016-09-09 12:42:37 -0700179#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700180
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700181#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
Craig Tillerb39307d2016-06-30 15:39:13 -0700182#define PI_UNREF(exec_ctx, p, r) \
183 pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700184
Craig Tillerd8a3c042016-09-09 12:42:37 -0700185#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700186
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700187#define PI_ADD_REF(p, r) pi_add_ref((p))
Craig Tillerb39307d2016-06-30 15:39:13 -0700188#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700189
Yuchen Zeng362ac1b2016-09-13 16:01:31 -0700190#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700191
Craig Tiller460502e2016-10-13 10:02:08 -0700192/* This is also used as grpc_workqueue (by directly casing it) */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700193typedef struct polling_island {
194 gpr_mu mu;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700195 /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
196 the refcount.
197 Once the ref count becomes zero, this structure is destroyed which means
198 we should ensure that there is never a scenario where a PI_ADD_REF() is
199 racing with a PI_UNREF() that just made the ref_count zero. */
Craig Tiller15007612016-07-06 09:36:16 -0700200 gpr_atm ref_count;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700201
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700202 /* Pointer to the polling_island this merged into.
203 * merged_to value is only set once in polling_island's lifetime (and that too
204 * only if the island is merged with another island). Because of this, we can
205 * use gpr_atm type here so that we can do atomic access on this and reduce
206 * lock contention on 'mu' mutex.
207 *
208 * Note that if this field is not NULL (i.e not 0), all the remaining fields
209 * (except mu and ref_count) are invalid and must be ignored. */
210 gpr_atm merged_to;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700211
Craig Tiller460502e2016-10-13 10:02:08 -0700212 /* Number of threads currently polling on this island */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700213 gpr_atm poller_count;
Craig Tiller460502e2016-10-13 10:02:08 -0700214 /* Mutex guarding the read end of the workqueue (must be held to pop from
215 * workqueue_items) */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700216 gpr_mu workqueue_read_mu;
Craig Tiller460502e2016-10-13 10:02:08 -0700217 /* Queue of closures to be executed */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700218 gpr_mpscq workqueue_items;
Craig Tiller460502e2016-10-13 10:02:08 -0700219 /* Count of items in workqueue_items */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700220 gpr_atm workqueue_item_count;
Craig Tiller460502e2016-10-13 10:02:08 -0700221 /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700222 grpc_wakeup_fd workqueue_wakeup_fd;
Craig Tillerb39307d2016-06-30 15:39:13 -0700223
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700224 /* The fd of the underlying epoll set */
225 int epoll_fd;
226
227 /* The file descriptors in the epoll set */
228 size_t fd_cnt;
229 size_t fd_capacity;
230 grpc_fd **fds;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700231} polling_island;
232
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700233/*******************************************************************************
234 * Pollset Declarations
235 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700236struct grpc_pollset_worker {
Sree Kuchibhotla34217242016-06-29 00:19:07 -0700237 /* Thread id of this worker */
238 pthread_t pt_id;
239
240 /* Used to prevent a worker from getting kicked multiple times */
241 gpr_atm is_kicked;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700242 struct grpc_pollset_worker *next;
243 struct grpc_pollset_worker *prev;
244};
245
246struct grpc_pollset {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800247 poll_obj po;
248
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700249 grpc_pollset_worker root_worker;
250 bool kicked_without_pollers;
251
252 bool shutting_down; /* Is the pollset shutting down ? */
253 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
254 grpc_closure *shutdown_done; /* Called after after shutdown is complete */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700255};
256
257/*******************************************************************************
258 * Pollset-set Declarations
259 */
Sree Kuchibhotla3dbf4d62016-06-08 16:26:45 -0700260/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
261 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
262 * the current pollset_set would result in polling island merges. This would
263 * remove the need to maintain fd_count here. This will also significantly
264 * simplify the grpc_fd structure since we would no longer need to explicitly
265 * maintain the orphaned state */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700266struct grpc_pollset_set {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800267 poll_obj po;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700268
269 size_t pollset_count;
270 size_t pollset_capacity;
271 grpc_pollset **pollsets;
272
273 size_t pollset_set_count;
274 size_t pollset_set_capacity;
275 struct grpc_pollset_set **pollset_sets;
276
277 size_t fd_count;
278 size_t fd_capacity;
279 grpc_fd **fds;
280};
281
282/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700283 * Common helpers
284 */
285
Craig Tillerf975f742016-07-01 14:56:27 -0700286static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700287 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700288 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700289 if (*composite == GRPC_ERROR_NONE) {
290 *composite = GRPC_ERROR_CREATE(desc);
291 }
292 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700293 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700294}
295
296/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700297 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700298 */
299
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700300/* The wakeup fd that is used to wake up all threads in a Polling island. This
301 is useful in the polling island merge operation where we need to wakeup all
302 the threads currently polling the smaller polling island (so that they can
303 start polling the new/merged polling island)
304
305 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
306 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
307static grpc_wakeup_fd polling_island_wakeup_fd;
308
Craig Tiller2e620132016-10-10 15:27:44 -0700309/* The polling island being polled right now.
310 See comments in workqueue_maybe_wakeup for why this is tracked. */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700311static __thread polling_island *g_current_thread_polling_island;
312
Craig Tillerb39307d2016-06-30 15:39:13 -0700313/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700314static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700315
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700316#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700317/* Currently TSAN may incorrectly flag data races between epoll_ctl and
318 epoll_wait for any grpc_fd structs that are added to the epoll set via
319 epoll_ctl and are returned (within a very short window) via epoll_wait().
320
321 To work-around this race, we establish a happens-before relation between
322 the code just-before epoll_ctl() and the code after epoll_wait() by using
323 this atomic */
324gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700325#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700326
Craig Tillerb39307d2016-06-30 15:39:13 -0700327static void pi_add_ref(polling_island *pi);
328static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700329
Craig Tillerd8a3c042016-09-09 12:42:37 -0700330#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Craig Tillera10b0b12016-09-09 16:20:07 -0700331static void pi_add_ref_dbg(polling_island *pi, const char *reason,
332 const char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700333 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700334 pi_add_ref(pi);
335 gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
336 (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700337}
338
Craig Tillerb39307d2016-06-30 15:39:13 -0700339static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
Craig Tillera10b0b12016-09-09 16:20:07 -0700340 const char *reason, const char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700341 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Craig Tillerb39307d2016-06-30 15:39:13 -0700342 pi_unref(exec_ctx, pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700343 gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700344 (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700345}
Craig Tillerd8a3c042016-09-09 12:42:37 -0700346
347static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
348 const char *file, int line,
349 const char *reason) {
350 if (workqueue != NULL) {
Craig Tillera10b0b12016-09-09 16:20:07 -0700351 pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700352 }
353 return workqueue;
354}
355
356static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
357 const char *file, int line, const char *reason) {
358 if (workqueue != NULL) {
Craig Tillera10b0b12016-09-09 16:20:07 -0700359 pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700360 }
361}
362#else
363static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
364 if (workqueue != NULL) {
365 pi_add_ref((polling_island *)workqueue);
366 }
367 return workqueue;
368}
369
370static void workqueue_unref(grpc_exec_ctx *exec_ctx,
371 grpc_workqueue *workqueue) {
372 if (workqueue != NULL) {
373 pi_unref(exec_ctx, (polling_island *)workqueue);
374 }
375}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700376#endif
377
Craig Tiller15007612016-07-06 09:36:16 -0700378static void pi_add_ref(polling_island *pi) {
379 gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
380}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700381
Craig Tillerb39307d2016-06-30 15:39:13 -0700382static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700383 /* If ref count went to zero, delete the polling island.
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700384 Note that this deletion not be done under a lock. Once the ref count goes
385 to zero, we are guaranteed that no one else holds a reference to the
386 polling island (and that there is no racing pi_add_ref() call either).
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700387
388 Also, if we are deleting the polling island and the merged_to field is
389 non-empty, we should remove a ref to the merged_to polling island
390 */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700391 if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
392 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
393 polling_island_delete(exec_ctx, pi);
394 if (next != NULL) {
395 PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700396 }
397 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700398}
399
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700400/* The caller is expected to hold pi->mu lock before calling this function */
401static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700402 size_t fd_count, bool add_fd_refs,
403 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700404 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700405 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700406 struct epoll_event ev;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700407 char *err_msg;
408 const char *err_desc = "polling_island_add_fds";
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700409
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700410#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700411 /* See the definition of g_epoll_sync for more context */
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700412 gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700413#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700414
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700415 for (i = 0; i < fd_count; i++) {
416 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
417 ev.data.ptr = fds[i];
418 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700419
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700420 if (err < 0) {
421 if (errno != EEXIST) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700422 gpr_asprintf(
423 &err_msg,
424 "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
425 pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
426 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
427 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700428 }
429
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700430 continue;
431 }
432
433 if (pi->fd_cnt == pi->fd_capacity) {
434 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
435 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
436 }
437
438 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700439 if (add_fd_refs) {
440 GRPC_FD_REF(fds[i], "polling_island");
441 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700442 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700443}
444
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700445/* The caller is expected to hold pi->mu before calling this */
446static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700447 grpc_wakeup_fd *wakeup_fd,
448 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700449 struct epoll_event ev;
450 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700451 char *err_msg;
452 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700453
454 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
455 ev.data.ptr = wakeup_fd;
456 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
457 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700458 if (err < 0 && errno != EEXIST) {
459 gpr_asprintf(&err_msg,
460 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
461 "error: %d (%s)",
462 pi->epoll_fd,
463 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
464 strerror(errno));
465 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
466 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700467 }
468}
469
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700470/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700471static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700472 bool remove_fd_refs,
473 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700474 int err;
475 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700476 char *err_msg;
477 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700478
479 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700480 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700481 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700482 gpr_asprintf(&err_msg,
483 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
484 "error: %d (%s)",
485 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
486 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
487 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700488 }
489
490 if (remove_fd_refs) {
491 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700492 }
493 }
494
495 pi->fd_cnt = 0;
496}
497
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700498/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700499static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700500 bool is_fd_closed,
501 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700502 int err;
503 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700504 char *err_msg;
505 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700506
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700507 /* If fd is already closed, then it would have been automatically been removed
508 from the epoll set */
509 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700510 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
511 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700512 gpr_asprintf(
513 &err_msg,
514 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
515 pi->epoll_fd, fd->fd, errno, strerror(errno));
516 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
517 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700518 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700519 }
520
521 for (i = 0; i < pi->fd_cnt; i++) {
522 if (pi->fds[i] == fd) {
523 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700524 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700525 break;
526 }
527 }
528}
529
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700530/* Might return NULL in case of an error */
Craig Tillerb39307d2016-06-30 15:39:13 -0700531static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
532 grpc_fd *initial_fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700533 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700534 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700535 const char *err_desc = "polling_island_create";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700536
Craig Tillerb39307d2016-06-30 15:39:13 -0700537 *error = GRPC_ERROR_NONE;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700538
Craig Tillerb39307d2016-06-30 15:39:13 -0700539 pi = gpr_malloc(sizeof(*pi));
540 gpr_mu_init(&pi->mu);
541 pi->fd_cnt = 0;
542 pi->fd_capacity = 0;
543 pi->fds = NULL;
544 pi->epoll_fd = -1;
Craig Tillerd8a3c042016-09-09 12:42:37 -0700545
546 gpr_mu_init(&pi->workqueue_read_mu);
547 gpr_mpscq_init(&pi->workqueue_items);
548 gpr_atm_rel_store(&pi->workqueue_item_count, 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700549
Craig Tiller15007612016-07-06 09:36:16 -0700550 gpr_atm_rel_store(&pi->ref_count, 0);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700551 gpr_atm_rel_store(&pi->poller_count, 0);
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700552 gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700553
Craig Tillerd8a3c042016-09-09 12:42:37 -0700554 if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
555 err_desc)) {
556 goto done;
557 }
558
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700559 pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700560
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700561 if (pi->epoll_fd < 0) {
Craig Tillerb39307d2016-06-30 15:39:13 -0700562 append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
563 goto done;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700564 }
565
Craig Tillerb39307d2016-06-30 15:39:13 -0700566 polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700567 polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
Craig Tillerb39307d2016-06-30 15:39:13 -0700568
569 if (initial_fd != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -0700570 polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
Craig Tillerb39307d2016-06-30 15:39:13 -0700571 }
572
Craig Tillerb39307d2016-06-30 15:39:13 -0700573done:
574 if (*error != GRPC_ERROR_NONE) {
Craig Tiller0a06cd72016-07-14 13:21:24 -0700575 polling_island_delete(exec_ctx, pi);
Craig Tillerb39307d2016-06-30 15:39:13 -0700576 pi = NULL;
577 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700578 return pi;
579}
580
Craig Tillerb39307d2016-06-30 15:39:13 -0700581static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700582 GPR_ASSERT(pi->fd_cnt == 0);
583
Craig Tiller0a06cd72016-07-14 13:21:24 -0700584 if (pi->epoll_fd >= 0) {
585 close(pi->epoll_fd);
586 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700587 GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
588 gpr_mu_destroy(&pi->workqueue_read_mu);
589 gpr_mpscq_destroy(&pi->workqueue_items);
Craig Tillerb39307d2016-06-30 15:39:13 -0700590 gpr_mu_destroy(&pi->mu);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700591 grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
Craig Tillerb39307d2016-06-30 15:39:13 -0700592 gpr_free(pi->fds);
593 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700594}
595
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700596/* Attempts to gets the last polling island in the linked list (liked by the
597 * 'merged_to' field). Since this does not lock the polling island, there are no
598 * guarantees that the island returned is the last island */
599static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
600 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
601 while (next != NULL) {
602 pi = next;
603 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
604 }
605
606 return pi;
607}
608
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700609/* Gets the lock on the *latest* polling island i.e the last polling island in
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700610 the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700611 returned polling island's mu.
612 Usage: To lock/unlock polling island "pi", do the following:
613 polling_island *pi_latest = polling_island_lock(pi);
614 ...
615 ... critical section ..
616 ...
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700617 gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
618static polling_island *polling_island_lock(polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700619 polling_island *next = NULL;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700620
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700621 while (true) {
622 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
623 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700624 /* Looks like 'pi' is the last node in the linked list but unless we check
625 this by holding the pi->mu lock, we cannot be sure (i.e without the
626 pi->mu lock, we don't prevent island merges).
627 To be absolutely sure, check once more by holding the pi->mu lock */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700628 gpr_mu_lock(&pi->mu);
629 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
630 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700631 /* pi is infact the last node and we have the pi->mu lock. we're done */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700632 break;
633 }
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700634
635 /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
636 * isn't the lock we are interested in. Continue traversing the list */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700637 gpr_mu_unlock(&pi->mu);
638 }
639
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700640 pi = next;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700641 }
642
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700643 return pi;
644}
645
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700646/* Gets the lock on the *latest* polling islands in the linked lists pointed by
647 *p and *q (and also updates *p and *q to point to the latest polling islands)
648
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700649 This function is needed because calling the following block of code to obtain
650 locks on polling islands (*p and *q) is prone to deadlocks.
651 {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700652 polling_island_lock(*p, true);
653 polling_island_lock(*q, true);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700654 }
655
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700656 Usage/example:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700657 polling_island *p1;
658 polling_island *p2;
659 ..
660 polling_island_lock_pair(&p1, &p2);
661 ..
662 .. Critical section with both p1 and p2 locked
663 ..
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700664 // Release locks: Always call polling_island_unlock_pair() to release locks
665 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700666*/
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700667static void polling_island_lock_pair(polling_island **p, polling_island **q) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700668 polling_island *pi_1 = *p;
669 polling_island *pi_2 = *q;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700670 polling_island *next_1 = NULL;
671 polling_island *next_2 = NULL;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700672
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700673 /* The algorithm is simple:
674 - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
675 keep updating pi_1 and pi_2)
676 - Then obtain locks on the islands by following a lock order rule of
677 locking polling_island with lower address first
678 Special case: Before obtaining the locks, check if pi_1 and pi_2 are
679 pointing to the same island. If that is the case, we can just call
680 polling_island_lock()
681 - After obtaining both the locks, double check that the polling islands
682 are still the last polling islands in their respective linked lists
683 (this is because there might have been polling island merges before
684 we got the lock)
685 - If the polling islands are the last islands, we are done. If not,
686 release the locks and continue the process from the first step */
687 while (true) {
688 next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
689 while (next_1 != NULL) {
690 pi_1 = next_1;
691 next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700692 }
693
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700694 next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
695 while (next_2 != NULL) {
696 pi_2 = next_2;
697 next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
698 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700699
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700700 if (pi_1 == pi_2) {
701 pi_1 = pi_2 = polling_island_lock(pi_1);
702 break;
703 }
704
705 if (pi_1 < pi_2) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700706 gpr_mu_lock(&pi_1->mu);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700707 gpr_mu_lock(&pi_2->mu);
708 } else {
709 gpr_mu_lock(&pi_2->mu);
710 gpr_mu_lock(&pi_1->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700711 }
712
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700713 next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
714 next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
715 if (next_1 == NULL && next_2 == NULL) {
716 break;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700717 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700718
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700719 gpr_mu_unlock(&pi_1->mu);
720 gpr_mu_unlock(&pi_2->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700721 }
722
723 *p = pi_1;
724 *q = pi_2;
725}
726
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700727static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
728 if (p == q) {
729 gpr_mu_unlock(&p->mu);
730 } else {
731 gpr_mu_unlock(&p->mu);
732 gpr_mu_unlock(&q->mu);
733 }
734}
735
Craig Tillerd8a3c042016-09-09 12:42:37 -0700736static void workqueue_maybe_wakeup(polling_island *pi) {
Craig Tiller2e620132016-10-10 15:27:44 -0700737 /* If this thread is the current poller, then it may be that it's about to
738 decrement the current poller count, so we need to look past this thread */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700739 bool is_current_poller = (g_current_thread_polling_island == pi);
740 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
741 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
Craig Tiller2e620132016-10-10 15:27:44 -0700742 /* Only issue a wakeup if it's likely that some poller could come in and take
743 it right now. Note that since we do an anticipatory mpscq_pop every poll
744 loop, it's ok if we miss the wakeup here, as we'll get the work item when
745 the next poller enters anyway. */
746 if (current_pollers > min_current_pollers_for_wakeup) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700747 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
748 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
749 }
750}
751
752static void workqueue_move_items_to_parent(polling_island *q) {
753 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
754 if (p == NULL) {
755 return;
756 }
757 gpr_mu_lock(&q->workqueue_read_mu);
758 int num_added = 0;
759 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
760 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
761 if (n != NULL) {
762 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
763 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
764 gpr_mpscq_push(&p->workqueue_items, n);
765 num_added++;
766 }
767 }
768 gpr_mu_unlock(&q->workqueue_read_mu);
769 if (num_added > 0) {
770 workqueue_maybe_wakeup(p);
771 }
772 workqueue_move_items_to_parent(p);
773}
774
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700775static polling_island *polling_island_merge(polling_island *p,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700776 polling_island *q,
777 grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700778 /* Get locks on both the polling islands */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700779 polling_island_lock_pair(&p, &q);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700780
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700781 if (p != q) {
782 /* Make sure that p points to the polling island with fewer fds than q */
783 if (p->fd_cnt > q->fd_cnt) {
784 GPR_SWAP(polling_island *, p, q);
785 }
786
787 /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
788 Note that the refcounts on the fds being moved will not change here.
789 This is why the last param in the following two functions is 'false') */
790 polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
791 polling_island_remove_all_fds_locked(p, false, error);
792
793 /* Wakeup all the pollers (if any) on p so that they pickup this change */
794 polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);
795
796 /* Add the 'merged_to' link from p --> q */
797 gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
798 PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700799
800 workqueue_move_items_to_parent(q);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700801 }
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700802 /* else if p == q, nothing needs to be done */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700803
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700804 polling_island_unlock_pair(p, q);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700805
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700806 /* Return the merged polling island (Note that no merge would have happened
807 if p == q which is ok) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700808 return q;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700809}
810
Craig Tillerd8a3c042016-09-09 12:42:37 -0700811static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
812 grpc_workqueue *workqueue, grpc_closure *closure,
813 grpc_error *error) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700814 GPR_TIMER_BEGIN("workqueue.enqueue", 0);
Craig Tiller1dc5dbb2016-09-09 17:09:39 -0700815 /* take a ref to the workqueue: otherwise it can happen that whatever events
816 * this kicks off ends up destroying the workqueue before this function
817 * completes */
818 GRPC_WORKQUEUE_REF(workqueue, "enqueue");
819 polling_island *pi = (polling_island *)workqueue;
Craig Tillerd8a3c042016-09-09 12:42:37 -0700820 gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
821 closure->error_data.error = error;
822 gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
823 if (last == 0) {
824 workqueue_maybe_wakeup(pi);
825 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700826 workqueue_move_items_to_parent(pi);
Craig Tiller1dc5dbb2016-09-09 17:09:39 -0700827 GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
828 GPR_TIMER_END("workqueue.enqueue", 0);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700829}
830
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700831static grpc_error *polling_island_global_init() {
832 grpc_error *error = GRPC_ERROR_NONE;
833
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700834 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
835 if (error == GRPC_ERROR_NONE) {
836 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
837 }
838
839 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700840}
841
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700842static void polling_island_global_shutdown() {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700843 grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700844}
845
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700846/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700847 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700848 */
849
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700850/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700851 * but instead so that implementations with multiple threads in (for example)
852 * epoll_wait deal with the race between pollset removal and incoming poll
853 * notifications.
854 *
855 * The problem is that the poller ultimately holds a reference to this
856 * object, so it is very difficult to know when is safe to free it, at least
857 * without some expensive synchronization.
858 *
859 * If we keep the object freelisted, in the worst case losing this race just
860 * becomes a spurious read notification on a reused fd.
861 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700862
863/* The alarm system needs to be able to wakeup 'some poller' sometimes
864 * (specifically when a new alarm needs to be triggered earlier than the next
865 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
866 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700867
868/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
869 * sure to wake up one polling thread (which can wake up other threads if
870 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700871grpc_wakeup_fd grpc_global_wakeup_fd;
872
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700873static grpc_fd *fd_freelist = NULL;
874static gpr_mu fd_freelist_mu;
875
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700876#ifdef GRPC_FD_REF_COUNT_DEBUG
877#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
878#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
879static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
880 int line) {
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700881 gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
882 (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700883 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
884#else
885#define REF_BY(fd, n, reason) ref_by(fd, n)
886#define UNREF_BY(fd, n, reason) unref_by(fd, n)
887static void ref_by(grpc_fd *fd, int n) {
888#endif
889 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
890}
891
892#ifdef GRPC_FD_REF_COUNT_DEBUG
893static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
894 int line) {
895 gpr_atm old;
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700896 gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
897 (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700898 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
899#else
900static void unref_by(grpc_fd *fd, int n) {
901 gpr_atm old;
902#endif
903 old = gpr_atm_full_fetch_add(&fd->refst, -n);
904 if (old == n) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700905 /* Add the fd to the freelist */
906 gpr_mu_lock(&fd_freelist_mu);
907 fd->freelist_next = fd_freelist;
908 fd_freelist = fd;
909 grpc_iomgr_unregister_object(&fd->iomgr_object);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700910
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700911 gpr_mu_unlock(&fd_freelist_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700912 } else {
913 GPR_ASSERT(old > n);
914 }
915}
916
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700917/* Increment refcount by two to avoid changing the orphan bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700918#ifdef GRPC_FD_REF_COUNT_DEBUG
919static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
920 int line) {
921 ref_by(fd, 2, reason, file, line);
922}
923
924static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
925 int line) {
926 unref_by(fd, 2, reason, file, line);
927}
928#else
929static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700930static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
931#endif
932
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700933static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
934
935static void fd_global_shutdown(void) {
936 gpr_mu_lock(&fd_freelist_mu);
937 gpr_mu_unlock(&fd_freelist_mu);
938 while (fd_freelist != NULL) {
939 grpc_fd *fd = fd_freelist;
940 fd_freelist = fd_freelist->freelist_next;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800941 gpr_mu_destroy(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700942 gpr_free(fd);
943 }
944 gpr_mu_destroy(&fd_freelist_mu);
945}
946
947static grpc_fd *fd_create(int fd, const char *name) {
948 grpc_fd *new_fd = NULL;
949
950 gpr_mu_lock(&fd_freelist_mu);
951 if (fd_freelist != NULL) {
952 new_fd = fd_freelist;
953 fd_freelist = fd_freelist->freelist_next;
954 }
955 gpr_mu_unlock(&fd_freelist_mu);
956
957 if (new_fd == NULL) {
958 new_fd = gpr_malloc(sizeof(grpc_fd));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800959 gpr_mu_init(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700960 }
961
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800962 /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
963 * is a newly created fd (or an fd we got from the freelist), no one else
964 * would be holding a lock to it anyway. */
965 gpr_mu_lock(&new_fd->po.mu);
966 new_fd->po.pi = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700967
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700968 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700969 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700970 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700971 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700972 new_fd->read_closure = CLOSURE_NOT_READY;
973 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700974 new_fd->freelist_next = NULL;
975 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700976 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700977
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800978 gpr_mu_unlock(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700979
980 char *fd_name;
981 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
982 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700983#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700984 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700985#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700986 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700987 return new_fd;
988}
989
990static bool fd_is_orphaned(grpc_fd *fd) {
991 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
992}
993
994static int fd_wrapped_fd(grpc_fd *fd) {
995 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800996 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700997 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700998 ret_fd = fd->fd;
999 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001000 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001001
1002 return ret_fd;
1003}
1004
1005static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1006 grpc_closure *on_done, int *release_fd,
1007 const char *reason) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001008 bool is_fd_closed = false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001009 grpc_error *error = GRPC_ERROR_NONE;
Craig Tiller15007612016-07-06 09:36:16 -07001010 polling_island *unref_pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001011
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001012 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001013 fd->on_done_closure = on_done;
1014
1015 /* If release_fd is not NULL, we should be relinquishing control of the file
1016 descriptor fd->fd (but we still own the grpc_fd structure). */
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001017 if (release_fd != NULL) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001018 *release_fd = fd->fd;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001019 } else {
1020 close(fd->fd);
1021 is_fd_closed = true;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001022 }
1023
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001024 fd->orphaned = true;
1025
1026 /* Remove the active status but keep referenced. We want this grpc_fd struct
1027 to be alive (and not added to freelist) until the end of this function */
1028 REF_BY(fd, 1, reason);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001029
1030 /* Remove the fd from the polling island:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001031 - Get a lock on the latest polling island (i.e the last island in the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001032 linked list pointed by fd->po.pi). This is the island that
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001033 would actually contain the fd
1034 - Remove the fd from the latest polling island
1035 - Unlock the latest polling island
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001036 - Set fd->po.pi to NULL (but remove the ref on the polling island
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001037 before doing this.) */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001038 if (fd->po.pi != NULL) {
1039 polling_island *pi_latest = polling_island_lock(fd->po.pi);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001040 polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001041 gpr_mu_unlock(&pi_latest->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001042
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001043 unref_pi = fd->po.pi;
1044 fd->po.pi = NULL;
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001045 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001046
Yuchen Zenga0399f22016-08-04 17:52:53 -07001047 grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
1048 NULL);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001049
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001050 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001051 UNREF_BY(fd, 2, reason); /* Drop the reference */
Craig Tiller15007612016-07-06 09:36:16 -07001052 if (unref_pi != NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001053 /* Unref stale polling island here, outside the fd lock above.
1054 The polling island owns a workqueue which owns an fd, and unreffing
1055 inside the lock can cause an eventual lock loop that makes TSAN very
1056 unhappy. */
Craig Tiller15007612016-07-06 09:36:16 -07001057 PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
1058 }
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001059 GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
Yuchen Zenga0399f22016-08-04 17:52:53 -07001060 GRPC_ERROR_UNREF(error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001061}
1062
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001063static grpc_error *fd_shutdown_error(bool shutdown) {
1064 if (!shutdown) {
1065 return GRPC_ERROR_NONE;
1066 } else {
1067 return GRPC_ERROR_CREATE("FD shutdown");
1068 }
1069}
1070
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001071static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1072 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001073 if (fd->shutdown) {
1074 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
1075 NULL);
1076 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001077 /* not ready ==> switch to a waiting state by setting the closure */
1078 *st = closure;
1079 } else if (*st == CLOSURE_READY) {
1080 /* already ready ==> queue the closure to run immediately */
1081 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001082 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
1083 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001084 } else {
1085 /* upcallptr was set to a different closure. This is an error! */
1086 gpr_log(GPR_ERROR,
1087 "User called a notify_on function with a previous callback still "
1088 "pending");
1089 abort();
1090 }
1091}
1092
1093/* returns 1 if state becomes not ready */
1094static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1095 grpc_closure **st) {
1096 if (*st == CLOSURE_READY) {
1097 /* duplicate ready ==> ignore */
1098 return 0;
1099 } else if (*st == CLOSURE_NOT_READY) {
1100 /* not ready, and not waiting ==> flag ready */
1101 *st = CLOSURE_READY;
1102 return 0;
1103 } else {
1104 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001105 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001106 *st = CLOSURE_NOT_READY;
1107 return 1;
1108 }
1109}
1110
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001111static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1112 grpc_fd *fd) {
1113 grpc_pollset *notifier = NULL;
1114
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001115 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001116 notifier = fd->read_notifier_pollset;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001117 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001118
1119 return notifier;
1120}
1121
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001122static bool fd_is_shutdown(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001123 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001124 const bool r = fd->shutdown;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001125 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001126 return r;
1127}
1128
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001129/* Might be called multiple times */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001130static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001131 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001132 /* Do the actual shutdown only once */
1133 if (!fd->shutdown) {
1134 fd->shutdown = true;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001135
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001136 shutdown(fd->fd, SHUT_RDWR);
1137 /* Flush any pending read and write closures. Since fd->shutdown is 'true'
1138 at this point, the closures would be called with 'success = false' */
1139 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1140 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1141 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001142 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001143}
1144
1145static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1146 grpc_closure *closure) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001147 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001148 notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001149 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001150}
1151
1152static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1153 grpc_closure *closure) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001154 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001155 notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001156 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001157}
1158
Craig Tillerd6ba6192016-06-30 15:42:41 -07001159static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001160 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001161 grpc_workqueue *workqueue =
1162 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001163 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001164 return workqueue;
1165}
Craig Tiller70bd4832016-06-30 14:20:46 -07001166
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001167/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001168 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001169 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001170GPR_TLS_DECL(g_current_thread_pollset);
1171GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001172static __thread bool g_initialized_sigmask;
1173static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001174
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001175static void sig_handler(int sig_num) {
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001176#ifdef GRPC_EPOLL_DEBUG
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001177 gpr_log(GPR_INFO, "Received signal %d", sig_num);
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -07001178#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001179}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001180
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001181static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001182
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001183/* Global state management */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001184static grpc_error *pollset_global_init(void) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001185 gpr_tls_init(&g_current_thread_pollset);
1186 gpr_tls_init(&g_current_thread_worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001187 poller_kick_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001188 return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001189}
1190
1191static void pollset_global_shutdown(void) {
1192 grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001193 gpr_tls_destroy(&g_current_thread_pollset);
1194 gpr_tls_destroy(&g_current_thread_worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001195}
1196
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001197static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
1198 grpc_error *err = GRPC_ERROR_NONE;
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001199
1200 /* Kick the worker only if it was not already kicked */
1201 if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
1202 GRPC_POLLING_TRACE(
1203 "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
1204 (void *)worker, worker->pt_id);
1205 int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
1206 if (err_num != 0) {
1207 err = GRPC_OS_ERROR(err_num, "pthread_kill");
1208 }
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001209 }
1210 return err;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001211}
1212
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001213/* Return 1 if the pollset has active threads in pollset_work (pollset must
1214 * be locked) */
1215static int pollset_has_workers(grpc_pollset *p) {
1216 return p->root_worker.next != &p->root_worker;
1217}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001218
1219static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1220 worker->prev->next = worker->next;
1221 worker->next->prev = worker->prev;
1222}
1223
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001224static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1225 if (pollset_has_workers(p)) {
1226 grpc_pollset_worker *w = p->root_worker.next;
1227 remove_worker(p, w);
1228 return w;
1229 } else {
1230 return NULL;
1231 }
1232}
1233
1234static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1235 worker->next = &p->root_worker;
1236 worker->prev = worker->next->prev;
1237 worker->prev->next = worker->next->prev = worker;
1238}
1239
1240static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1241 worker->prev = &p->root_worker;
1242 worker->next = worker->prev->next;
1243 worker->prev->next = worker->next->prev = worker;
1244}
1245
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001246/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001247static grpc_error *pollset_kick(grpc_pollset *p,
1248 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001249 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001250 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001251 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001252 grpc_pollset_worker *worker = specific_worker;
1253 if (worker != NULL) {
1254 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001255 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001256 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001257 for (worker = p->root_worker.next; worker != &p->root_worker;
1258 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001259 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001260 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001261 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001262 }
Craig Tillera218a062016-06-26 09:58:37 -07001263 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001264 } else {
1265 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001266 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001267 } else {
1268 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001269 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001270 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001271 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001272 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001273 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1274 /* Since worker == NULL, it means that we can kick "any" worker on this
1275 pollset 'p'. If 'p' happens to be the same pollset this thread is
1276 currently polling (i.e in pollset_work() function), then there is no need
1277 to kick any other worker since the current thread can just absorb the
1278 kick. This is the reason why we enter this case only when
1279 g_current_thread_pollset is != p */
1280
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001281 GPR_TIMER_MARK("kick_anonymous", 0);
1282 worker = pop_front_worker(p);
1283 if (worker != NULL) {
1284 GPR_TIMER_MARK("finally_kick", 0);
1285 push_back_worker(p, worker);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001286 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001287 } else {
1288 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001289 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001290 }
1291 }
1292
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001293 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001294 GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
1295 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001296}
1297
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001298static grpc_error *kick_poller(void) {
1299 return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
1300}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001301
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001302static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001303 gpr_mu_init(&pollset->po.mu);
1304 *mu = &pollset->po.mu;
1305 pollset->po.pi = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001306
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001307 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001308 pollset->kicked_without_pollers = false;
1309
1310 pollset->shutting_down = false;
1311 pollset->finish_shutdown_called = false;
1312 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001313}
1314
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001315/* Convert a timespec to milliseconds:
1316 - Very small or negative poll times are clamped to zero to do a non-blocking
1317 poll (which becomes spin polling)
1318 - Other small values are rounded up to one millisecond
1319 - Longer than a millisecond polls are rounded up to the next nearest
1320 millisecond to avoid spinning
1321 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001322static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1323 gpr_timespec now) {
1324 gpr_timespec timeout;
1325 static const int64_t max_spin_polling_us = 10;
1326 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1327 return -1;
1328 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001329
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001330 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1331 max_spin_polling_us,
1332 GPR_TIMESPAN))) <= 0) {
1333 return 0;
1334 }
1335 timeout = gpr_time_sub(deadline, now);
1336 return gpr_time_to_millis(gpr_time_add(
1337 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1338}
1339
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001340static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1341 grpc_pollset *notifier) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001342 /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
1343 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001344 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1345 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001346 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001347}
1348
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001349static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001350 /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
1351 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001352 set_ready_locked(exec_ctx, fd, &fd->write_closure);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001353 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001354}
1355
Craig Tillerb39307d2016-06-30 15:39:13 -07001356static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1357 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001358 if (ps->po.pi != NULL) {
1359 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001360 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001361 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001362}
1363
1364static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1365 grpc_pollset *pollset) {
1366 /* The pollset cannot have any workers if we are at this stage */
1367 GPR_ASSERT(!pollset_has_workers(pollset));
1368
1369 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001370
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001371 /* Release the ref and set pollset->po.pi to NULL */
Craig Tillerb39307d2016-06-30 15:39:13 -07001372 pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001373 grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001374}
1375
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001376/* pollset->po.mu lock must be held by the caller before calling this */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001377static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1378 grpc_closure *closure) {
1379 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1380 GPR_ASSERT(!pollset->shutting_down);
1381 pollset->shutting_down = true;
1382 pollset->shutdown_done = closure;
1383 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1384
1385 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1386 because it would release the underlying polling island. In such a case, we
1387 let the last worker call finish_shutdown_locked() from pollset_work() */
1388 if (!pollset_has_workers(pollset)) {
1389 GPR_ASSERT(!pollset->finish_shutdown_called);
1390 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1391 finish_shutdown_locked(exec_ctx, pollset);
1392 }
1393 GPR_TIMER_END("pollset_shutdown", 0);
1394}
1395
1396/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1397 * than destroying the mutexes, there is nothing special that needs to be done
1398 * here */
1399static void pollset_destroy(grpc_pollset *pollset) {
1400 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001401 gpr_mu_destroy(&pollset->po.mu);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001402}
1403
Craig Tiller2b49ea92016-07-01 13:21:27 -07001404static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001405 GPR_ASSERT(pollset->shutting_down);
1406 GPR_ASSERT(!pollset_has_workers(pollset));
1407 pollset->shutting_down = false;
1408 pollset->finish_shutdown_called = false;
1409 pollset->kicked_without_pollers = false;
1410 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001411 GPR_ASSERT(pollset->po.pi == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001412}
1413
Craig Tillerd8a3c042016-09-09 12:42:37 -07001414static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
1415 polling_island *pi) {
1416 if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
1417 gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
1418 gpr_mu_unlock(&pi->workqueue_read_mu);
1419 if (n != NULL) {
1420 if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
1421 workqueue_maybe_wakeup(pi);
1422 }
1423 grpc_closure *c = (grpc_closure *)n;
1424 grpc_closure_run(exec_ctx, c, c->error_data.error);
1425 return true;
1426 } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
Craig Tiller460502e2016-10-13 10:02:08 -07001427 /* n == NULL might mean there's work but it's not available to be popped
1428 * yet - try to ensure another workqueue wakes up to check shortly if so
1429 */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001430 workqueue_maybe_wakeup(pi);
1431 }
1432 }
1433 return false;
1434}
1435
Craig Tiller84ea3412016-09-08 14:57:56 -07001436#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001437/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1438static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001439 grpc_pollset *pollset,
1440 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001441 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001442 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001443 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001444 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001445 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001446 char *err_msg;
1447 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001448 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1449
1450 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001451 latest polling island pointed by pollset->po.pi
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001452
1453 Since epoll_fd is immutable, we can read it without obtaining the polling
1454 island lock. There is however a possibility that the polling island (from
1455 which we got the epoll_fd) got merged with another island while we are
1456 in this function. This is still okay because in such a case, we will wakeup
1457 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001458 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001459
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001460 if (pollset->po.pi == NULL) {
1461 pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
1462 if (pollset->po.pi == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001463 GPR_TIMER_END("pollset_work_and_unlock", 0);
1464 return; /* Fatal error. We cannot continue */
1465 }
1466
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001467 PI_ADD_REF(pollset->po.pi, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001468 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001469 (void *)pollset, (void *)pollset->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001470 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001471
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001472 pi = polling_island_maybe_get_latest(pollset->po.pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001473 epoll_fd = pi->epoll_fd;
1474
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001475 /* Update the pollset->po.pi since the island being pointed by
1476 pollset->po.pi maybe older than the one pointed by pi) */
1477 if (pollset->po.pi != pi) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001478 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1479 polling island to be deleted */
1480 PI_ADD_REF(pi, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001481 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
1482 pollset->po.pi = pi;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001483 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001484
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001485 /* Add an extra ref so that the island does not get destroyed (which means
1486 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1487 epoll_fd */
1488 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001489 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001490
Craig Tiller460502e2016-10-13 10:02:08 -07001491 /* If we get some workqueue work to do, it might end up completing an item on
1492 the completion queue, so there's no need to poll... so we skip that and
1493 redo the complete loop to verify */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001494 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1495 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1496 g_current_thread_polling_island = pi;
1497
Vijay Paicef54012016-08-28 23:05:31 -07001498 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001499 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1500 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001501 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001502 if (ep_rv < 0) {
1503 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001504 gpr_asprintf(&err_msg,
1505 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1506 epoll_fd, errno, strerror(errno));
1507 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001508 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001509 /* We were interrupted. Save an interation by doing a zero timeout
1510 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001511 GRPC_POLLING_TRACE(
1512 "pollset_work: pollset: %p, worker: %p received kick",
1513 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001514 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001515 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001516 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001517
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001518#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001519 /* See the definition of g_poll_sync for more details */
1520 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001521#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001522
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001523 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001524 void *data_ptr = ep_ev[i].data.ptr;
1525 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001526 append_error(error,
1527 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1528 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001529 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
1530 append_error(error,
1531 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1532 err_desc);
1533 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001534 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001535 GRPC_POLLING_TRACE(
1536 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1537 "%d) got merged",
1538 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001539 /* This means that our polling island is merged with a different
1540 island. We do not have to do anything here since the subsequent call
1541 to the function pollset_work_and_unlock() will pick up the correct
1542 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001543 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001544 grpc_fd *fd = data_ptr;
1545 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1546 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1547 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001548 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001549 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001550 }
1551 if (write_ev || cancel) {
1552 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001553 }
1554 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001555 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001556
1557 g_current_thread_polling_island = NULL;
1558 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1559 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001560
1561 GPR_ASSERT(pi != NULL);
1562
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001563 /* Before leaving, release the extra ref we added to the polling island. It
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001564 is important to use "pi" here (i.e our old copy of pollset->po.pi
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001565 that we got before releasing the polling island lock). This is because
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001566 pollset->po.pi pointer might get udpated in other parts of the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001567 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001568 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001569
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001570 GPR_TIMER_END("pollset_work_and_unlock", 0);
1571}
1572
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001573/* pollset->po.mu lock must be held by the caller before calling this.
1574 The function pollset_work() may temporarily release the lock (pollset->po.mu)
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001575 during the course of its execution but it will always re-acquire the lock and
1576 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001577static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1578 grpc_pollset_worker **worker_hdl,
1579 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001580 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001581 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001582 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1583
1584 sigset_t new_mask;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001585
1586 grpc_pollset_worker worker;
1587 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001588 worker.pt_id = pthread_self();
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001589 gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001590
1591 *worker_hdl = &worker;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001592
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001593 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1594 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001595
1596 if (pollset->kicked_without_pollers) {
1597 /* If the pollset was kicked without pollers, pretend that the current
1598 worker got the kick and skip polling. A kick indicates that there is some
1599 work that needs attention like an event on the completion queue or an
1600 alarm */
1601 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1602 pollset->kicked_without_pollers = 0;
1603 } else if (!pollset->shutting_down) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001604 /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001605 (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
1606 worker that there is some pending work that needs immediate attention
1607 (like an event on the completion queue, or a polling island merge that
1608 results in a new epoll-fd to wait on) and that the worker should not
1609 spend time waiting in epoll_pwait().
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001610
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001611 A worker can be kicked anytime from the point it is added to the pollset
1612 via push_front_worker() (or push_back_worker()) to the point it is
1613 removed via remove_worker().
1614 If the worker is kicked before/during it calls epoll_pwait(), it should
1615 immediately exit from epoll_wait(). If the worker is kicked after it
1616 returns from epoll_wait(), then nothing really needs to be done.
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001617
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001618 To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001619 times *except* when it is in epoll_pwait(). This way, the worker never
1620 misses acting on a kick */
1621
Craig Tiller19196992016-06-27 18:45:56 -07001622 if (!g_initialized_sigmask) {
1623 sigemptyset(&new_mask);
1624 sigaddset(&new_mask, grpc_wakeup_signal);
1625 pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
1626 sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
1627 g_initialized_sigmask = true;
1628 /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
1629 This is the mask used at all times *except during
1630 epoll_wait()*"
1631 g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
Craig Tiller510ff692016-06-27 20:31:49 -07001632 this is the mask to use *during epoll_wait()*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001633
Craig Tiller19196992016-06-27 18:45:56 -07001634 The new_mask is set on the worker before it is added to the pollset
Craig Tiller510ff692016-06-27 20:31:49 -07001635 (i.e before it can be kicked) */
Craig Tiller19196992016-06-27 18:45:56 -07001636 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001637
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001638 push_front_worker(pollset, &worker); /* Add worker to pollset */
1639
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001640 pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
1641 &g_orig_sigmask, &error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001642 grpc_exec_ctx_flush(exec_ctx);
1643
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001644 gpr_mu_lock(&pollset->po.mu);
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001645
1646 /* Note: There is no need to reset worker.is_kicked to 0 since we are no
1647 longer going to use this worker */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001648 remove_worker(pollset, &worker);
1649 }
1650
1651 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1652 false at this point) and the pollset is shutting down, we may have to
1653 finish the shutdown process by calling finish_shutdown_locked().
1654 See pollset_shutdown() for more details.
1655
1656 Note: Continuing to access pollset here is safe; it is the caller's
1657 responsibility to not destroy a pollset when it has outstanding calls to
1658 pollset_work() */
1659 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1660 !pollset->finish_shutdown_called) {
1661 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1662 finish_shutdown_locked(exec_ctx, pollset);
1663
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001664 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001665 grpc_exec_ctx_flush(exec_ctx);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001666 gpr_mu_lock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001667 }
1668
1669 *worker_hdl = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001670
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001671 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1672 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001673
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001674 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001675
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001676 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1677 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001678}
1679
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001680static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
1681 poll_obj *item, poll_obj_type bag_type,
1682 poll_obj_type item_type) {
1683 GPR_TIMER_BEGIN("add_poll_object", 0);
1684
1685 grpc_error *error = GRPC_ERROR_NONE;
1686 polling_island *pi_new = NULL;
1687
1688 gpr_mu_lock(&bag->mu);
1689 gpr_mu_lock(&item->mu);
1690
1691retry:
1692 /*
1693 * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
1694 * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
1695 * a refcount of 2) and point item->pi and bag->pi to the new island
1696 * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
1697 * the other's non-NULL pi
1698 * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
1699 * polling islands and update item->pi and bag->pi to point to the new
1700 * island
1701 */
1702
1703 /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
1704 * orphaned */
1705 if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
1706 gpr_mu_unlock(&item->mu);
1707 gpr_mu_unlock(&bag->mu);
1708 return;
1709 }
1710
1711 if (item->pi == bag->pi) {
1712 pi_new = item->pi;
1713 if (pi_new == NULL) {
1714 /* GPR_ASSERT(item->pi == bag->pi == NULL) */
1715
1716 /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
1717 * we need to do some extra work to make TSAN happy */
1718 if (item_type == POLL_OBJ_FD) {
1719 /* Unlock before creating a new polling island: the polling island will
1720 create a workqueue which creates a file descriptor, and holding an fd
1721 lock here can eventually cause a loop to appear to TSAN (making it
1722 unhappy). We don't think it's a real loop (there's an epoch point
1723 where that loop possibility disappears), but the advantages of
1724 keeping TSAN happy outweigh any performance advantage we might have
1725 by keeping the lock held. */
1726 gpr_mu_unlock(&item->mu);
1727 pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
1728 gpr_mu_lock(&item->mu);
1729
1730 /* Need to reverify any assumptions made between the initial lock and
1731 getting to this branch: if they've changed, we need to throw away our
1732 work and figure things out again. */
1733 if (item->pi != NULL) {
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001734 GRPC_POLLING_TRACE(
1735 "add_poll_object: Raced creating new polling island. pi_new: %p "
1736 "(fd: %d, %s: %p)",
1737 (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
1738 (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001739 /* No need to lock 'pi_new' here since this is a new polling island
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001740 * and no one has a reference to it yet */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001741 polling_island_remove_all_fds_locked(pi_new, true, &error);
1742
1743 /* Ref and unref so that the polling island gets deleted during unref
1744 */
1745 PI_ADD_REF(pi_new, "dance_of_destruction");
1746 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
1747 goto retry;
1748 }
1749 } else {
1750 pi_new = polling_island_create(exec_ctx, NULL, &error);
1751 }
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001752
1753 GRPC_POLLING_TRACE(
1754 "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
1755 "%s: %p)",
1756 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1757 poll_obj_string(bag_type), (void *)bag);
1758 } else {
1759 GRPC_POLLING_TRACE(
1760 "add_poll_object: Same polling island. pi: %p (%s, %s)",
1761 (void *)pi_new, poll_obj_string(item_type),
1762 poll_obj_string(bag_type));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001763 }
1764 } else if (item->pi == NULL) {
1765 /* GPR_ASSERT(bag->pi != NULL) */
1766 /* Make pi_new point to latest pi*/
1767 pi_new = polling_island_lock(bag->pi);
1768
1769 if (item_type == POLL_OBJ_FD) {
1770 grpc_fd *fd = FD_FROM_PO(item);
1771 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
1772 }
1773
1774 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001775 GRPC_POLLING_TRACE(
1776 "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
1777 "bag(%s): %p)",
1778 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1779 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001780 } else if (bag->pi == NULL) {
1781 /* GPR_ASSERT(item->pi != NULL) */
1782 /* Make pi_new to point to latest pi */
1783 pi_new = polling_island_lock(item->pi);
1784 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001785 GRPC_POLLING_TRACE(
1786 "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
1787 "bag(%s): %p)",
1788 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1789 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001790 } else {
1791 pi_new = polling_island_merge(item->pi, bag->pi, &error);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001792 GRPC_POLLING_TRACE(
1793 "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
1794 "bag(%s): %p)",
1795 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1796 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001797 }
1798
1799 /* At this point, pi_new is the polling island that both item->pi and bag->pi
1800 MUST be pointing to */
1801
1802 if (item->pi != pi_new) {
1803 PI_ADD_REF(pi_new, poll_obj_string(item_type));
1804 if (item->pi != NULL) {
1805 PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
1806 }
1807 item->pi = pi_new;
1808 }
1809
1810 if (bag->pi != pi_new) {
1811 PI_ADD_REF(pi_new, poll_obj_string(bag_type));
1812 if (bag->pi != NULL) {
1813 PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
1814 }
1815 bag->pi = pi_new;
1816 }
1817
1818 gpr_mu_unlock(&item->mu);
1819 gpr_mu_unlock(&bag->mu);
1820
1821 GRPC_LOG_IF_ERROR("add_poll_object", error);
1822 GPR_TIMER_END("add_poll_object", 0);
1823}
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001824
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001825static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1826 grpc_fd *fd) {
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001827 add_poll_object(exec_ctx, &pollset->po, &fd->po, POLL_OBJ_POLLSET,
1828 POLL_OBJ_FD);
1829}
1830
1831#if 0
1832static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1833 grpc_fd *fd) {
Craig Tiller57726ca2016-09-12 11:59:45 -07001834 GPR_TIMER_BEGIN("pollset_add_fd", 0);
1835
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001836 grpc_error *error = GRPC_ERROR_NONE;
1837
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001838 gpr_mu_lock(&pollset->po.mu);
1839 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001840
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001841 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001842
Craig Tiller7212c232016-07-06 13:11:09 -07001843retry:
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001844 /* 1) If fd->po.pi and pollset->po.pi are both non-NULL and
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001845 * equal, do nothing.
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001846 * 2) If fd->po.pi and pollset->po.pi are both NULL, create
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001847 * a new polling island (with a refcount of 2) and make the polling_island
1848 * fields in both fd and pollset to point to the new island
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001849 * 3) If one of fd->po.pi or pollset->po.pi is NULL, update
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001850 * the NULL polling_island field to point to the non-NULL polling_island
1851 * field (ensure that the refcount on the polling island is incremented by
1852 * 1 to account for the newly added reference)
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001853 * 4) Finally, if fd->po.pi and pollset->po.pi are non-NULL
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001854 * and different, merge both the polling islands and update the
1855 * polling_island fields in both fd and pollset to point to the merged
1856 * polling island.
1857 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001858
Craig Tiller42ac6db2016-07-06 17:13:56 -07001859 if (fd->orphaned) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001860 gpr_mu_unlock(&fd->po.mu);
1861 gpr_mu_unlock(&pollset->po.mu);
Craig Tiller42ac6db2016-07-06 17:13:56 -07001862 /* early out */
1863 return;
1864 }
1865
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001866 if (fd->po.pi == pollset->po.pi) {
1867 pi_new = fd->po.pi;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001868 if (pi_new == NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001869 /* Unlock before creating a new polling island: the polling island will
1870 create a workqueue which creates a file descriptor, and holding an fd
1871 lock here can eventually cause a loop to appear to TSAN (making it
1872 unhappy). We don't think it's a real loop (there's an epoch point where
1873 that loop possibility disappears), but the advantages of keeping TSAN
1874 happy outweigh any performance advantage we might have by keeping the
1875 lock held. */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001876 gpr_mu_unlock(&fd->po.mu);
Craig Tillerb39307d2016-06-30 15:39:13 -07001877 pi_new = polling_island_create(exec_ctx, fd, &error);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001878 gpr_mu_lock(&fd->po.mu);
Craig Tiller0a06cd72016-07-14 13:21:24 -07001879 /* Need to reverify any assumptions made between the initial lock and
1880 getting to this branch: if they've changed, we need to throw away our
1881 work and figure things out again. */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001882 if (fd->po.pi != NULL) {
Craig Tiller27da6422016-07-06 13:14:46 -07001883 GRPC_POLLING_TRACE(
1884 "pollset_add_fd: Raced creating new polling island. pi_new: %p "
1885 "(fd: %d, pollset: %p)",
1886 (void *)pi_new, fd->fd, (void *)pollset);
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001887
1888 /* No need to lock 'pi_new' here since this is a new polling island and
Sree Kuchibhotla485a9022016-10-26 16:46:55 -07001889 * no one has a reference to it yet */
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001890 polling_island_remove_all_fds_locked(pi_new, true, &error);
1891
1892 /* Ref and unref so that the polling island gets deleted during unref */
Craig Tiller27da6422016-07-06 13:14:46 -07001893 PI_ADD_REF(pi_new, "dance_of_destruction");
1894 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
Craig Tiller7212c232016-07-06 13:11:09 -07001895 goto retry;
Craig Tiller27da6422016-07-06 13:14:46 -07001896 } else {
1897 GRPC_POLLING_TRACE(
1898 "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
1899 "pollset: %p)",
1900 (void *)pi_new, fd->fd, (void *)pollset);
Craig Tiller7212c232016-07-06 13:11:09 -07001901 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001902 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001903 } else if (fd->po.pi == NULL) {
1904 pi_new = polling_island_lock(pollset->po.pi);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001905 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001906 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001907
1908 GRPC_POLLING_TRACE(
1909 "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
1910 "pollset->pi: %p)",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001911 (void *)pi_new, fd->fd, (void *)pollset, (void *)pollset->po.pi);
1912 } else if (pollset->po.pi == NULL) {
1913 pi_new = polling_island_lock(fd->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001914 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001915
1916 GRPC_POLLING_TRACE(
1917 "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
1918 "%p, fd->pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001919 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->po.pi);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001920 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001921 pi_new = polling_island_merge(fd->po.pi, pollset->po.pi, &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001922 GRPC_POLLING_TRACE(
1923 "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
1924 "%p, fd->pi: %p, pollset->pi: %p)",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001925 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->po.pi,
1926 (void *)pollset->po.pi);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001927 }
1928
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001929 /* At this point, pi_new is the polling island that both fd->po.pi
1930 and pollset->po.pi must be pointing to */
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001931
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001932 if (fd->po.pi != pi_new) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001933 PI_ADD_REF(pi_new, "fd");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001934 if (fd->po.pi != NULL) {
1935 PI_UNREF(exec_ctx, fd->po.pi, "fd");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001936 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001937 fd->po.pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001938 }
1939
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001940 if (pollset->po.pi != pi_new) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001941 PI_ADD_REF(pi_new, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001942 if (pollset->po.pi != NULL) {
1943 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001944 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001945 pollset->po.pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001946 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001947
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001948 gpr_mu_unlock(&fd->po.mu);
1949 gpr_mu_unlock(&pollset->po.mu);
Craig Tiller15007612016-07-06 09:36:16 -07001950
1951 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001952}
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001953#endif
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001954
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001955/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001956 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001957 */
1958
1959static grpc_pollset_set *pollset_set_create(void) {
1960 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1961 memset(pollset_set, 0, sizeof(*pollset_set));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001962 gpr_mu_init(&pollset_set->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001963 return pollset_set;
1964}
1965
1966static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1967 size_t i;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001968 gpr_mu_destroy(&pollset_set->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001969 for (i = 0; i < pollset_set->fd_count; i++) {
1970 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1971 }
1972 gpr_free(pollset_set->pollsets);
1973 gpr_free(pollset_set->pollset_sets);
1974 gpr_free(pollset_set->fds);
1975 gpr_free(pollset_set);
1976}
1977
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001978static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1979 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1980 size_t i;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001981 gpr_mu_lock(&pollset_set->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001982 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1983 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1984 pollset_set->fds = gpr_realloc(
1985 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1986 }
1987 GRPC_FD_REF(fd, "pollset_set");
1988 pollset_set->fds[pollset_set->fd_count++] = fd;
1989 for (i = 0; i < pollset_set->pollset_count; i++) {
1990 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1991 }
1992 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1993 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1994 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001995 gpr_mu_unlock(&pollset_set->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001996}
1997
1998static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1999 grpc_pollset_set *pollset_set, grpc_fd *fd) {
2000 size_t i;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002001 gpr_mu_lock(&pollset_set->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002002 for (i = 0; i < pollset_set->fd_count; i++) {
2003 if (pollset_set->fds[i] == fd) {
2004 pollset_set->fd_count--;
2005 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
2006 pollset_set->fds[pollset_set->fd_count]);
2007 GRPC_FD_UNREF(fd, "pollset_set");
2008 break;
2009 }
2010 }
2011 for (i = 0; i < pollset_set->pollset_set_count; i++) {
2012 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
2013 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002014 gpr_mu_unlock(&pollset_set->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002015}
2016
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002017static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
2018 grpc_pollset_set *pollset_set,
2019 grpc_pollset *pollset) {
2020 size_t i, j;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002021 gpr_mu_lock(&pollset_set->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002022 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
2023 pollset_set->pollset_capacity =
2024 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
2025 pollset_set->pollsets =
2026 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
2027 sizeof(*pollset_set->pollsets));
2028 }
2029 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
2030 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
2031 if (fd_is_orphaned(pollset_set->fds[i])) {
2032 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
2033 } else {
2034 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
2035 pollset_set->fds[j++] = pollset_set->fds[i];
2036 }
2037 }
2038 pollset_set->fd_count = j;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002039 gpr_mu_unlock(&pollset_set->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002040}
2041
2042static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
2043 grpc_pollset_set *pollset_set,
2044 grpc_pollset *pollset) {
2045 size_t i;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002046 gpr_mu_lock(&pollset_set->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002047 for (i = 0; i < pollset_set->pollset_count; i++) {
2048 if (pollset_set->pollsets[i] == pollset) {
2049 pollset_set->pollset_count--;
2050 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
2051 pollset_set->pollsets[pollset_set->pollset_count]);
2052 break;
2053 }
2054 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002055 gpr_mu_unlock(&pollset_set->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002056}
2057
2058static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
2059 grpc_pollset_set *bag,
2060 grpc_pollset_set *item) {
2061 size_t i, j;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002062 gpr_mu_lock(&bag->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002063 if (bag->pollset_set_count == bag->pollset_set_capacity) {
2064 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
2065 bag->pollset_sets =
2066 gpr_realloc(bag->pollset_sets,
2067 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
2068 }
2069 bag->pollset_sets[bag->pollset_set_count++] = item;
2070 for (i = 0, j = 0; i < bag->fd_count; i++) {
2071 if (fd_is_orphaned(bag->fds[i])) {
2072 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
2073 } else {
2074 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
2075 bag->fds[j++] = bag->fds[i];
2076 }
2077 }
2078 bag->fd_count = j;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002079 gpr_mu_unlock(&bag->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002080}
2081
2082static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
2083 grpc_pollset_set *bag,
2084 grpc_pollset_set *item) {
2085 size_t i;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002086 gpr_mu_lock(&bag->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002087 for (i = 0; i < bag->pollset_set_count; i++) {
2088 if (bag->pollset_sets[i] == item) {
2089 bag->pollset_set_count--;
2090 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
2091 bag->pollset_sets[bag->pollset_set_count]);
2092 break;
2093 }
2094 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002095 gpr_mu_unlock(&bag->po.mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002096}
2097
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002098/* Test helper functions
2099 * */
2100void *grpc_fd_get_polling_island(grpc_fd *fd) {
2101 polling_island *pi;
2102
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002103 gpr_mu_lock(&fd->po.mu);
2104 pi = fd->po.pi;
2105 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002106
2107 return pi;
2108}
2109
2110void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
2111 polling_island *pi;
2112
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002113 gpr_mu_lock(&ps->po.mu);
2114 pi = ps->po.pi;
2115 gpr_mu_unlock(&ps->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002116
2117 return pi;
2118}
2119
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002120bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002121 polling_island *p1 = p;
2122 polling_island *p2 = q;
2123
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002124 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
2125 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002126 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002127 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002128
2129 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002130}
2131
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002132/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002133 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002134 */
2135
2136static void shutdown_engine(void) {
2137 fd_global_shutdown();
2138 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07002139 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002140}
2141
2142static const grpc_event_engine_vtable vtable = {
2143 .pollset_size = sizeof(grpc_pollset),
2144
2145 .fd_create = fd_create,
2146 .fd_wrapped_fd = fd_wrapped_fd,
2147 .fd_orphan = fd_orphan,
2148 .fd_shutdown = fd_shutdown,
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07002149 .fd_is_shutdown = fd_is_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002150 .fd_notify_on_read = fd_notify_on_read,
2151 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002152 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Craig Tiller70bd4832016-06-30 14:20:46 -07002153 .fd_get_workqueue = fd_get_workqueue,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002154
2155 .pollset_init = pollset_init,
2156 .pollset_shutdown = pollset_shutdown,
2157 .pollset_reset = pollset_reset,
2158 .pollset_destroy = pollset_destroy,
2159 .pollset_work = pollset_work,
2160 .pollset_kick = pollset_kick,
2161 .pollset_add_fd = pollset_add_fd,
2162
2163 .pollset_set_create = pollset_set_create,
2164 .pollset_set_destroy = pollset_set_destroy,
2165 .pollset_set_add_pollset = pollset_set_add_pollset,
2166 .pollset_set_del_pollset = pollset_set_del_pollset,
2167 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
2168 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
2169 .pollset_set_add_fd = pollset_set_add_fd,
2170 .pollset_set_del_fd = pollset_set_del_fd,
2171
2172 .kick_poller = kick_poller,
2173
Craig Tillerd8a3c042016-09-09 12:42:37 -07002174 .workqueue_ref = workqueue_ref,
2175 .workqueue_unref = workqueue_unref,
2176 .workqueue_enqueue = workqueue_enqueue,
2177
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002178 .shutdown_engine = shutdown_engine,
2179};
2180
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002181/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
2182 * Create a dummy epoll_fd to make sure epoll support is available */
2183static bool is_epoll_available() {
2184 int fd = epoll_create1(EPOLL_CLOEXEC);
2185 if (fd < 0) {
2186 gpr_log(
2187 GPR_ERROR,
2188 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2189 fd);
2190 return false;
2191 }
2192 close(fd);
2193 return true;
2194}
2195
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002196const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002197 /* If use of signals is disabled, we cannot use epoll engine*/
2198 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
2199 return NULL;
2200 }
2201
Ken Paysoncd7d0472016-10-11 12:24:20 -07002202 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07002203 return NULL;
2204 }
2205
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002206 if (!is_epoll_available()) {
2207 return NULL;
2208 }
2209
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002210 if (!is_grpc_wakeup_signal_initialized) {
Sree Kuchibhotlabd48c912016-09-27 16:48:25 -07002211 grpc_use_signal(SIGRTMIN + 6);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002212 }
2213
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002214 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07002215
2216 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
2217 return NULL;
2218 }
2219
2220 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
2221 polling_island_global_init())) {
2222 return NULL;
2223 }
2224
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002225 return &vtable;
2226}
2227
murgatroid99623dd4f2016-08-08 17:31:27 -07002228#else /* defined(GRPC_LINUX_EPOLL) */
2229#if defined(GRPC_POSIX_SOCKET)
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07002230#include "src/core/lib/iomgr/ev_posix.h"
murgatroid99623dd4f2016-08-08 17:31:27 -07002231/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002232 * NULL */
2233const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
murgatroid99623dd4f2016-08-08 17:31:27 -07002234#endif /* defined(GRPC_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002235
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002236void grpc_use_signal(int signum) {}
murgatroid99623dd4f2016-08-08 17:31:27 -07002237#endif /* !defined(GRPC_LINUX_EPOLL) */