blob: cc7beecba88813a69fb6bd9abac57b8aa89e491c [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid9954070892016-08-08 17:01:18 -070034#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070036/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070037#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070038
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070039#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070044#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070045#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070060#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070061#include "src/core/lib/profiling/timers.h"
62#include "src/core/lib/support/block_annotate.h"
63
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Logs a polling-engine trace message iff tracing is enabled.
   Wrapped in do { } while (0) so the macro behaves as a single statement;
   the previous bare `if { }` form was vulnerable to the dangling-else
   problem when used as the then-branch of an un-braced `if`. */
#define GRPC_POLLING_TRACE(fmt, ...)          \
  do {                                        \
    if (grpc_polling_trace) {                 \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__);  \
    }                                         \
  } while (0)
70
/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */

/* Signal number used to kick polling threads, configured via
   grpc_use_signal(). A negative value means signals are disabled (and this
   epoll engine will not be used — see grpc_use_signal below). */
static int grpc_wakeup_signal = -1;
/* True once grpc_use_signal() has been called (explicit configuration). */
static bool is_grpc_wakeup_signal_initialized = false;

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
static grpc_wakeup_fd global_wakeup_fd;
81
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070082/* Implements the function defined in grpc_posix.h. This function might be
83 * called before even calling grpc_init() to set either a different signal to
84 * use. If signum == -1, then the use of signals is disabled */
85void grpc_use_signal(int signum) {
86 grpc_wakeup_signal = signum;
87 is_grpc_wakeup_signal_initialized = true;
88
89 if (grpc_wakeup_signal < 0) {
90 gpr_log(GPR_INFO,
91 "Use of signals is disabled. Epoll engine will not be used");
92 } else {
93 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
94 grpc_wakeup_signal);
95 }
96}
97
struct polling_island;

/* Discriminator for the three kinds of objects that embed a poll_obj. */
typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

/* Common header embedded (as the first member) in grpc_fd, grpc_pollset and
   grpc_pollset_set: the per-object mutex and the polling island the object
   currently belongs to. */
typedef struct poll_obj {
#ifdef PO_DEBUG
  /* Only tracked when PO_DEBUG is defined, for extra runtime checks. */
  poll_obj_type obj_type;
#endif
  gpr_mu mu;
  struct polling_island *pi;
} poll_obj;
113
114const char *poll_obj_string(poll_obj_type po_type) {
115 switch (po_type) {
116 case POLL_OBJ_FD:
117 return "fd";
118 case POLL_OBJ_POLLSET:
119 return "pollset";
120 case POLL_OBJ_POLLSET_SET:
121 return "pollset_set";
122 }
123
124 GPR_UNREACHABLE_CODE(return "UNKNOWN");
125}
126
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700127/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700128 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700129 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800130
/* Every grpc_fd begins with a poll_obj, so a (poll_obj *) can be downcast. */
#define FD_FROM_PO(po) ((grpc_fd *)(po))

struct grpc_fd {
  poll_obj po;

  int fd;
  /* refst format:
     bit 0 : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Internally stores data of type (grpc_error *). If the value is anything
     other than GRPC_ERROR_NONE, it indicates that the fd is shutdown and this
     contains the reason for shutdown. Once an fd is shutdown, any pending or
     future read/write closures on the fd should fail */
  gpr_atm shutdown_error;

  /* The fd is either closed or we relinquished control of it. In either
     cases, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  /* Closures to call when the fd is readable or writable respectively. These
     fields contain one of the following values:
     CLOSURE_READY : The fd has an I/O event of interest but there is no
                     closure yet to execute

     CLOSURE_NOT_READY : The fd has no I/O event of interest

     closure ptr : The closure to be executed when the fd has an I/O event
                   of interest.
     shutdown_error |
     CLOSURE_SHUTDOWN : 'shutdown_error' field OR'ed with CLOSURE_SHUTDOWN.
                        This indicates that the fd is shutdown. Since all
                        memory allocations are word-aligned, the lower two
                        bits of the shutdown_error pointer are always 0. So
                        it is safe to OR these with CLOSURE_SHUTDOWN.

     Valid state transitions:

     <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY
       |  |                         ^   |  ^                         |  |
       |  |                         |   |  |                         |  |
       |  +--------------4----------+   6  +---------2---------------+  |
       |                                |                               |
       |                                v                               |
       +-----5-------> [shutdown_error | CLOSURE_SHUTDOWN] <-----7------+

     For 1, 4 : See set_ready() function
     For 2, 3 : See notify_on() function
     For 5,6,7: See set_shutdown() function */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Intrusive link used by the fd freelist (see fd_global_* functions). */
  struct grpc_fd *freelist_next;
  /* Invoked once the fd has been fully orphaned/closed. */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
195
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants additionally log the call site (file/line) and a reason. */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values for grpc_fd's read_closure/write_closure fields. Real
   closure pointers are word-aligned, so these small values can never collide
   with a valid (grpc_closure *). */
#define CLOSURE_NOT_READY ((gpr_atm)0)
#define CLOSURE_READY ((gpr_atm)1)
#define CLOSURE_SHUTDOWN ((gpr_atm)2)
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700217
/*******************************************************************************
 * Polling island Declarations
 */

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG

/* Debug variants log the call site (file/line) and a reason string. */
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700234
/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  /* Must be the first member: lets a (polling_island *) be used where a
     grpc_workqueue scheduler is expected. */
  grpc_closure_scheduler workqueue_scheduler;

  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
277
/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  /* Doubly-linked list of workers rooted at grpc_pollset.root_worker. */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  poll_obj po;

  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};
301
/*******************************************************************************
 * Pollset-set Declarations
 */
/* A pollset_set carries no state of its own beyond the shared poll_obj
   header (lock + polling island). */
struct grpc_pollset_set {
  poll_obj po;
};
308
309/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700310 * Common helpers
311 */
312
Craig Tillerf975f742016-07-01 14:56:27 -0700313static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700314 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700315 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700316 if (*composite == GRPC_ERROR_NONE) {
317 *composite = GRPC_ERROR_CREATE(desc);
318 }
319 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700320 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700321}
322
/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island *g_current_thread_polling_island;

/* Forward declarations */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

/* Scheduler vtable for closures enqueued on a polling island's workqueue;
   both the run and run-with-error entries dispatch to workqueue_enqueue. */
static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
    workqueue_enqueue, workqueue_enqueue, "workqueue"};

static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700361
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Debug wrapper around pi_add_ref(): logs the before/after refcount with the
   caller's file/line and reason. NOTE(review): the load and the increment are
   separate operations, so the logged counts may be stale under contention —
   this is for debugging only, not a correctness guarantee. */
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper around pi_unref(); same logging caveat as pi_add_ref_dbg. */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

/* A grpc_workqueue is a polling_island in disguise (direct cast); ref/unref
   simply forward to the polling island refcount. NULL is tolerated. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
/* Non-debug variants: forward straight to the polling island refcount. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif
409
/* Takes a reference on the polling island. No memory barrier is needed here:
   the caller must already hold a valid reference (see the ref_count comment
   in the polling_island struct). */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700413
/* Drops a reference on the polling island, deleting it when the count hits
   zero and recursively releasing the reference it held on any island it was
   merged into. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion not be done under a lock. Once the ref count goes
     to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    /* merged_to must be read BEFORE the delete frees 'pi'. */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
431
/* The caller is expected to hold pi->mu lock before calling this function.
   Adds 'fd_count' fds to the island's epoll set and to its fds[] tracking
   array. Failures on individual fds are folded into '*error' and the loop
   continues with the remaining fds. */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered, interested in both read and write readiness. */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      /* EEXIST just means the fd is already in this epoll set; anything else
         is a real error worth reporting. Either way, skip the bookkeeping
         below so the fd is not tracked twice. */
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    /* Grow the tracking array geometrically (at least +8 slots). */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
476
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700477/* The caller is expected to hold pi->mu before calling this */
478static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700479 grpc_wakeup_fd *wakeup_fd,
480 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700481 struct epoll_event ev;
482 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700483 char *err_msg;
484 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700485
486 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
487 ev.data.ptr = wakeup_fd;
488 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
489 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700490 if (err < 0 && errno != EEXIST) {
491 gpr_asprintf(&err_msg,
492 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
493 "error: %d (%s)",
Craig Tiller1fa9ddb2016-11-28 08:19:37 -0800494 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
495 errno, strerror(errno));
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700496 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
497 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700498 }
499}
500
/* The caller is expected to hold pi->mu lock before calling this function.
   Removes every tracked fd from the epoll set (ENOENT tolerated — the fd may
   already be gone), optionally dropping the island's ref on each fd, then
   resets the tracking count. */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}
528
/* The caller is expected to hold pi->mu lock before calling this function.
   Removes a single fd from the epoll set (unless the fd was already closed,
   in which case the kernel removed it automatically) and from the island's
   fds[] tracking array, dropping the island's ref on it. */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically been
     removed from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* Unordered removal: overwrite the slot with the last element. */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
560
/* Might return NULL in case of an error.
   Allocates and initializes a polling island: its mutex, workqueue, epoll
   set, and the two wakeup fds (global + per-island workqueue). If
   'initial_fd' is non-NULL it is added to the new epoll set. On any error
   the partially-built island is destroyed, '*error' is populated and NULL
   is returned. */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  /* -1 marks "not yet created" so the cleanup path knows not to close it. */
  pi->epoll_fd = -1;

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  /* Unified cleanup: any accumulated error tears the island back down. */
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
612
Craig Tillerb39307d2016-06-30 15:39:13 -0700613static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700614 GPR_ASSERT(pi->fd_cnt == 0);
615
Craig Tiller0a06cd72016-07-14 13:21:24 -0700616 if (pi->epoll_fd >= 0) {
617 close(pi->epoll_fd);
618 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700619 GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
620 gpr_mu_destroy(&pi->workqueue_read_mu);
621 gpr_mpscq_destroy(&pi->workqueue_items);
Craig Tillerb39307d2016-06-30 15:39:13 -0700622 gpr_mu_destroy(&pi->mu);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700623 grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
Craig Tillerb39307d2016-06-30 15:39:13 -0700624 gpr_free(pi->fds);
625 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700626}
627
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700628/* Attempts to gets the last polling island in the linked list (liked by the
629 * 'merged_to' field). Since this does not lock the polling island, there are no
630 * guarantees that the island returned is the last island */
631static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
632 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
633 while (next != NULL) {
634 pi = next;
635 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
636 }
637
638 return pi;
639}
640
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700641/* Gets the lock on the *latest* polling island i.e the last polling island in
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700642 the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700643 returned polling island's mu.
644 Usage: To lock/unlock polling island "pi", do the following:
645 polling_island *pi_latest = polling_island_lock(pi);
646 ...
647 ... critical section ..
648 ...
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700649 gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
650static polling_island *polling_island_lock(polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700651 polling_island *next = NULL;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700652
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700653 while (true) {
654 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
655 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700656 /* Looks like 'pi' is the last node in the linked list but unless we check
657 this by holding the pi->mu lock, we cannot be sure (i.e without the
658 pi->mu lock, we don't prevent island merges).
659 To be absolutely sure, check once more by holding the pi->mu lock */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700660 gpr_mu_lock(&pi->mu);
661 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
662 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700663 /* pi is infact the last node and we have the pi->mu lock. we're done */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700664 break;
665 }
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700666
667 /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
668 * isn't the lock we are interested in. Continue traversing the list */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700669 gpr_mu_unlock(&pi->mu);
670 }
671
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700672 pi = next;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700673 }
674
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700675 return pi;
676}
677
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
        we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Chase each chain to its (currently observed) tail without locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains converged to the same island: one lock suffices */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Lock-order rule: lower address first (prevents deadlock against a
       concurrent caller locking the same pair in the opposite order) */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-check tail-ness now that the locks are held; a merge may have
       happened between the lock-free walk and acquiring the locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* At least one island got merged under us; drop both locks and retry */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
758
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700759static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
760 if (p == q) {
761 gpr_mu_unlock(&p->mu);
762 } else {
763 gpr_mu_unlock(&p->mu);
764 gpr_mu_unlock(&q->mu);
765 }
766}
767
Craig Tillerd8a3c042016-09-09 12:42:37 -0700768static void workqueue_maybe_wakeup(polling_island *pi) {
Craig Tiller2e620132016-10-10 15:27:44 -0700769 /* If this thread is the current poller, then it may be that it's about to
770 decrement the current poller count, so we need to look past this thread */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700771 bool is_current_poller = (g_current_thread_polling_island == pi);
772 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
773 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
Craig Tiller2e620132016-10-10 15:27:44 -0700774 /* Only issue a wakeup if it's likely that some poller could come in and take
775 it right now. Note that since we do an anticipatory mpscq_pop every poll
776 loop, it's ok if we miss the wakeup here, as we'll get the work item when
777 the next poller enters anyway. */
778 if (current_pollers > min_current_pollers_for_wakeup) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700779 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
780 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
781 }
782}
783
784static void workqueue_move_items_to_parent(polling_island *q) {
785 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
786 if (p == NULL) {
787 return;
788 }
789 gpr_mu_lock(&q->workqueue_read_mu);
790 int num_added = 0;
791 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
792 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
793 if (n != NULL) {
794 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
795 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
796 gpr_mpscq_push(&p->workqueue_items, n);
797 num_added++;
798 }
799 }
800 gpr_mu_unlock(&q->workqueue_read_mu);
801 if (num_added > 0) {
802 workqueue_maybe_wakeup(p);
803 }
804 workqueue_move_items_to_parent(p);
805}
806
/* Merges polling islands 'p' and 'q': the island with fewer fds is folded
   into the other, and the merged (surviving) island is returned. If both
   arguments resolve to the same island, no merge happens. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Any workqueue items still sitting on p must follow the merge */
    workqueue_move_items_to_parent(p);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
842
/* Closure-scheduler callback: pushes 'closure' (carrying 'error') onto the
   workqueue embedded in closure->scheduler (a polling_island) and wakes a
   poller when the queue transitions from empty to non-empty. */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  /* 'error' travels with the closure; ownership passes to the consumer */
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  if (last == 0) {
    /* Queue went empty -> non-empty: a poller may need to be woken */
    workqueue_maybe_wakeup(pi);
  }
  /* If this island has been merged, forward the item to the live island */
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}
862
Craig Tiller91031da2016-12-28 15:44:25 -0800863static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
864 polling_island *pi = (polling_island *)workqueue;
Craig Tiller801c6cc2017-01-03 08:13:13 -0800865 return workqueue == NULL ? grpc_schedule_on_exec_ctx
866 : &pi->workqueue_scheduler;
Craig Tiller91031da2016-12-28 15:44:25 -0800867}
868
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700869static grpc_error *polling_island_global_init() {
870 grpc_error *error = GRPC_ERROR_NONE;
871
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700872 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
873 if (error == GRPC_ERROR_NONE) {
874 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
875 }
876
877 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700878}
879
/* Reverses polling_island_global_init(): destroys the global wakeup fd */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
883
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700884/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700885 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700886 */
887
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700888/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700889 * but instead so that implementations with multiple threads in (for example)
890 * epoll_wait deal with the race between pollset removal and incoming poll
891 * notifications.
892 *
893 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
895 * without some expensive synchronization.
896 *
897 * If we keep the object freelisted, in the worst case losing this race just
898 * becomes a spurious read notification on a reused fd.
899 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700900
901/* The alarm system needs to be able to wakeup 'some poller' sometimes
902 * (specifically when a new alarm needs to be triggered earlier than the next
903 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
904 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700905
/* Freelist of recycled grpc_fd structs; both variables are guarded by
   fd_freelist_mu */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
908
/* Adds 'n' to fd->refst; the fd must already hold at least one ref.
   Use the REF_BY macro, which adds reason/file/line logging when
   GRPC_FD_REF_COUNT_DEBUG is defined. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* Refcount must have been positive before this call */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
924
925#ifdef GRPC_FD_REF_COUNT_DEBUG
926static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
927 int line) {
928 gpr_atm old;
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700929 gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
930 (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700931 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
932#else
933static void unref_by(grpc_fd *fd, int n) {
934 gpr_atm old;
935#endif
936 old = gpr_atm_full_fetch_add(&fd->refst, -n);
937 if (old == n) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700938 /* Add the fd to the freelist */
939 gpr_mu_lock(&fd_freelist_mu);
940 fd->freelist_next = fd_freelist;
941 fd_freelist = fd;
942 grpc_iomgr_unregister_object(&fd->iomgr_object);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -0800943
Sree Kuchibhotla8b8cbed2017-02-09 21:31:27 -0800944 grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1);
945 GRPC_ERROR_UNREF(err);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700946
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700947 gpr_mu_unlock(&fd_freelist_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700948 } else {
949 GPR_ASSERT(old > n);
950 }
951}
952
/* Increment refcount by two to avoid changing the orphan bit
   (NOTE(review): presumably the low bit of refst doubles as the orphan
   flag -- confirm against the grpc_fd declaration) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
968
/* One-time setup of the fd freelist lock */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
970
971static void fd_global_shutdown(void) {
972 gpr_mu_lock(&fd_freelist_mu);
973 gpr_mu_unlock(&fd_freelist_mu);
974 while (fd_freelist != NULL) {
975 grpc_fd *fd = fd_freelist;
976 fd_freelist = fd_freelist->freelist_next;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800977 gpr_mu_destroy(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700978 gpr_free(fd);
979 }
980 gpr_mu_destroy(&fd_freelist_mu);
981}
982
983static grpc_fd *fd_create(int fd, const char *name) {
984 grpc_fd *new_fd = NULL;
985
986 gpr_mu_lock(&fd_freelist_mu);
987 if (fd_freelist != NULL) {
988 new_fd = fd_freelist;
989 fd_freelist = fd_freelist->freelist_next;
990 }
991 gpr_mu_unlock(&fd_freelist_mu);
992
993 if (new_fd == NULL) {
994 new_fd = gpr_malloc(sizeof(grpc_fd));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800995 gpr_mu_init(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700996 }
997
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800998 /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
999 * is a newly created fd (or an fd we got from the freelist), no one else
1000 * would be holding a lock to it anyway. */
1001 gpr_mu_lock(&new_fd->po.mu);
1002 new_fd->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001003#ifdef PO_DEBUG
1004 new_fd->po.obj_type = POLL_OBJ_FD;
1005#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001006
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -07001007 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001008 new_fd->fd = fd;
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001009 gpr_atm_rel_store(&new_fd->shutdown_error1, (gpr_atm)GRPC_ERROR_NONE);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001010 new_fd->orphaned = false;
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001011 gpr_atm_rel_store(&new_fd->read_closure, CLOSURE_NOT_READY);
1012 gpr_atm_rel_store(&new_fd->write_closure, CLOSURE_NOT_READY);
1013 gpr_atm_rel_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
1014
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001015 new_fd->freelist_next = NULL;
1016 new_fd->on_done_closure = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001017
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001018 gpr_mu_unlock(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001019
1020 char *fd_name;
1021 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
1022 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001023#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -07001024 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001025#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -07001026 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001027 return new_fd;
1028}
1029
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001030static int fd_wrapped_fd(grpc_fd *fd) {
1031 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001032 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001033 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001034 ret_fd = fd->fd;
1035 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001036 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001037
1038 return ret_fd;
1039}
1040
/* Orphans the fd: either closes the underlying descriptor or hands it back
   through 'release_fd', detaches the fd from its polling island, and
   schedules 'on_done' once the cleanup has been initiated. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  /* Notify the caller; 'error' reflects any failure during island removal */
  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1097
/* Registers 'closure' to run when the event tracked by the lock-free state
   machine in '*state' (fd->read_closure or fd->write_closure) fires.
   '*state' holds one of: CLOSURE_NOT_READY, CLOSURE_READY, a closure
   pointer, or a shutdown error tagged with the CLOSURE_SHUTDOWN bit. */
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
                      grpc_closure *closure) {
  bool is_done = false;
  while (!is_done) {
    is_done = true;
    /* Fast-path: CLOSURE_NOT_READY -> <closure> */
    if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
      /* Fallback to slowpath */
      gpr_atm curr = gpr_atm_acq_load(state);
      switch (curr) {
        case CLOSURE_NOT_READY: {
          /* State changed back to NOT_READY between the CAS and the load;
             retry the fast-path */
          is_done = false;
          break;
        }

        case CLOSURE_READY: {
          /* Change the state to CLOSURE_NOT_READY and if successful, schedule
             the closure */
          if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
            grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
          } else {
            /* Looks like the current state is not CLOSURE_READY anymore. Most
               likely it transitioned to CLOSURE_NOT_READY. Retry the fast-path
               again */
            is_done = false;
          }
          break;
        }

        default: {
          /* 'curr' is either a closure or the fd is shutdown (in which case
           * 'curr' contains a pointer to the shutdown-error) */
          if ((curr & CLOSURE_SHUTDOWN) > 0) {
            /* FD is shutdown. Schedule the closure with the shutdown error */
            grpc_error *shutdown_err = (grpc_error *)(curr & ~CLOSURE_SHUTDOWN);
            grpc_closure_sched(
                exec_ctx, closure,
                GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));

          } else {
            /* There is already a closure!. This indicates a bug in the code */
            gpr_log(
                GPR_ERROR,
                "User called notify_on function with a previous callback still "
                "pending");
            abort();
          }
          break;
        }
      }
    }
  }
}
1151
/* Moves the lock-free state machine in '*state' to the shutdown state
   (shutdown_err tagged with CLOSURE_SHUTDOWN). If a closure was pending,
   it is scheduled with an "FD Shutdown" error wrapping 'shutdown_err'. */
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
                         grpc_error *shutdown_err) {
  /* Try the fast-path first (i.e expect the current value to be
     CLOSURE_NOT_READY */
  gpr_atm curr = CLOSURE_NOT_READY;
  gpr_atm new_state = (gpr_atm)shutdown_err | CLOSURE_SHUTDOWN;

  bool is_done = false;
  while (!is_done) {
    is_done = true;
    if (!gpr_atm_acq_cas(state, curr, new_state)) {
      /* Fallback to slowpath */
      curr = gpr_atm_acq_load(state);
      switch (curr) {
        case CLOSURE_READY: {
          /* Raced with a set_ready; retry the CAS with the observed value */
          is_done = false;
          break;
        }

        case CLOSURE_NOT_READY: {
          /* Raced back to NOT_READY; retry the CAS with the observed value */
          is_done = false;
          break;
        }

        default: {
          /* 'curr' is either a closure or the fd is already shutdown */
          if ((curr & CLOSURE_SHUTDOWN) > 0) {
            /* fd is already shutdown. Do nothing */
          } else if (gpr_atm_rel_cas(state, curr, new_state)) {
            grpc_closure_sched(
                exec_ctx, (grpc_closure *)curr,
                GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
          } else {
            /* 'curr' was a closure but now changed to a different state. We
               will have to retry */
            is_done = false;
          }
          break;
        }
      }
    }
  }
}
1195
/* Signals that the event tracked by '*state' fired: schedules a pending
   closure if one is stored, otherwise marks the state CLOSURE_READY so a
   later notify_on() call runs immediately. */
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
  /* Try the fast-path first (i.e expect the current value to be
   * CLOSURE_NOT_READY */
  if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
    /* Fallback to slowpath */
    gpr_atm curr = gpr_atm_acq_load(state);
    switch (curr) {
      case CLOSURE_READY: {
        /* Already ready. We are done here */
        break;
      }

      case CLOSURE_NOT_READY: {
        /* The state was not CLOSURE_NOT_READY when we checked initially at the
           beginning of this function but now it is CLOSURE_NOT_READY again.
           This is only possible if the state transitioned out of
           CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
           back to CLOSURE_NOT_READY again (i.e after we entered this function,
           the fd became "ready" and the necessary actions were already done).
           So there is no need to make the state CLOSURE_READY now */
        break;
      }

      default: {
        /* 'curr' is either a closure or the fd is shutdown */
        if ((curr & CLOSURE_SHUTDOWN) > 0) {
          /* The fd is shutdown. Do nothing */
        } else if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) {
          grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
        }
        /* else the state changed again (only possible by either a racing
           set_ready or set_shutdown functions. In both these cases, the closure
           would have been scheduled for execution. So we are done here */
        break;
      }
    }
  } /* else fast-path succeeded. We are done */
}
1234
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001235static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1236 grpc_fd *fd) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001237 gpr_atm notifier = gpr_atm_no_barrier_load(&fd->read_closure);
1238 return (grpc_pollset *)notifier;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001239}
1240
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001241static bool fd_is_shutdown(grpc_fd *fd) {
Sree Kuchibhotla99983382017-02-12 17:03:27 -08001242 grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
Sree Kuchibhotla8b8cbed2017-02-09 21:31:27 -08001243 return (err != GRPC_ERROR_NONE);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001244}
1245
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001246/* Might be called multiple times */
Craig Tillercda759d2017-01-27 11:37:37 -08001247static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
Sree Kuchibhotla8b8cbed2017-02-09 21:31:27 -08001248 /* If 'why' is GRPC_ERROR_NONE, change it to something else so that we know
1249 that the fd is shutdown just by looking at fd->shutdown_error */
1250 if (why == GRPC_ERROR_NONE) {
1251 why = GRPC_ERROR_INTERNAL;
1252 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001253
Sree Kuchibhotla99983382017-02-12 17:03:27 -08001254 if (gpr_atm_acq_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE,
Sree Kuchibhotla8b8cbed2017-02-09 21:31:27 -08001255 (gpr_atm)why)) {
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001256 shutdown(fd->fd, SHUT_RDWR);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001257
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001258 set_shutdown(exec_ctx, fd, &fd->read_closure, why);
1259 set_shutdown(exec_ctx, fd, &fd->write_closure, why);
Craig Tillercda759d2017-01-27 11:37:37 -08001260 } else {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001261 // Shutdown already called
Craig Tillercda759d2017-01-27 11:37:37 -08001262 GRPC_ERROR_UNREF(why);
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001263 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001264}
1265
/* Register 'closure' with the fd's read-closure state machine (see
   notify_on()): it will be scheduled when the fd next becomes readable */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  notify_on(exec_ctx, fd, &fd->read_closure, closure);
}
1270
/* Register 'closure' with the fd's write-closure state machine (see
   notify_on()): it will be scheduled when the fd next becomes writable */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  notify_on(exec_ctx, fd, &fd->write_closure, closure);
}
1275
Craig Tillerd6ba6192016-06-30 15:42:41 -07001276static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001277 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001278 grpc_workqueue *workqueue =
1279 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001280 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001281 return workqueue;
1282}
Craig Tiller70bd4832016-06-30 14:20:46 -07001283
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001284/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001285 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001286 */
/* Thread-local: the pollset this thread is currently working on (if any) and
   the worker struct representing it there. Consulted in pollset_kick() so a
   thread never signals itself. */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
/* Per-thread signal-mask state, lazily initialized in pollset_work():
   g_orig_sigmask is the mask (wakeup signal unblocked) used during
   epoll_pwait() */
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001291
/* Handler for 'grpc_wakeup_signal'. Intentionally (nearly) a no-op: the mere
   delivery of the signal is enough to interrupt an in-progress epoll_pwait() */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001297
/* Install sig_handler for the wakeup signal used to kick pollers */
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001299
/* Global state management */
/* One-time initialization of pollset globals: TLS keys, the wakeup-signal
   handler and the global wakeup fd. Returns the wakeup-fd init result. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&global_wakeup_fd);
}
1307
/* Tears down the state created by pollset_global_init() */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1313
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001314static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
1315 grpc_error *err = GRPC_ERROR_NONE;
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001316
1317 /* Kick the worker only if it was not already kicked */
1318 if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
1319 GRPC_POLLING_TRACE(
1320 "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
1321 (void *)worker, worker->pt_id);
1322 int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
1323 if (err_num != 0) {
1324 err = GRPC_OS_ERROR(err_num, "pthread_kill");
1325 }
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001326 }
1327 return err;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001328}
1329
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  /* The worker list is circular with root_worker as sentinel: it is non-empty
     exactly when the sentinel does not point at itself */
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001335
1336static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1337 worker->prev->next = worker->next;
1338 worker->next->prev = worker->prev;
1339}
1340
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001341static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1342 if (pollset_has_workers(p)) {
1343 grpc_pollset_worker *w = p->root_worker.next;
1344 remove_worker(p, w);
1345 return w;
1346 } else {
1347 return NULL;
1348 }
1349}
1350
1351static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1352 worker->next = &p->root_worker;
1353 worker->prev = worker->next->prev;
1354 worker->prev->next = worker->next->prev = worker;
1355}
1356
1357static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1358 worker->prev = &p->root_worker;
1359 worker->next = worker->prev->next;
1360 worker->prev->next = worker->next->prev = worker;
1361}
1362
/* p->mu must be held before calling this function */
/* Kick semantics:
   - specific_worker == GRPC_POLLSET_KICK_BROADCAST: kick every worker except
     the calling thread's own worker
   - specific_worker != NULL: kick exactly that worker (unless it is the
     calling thread's own worker)
   - specific_worker == NULL: kick any one worker; if none, remember the kick
     via kicked_without_pollers so the next pollset_work() skips polling */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        /* Walk the whole worker list, skipping the current thread's worker
           (it can absorb the kick itself) */
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      /* Do not signal ourselves */
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate the kicked worker to the back so kicks are spread round-robin */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1414
/* Wake up whichever poller is waiting on the global wakeup fd */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001418
/* Initialize a freshly allocated pollset: no polling island yet, an empty
   (self-linked) worker list, and cleared shutdown state. Hands the pollset's
   mutex back to the caller via *mu. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->po.mu);
  *mu = &pollset->po.mu;
  pollset->po.pi = NULL;
#ifdef PO_DEBUG
  pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif

  /* Circular worker list: the root sentinel points at itself when empty */
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;
}
1434
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001435/* Convert a timespec to milliseconds:
1436 - Very small or negative poll times are clamped to zero to do a non-blocking
1437 poll (which becomes spin polling)
1438 - Other small values are rounded up to one millisecond
1439 - Longer than a millisecond polls are rounded up to the next nearest
1440 millisecond to avoid spinning
1441 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001442static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1443 gpr_timespec now) {
1444 gpr_timespec timeout;
1445 static const int64_t max_spin_polling_us = 10;
1446 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1447 return -1;
1448 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001449
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001450 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1451 max_spin_polling_us,
1452 GPR_TIMESPAN))) <= 0) {
1453 return 0;
1454 }
1455 timeout = gpr_time_sub(deadline, now);
1456 return gpr_time_to_millis(gpr_time_add(
1457 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1458}
1459
/* Invoked by the poller when 'fd' reports a read (or error/hangup) event:
   fires any pending read closure and records which pollset noticed it */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  set_ready(exec_ctx, fd, &fd->read_closure);

  // Note, it is possible that fd_become_readable might be called twice with
  // different 'notifier's when an fd becomes readable and it is in two epoll
  // sets (This can happen briefly during polling island merges). In such cases
  // it does not really matter which notifer is set as the read_notifier_pollset
  // (They would both point to the same polling island anyway)
  gpr_atm_no_barrier_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
1471
/* Invoked by the poller when 'fd' reports a write (or error/hangup) event:
   fires any pending write closure */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  set_ready(exec_ctx, fd, &fd->write_closure);
}
1475
Craig Tillerb39307d2016-06-30 15:39:13 -07001476static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1477 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001478 if (ps->po.pi != NULL) {
1479 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001480 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001481 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001482}
1483
/* Final step of pollset shutdown: release the polling island and schedule the
   shutdown_done closure. Reached either directly from pollset_shutdown() or
   by the last worker leaving pollset_work(). NOTE(review): callers hold
   pollset->po.mu (per pollset_shutdown()'s contract) — confirm before reuse. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
1495
/* pollset->po.mu lock must be held by the caller before calling this */
/* Begin shutting down the pollset: mark it, remember 'closure' to run when
   shutdown completes, and kick every worker so they notice the state change */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  /* Wake all workers so they exit epoll and observe shutting_down */
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1515
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}
1523
/* Re-arm a fully shut-down, workerless pollset for reuse: clear all shutdown
   and kick state. The polling island must already have been released (by
   finish_shutdown_locked()). */
static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  GPR_ASSERT(pollset->po.pi == NULL);
}
1533
/* Try to pop and execute one closure from the polling island's workqueue.
   Returns true iff a closure was executed. Non-blocking: if another thread
   holds workqueue_read_mu, gives up immediately and returns false. */
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* More items remain after taking ours: wake another poller to drain */
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      /* Run the popped closure inline on this thread */
      grpc_closure *c = (grpc_closure *)n;
      grpc_error *error = c->error_data.error;
      c->cb(exec_ctx, c->cb_arg, error);
      GRPC_ERROR_UNREF(error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* n == NULL might mean there's work but it's not available to be popped
       * yet - try to ensure another workqueue wakes up to check shortly if so
       */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}
1557
#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
/* Core polling step for one worker: ensure the pollset has a polling island,
   epoll_pwait() on its fd, dispatch any ready events, and release
   pollset->po.mu (which the caller holds on entry). Errors are appended to
   *error rather than returned. */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
     latest polling island pointed by pollset->po.pi

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wakeup
     right-away from epoll_wait() and pick up the latest polling_island the next
     this function (i.e pollset_work_and_unlock()) is called */

  if (pollset->po.pi == NULL) {
    pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
    if (pollset->po.pi == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->po.pi, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void *)pollset, (void *)pollset->po.pi);
  }

  pi = polling_island_maybe_get_latest(pollset->po.pi);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->po.pi since the island being pointed by
     pollset->po.pi maybe older than the one pointed by pi) */
  if (pollset->po.pi != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(exec_ctx, pollset->po.pi, "ps");
    pollset->po.pi = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->po.mu);

  /* If we get some workqueue work to do, it might end up completing an item on
     the completion queue, so there's no need to poll... so we skip that and
     redo the complete loop to verify */
  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
    g_current_thread_polling_island = pi;

    GRPC_SCHEDULING_START_BLOCKING_REGION;
    /* sig_mask unblocks the wakeup signal only for the duration of the wait */
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    GRPC_SCHEDULING_END_BLOCKING_REGION;
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an interation by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_poll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    /* Dispatch each ready event: global wakeup, workqueue wakeup, island
       merge notification, or an actual fd becoming readable/writable */
    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &global_wakeup_fd) {
        append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &pi->workqueue_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
                     err_desc);
        maybe_do_workqueue_work(exec_ctx, pi);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }

    g_current_thread_polling_island = NULL;
    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
  }

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e our old copy of pollset->po.pi
     that we got before releasing the polling island lock). This is because
     pollset->po.pi pointer might get udpated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(exec_ctx, pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}
1693
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this thread's stack; it is only ever linked into the
     pollset's list for the duration of this call */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  /* Publish "this thread is polling this pollset" for pollset_kick() */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* Releases pollset->po.mu; we re-acquire it below */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->po.mu);
  }

  /* The stack-allocated worker is about to go out of scope: invalidate the
     caller's handle and clear our TLS advertisement */
  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1800
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001801static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001802 poll_obj_type bag_type, poll_obj *item,
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001803 poll_obj_type item_type) {
1804 GPR_TIMER_BEGIN("add_poll_object", 0);
1805
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001806#ifdef PO_DEBUG
1807 GPR_ASSERT(item->obj_type == item_type);
1808 GPR_ASSERT(bag->obj_type == bag_type);
1809#endif
Craig Tiller57726ca2016-09-12 11:59:45 -07001810
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001811 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001812 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001813
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001814 gpr_mu_lock(&bag->mu);
1815 gpr_mu_lock(&item->mu);
1816
Craig Tiller7212c232016-07-06 13:11:09 -07001817retry:
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001818 /*
1819 * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
1820 * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
1821 * a refcount of 2) and point item->pi and bag->pi to the new island
1822 * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
1823 * the other's non-NULL pi
1824 * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
1825 * polling islands and update item->pi and bag->pi to point to the new
1826 * island
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001827 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001828
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001829 /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
1830 * orphaned */
1831 if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
1832 gpr_mu_unlock(&item->mu);
1833 gpr_mu_unlock(&bag->mu);
Craig Tiller42ac6db2016-07-06 17:13:56 -07001834 return;
1835 }
1836
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001837 if (item->pi == bag->pi) {
1838 pi_new = item->pi;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001839 if (pi_new == NULL) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001840 /* GPR_ASSERT(item->pi == bag->pi == NULL) */
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001841
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001842 /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
1843 * we need to do some extra work to make TSAN happy */
1844 if (item_type == POLL_OBJ_FD) {
1845 /* Unlock before creating a new polling island: the polling island will
1846 create a workqueue which creates a file descriptor, and holding an fd
1847 lock here can eventually cause a loop to appear to TSAN (making it
1848 unhappy). We don't think it's a real loop (there's an epoch point
1849 where that loop possibility disappears), but the advantages of
1850 keeping TSAN happy outweigh any performance advantage we might have
1851 by keeping the lock held. */
1852 gpr_mu_unlock(&item->mu);
1853 pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
1854 gpr_mu_lock(&item->mu);
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001855
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001856 /* Need to reverify any assumptions made between the initial lock and
1857 getting to this branch: if they've changed, we need to throw away our
1858 work and figure things out again. */
1859 if (item->pi != NULL) {
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001860 GRPC_POLLING_TRACE(
1861 "add_poll_object: Raced creating new polling island. pi_new: %p "
1862 "(fd: %d, %s: %p)",
1863 (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
1864 (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001865 /* No need to lock 'pi_new' here since this is a new polling island
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001866 * and no one has a reference to it yet */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001867 polling_island_remove_all_fds_locked(pi_new, true, &error);
1868
1869 /* Ref and unref so that the polling island gets deleted during unref
1870 */
1871 PI_ADD_REF(pi_new, "dance_of_destruction");
1872 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
1873 goto retry;
1874 }
Craig Tiller27da6422016-07-06 13:14:46 -07001875 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001876 pi_new = polling_island_create(exec_ctx, NULL, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001877 }
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001878
1879 GRPC_POLLING_TRACE(
1880 "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
1881 "%s: %p)",
1882 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1883 poll_obj_string(bag_type), (void *)bag);
1884 } else {
1885 GRPC_POLLING_TRACE(
1886 "add_poll_object: Same polling island. pi: %p (%s, %s)",
1887 (void *)pi_new, poll_obj_string(item_type),
1888 poll_obj_string(bag_type));
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001889 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001890 } else if (item->pi == NULL) {
1891 /* GPR_ASSERT(bag->pi != NULL) */
1892 /* Make pi_new point to latest pi*/
1893 pi_new = polling_island_lock(bag->pi);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001894
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001895 if (item_type == POLL_OBJ_FD) {
1896 grpc_fd *fd = FD_FROM_PO(item);
1897 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
1898 }
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001899
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001900 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001901 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001902 "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
1903 "bag(%s): %p)",
1904 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1905 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001906 } else if (bag->pi == NULL) {
1907 /* GPR_ASSERT(item->pi != NULL) */
1908 /* Make pi_new to point to latest pi */
1909 pi_new = polling_island_lock(item->pi);
1910 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001911 GRPC_POLLING_TRACE(
1912 "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
1913 "bag(%s): %p)",
1914 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1915 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001916 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001917 pi_new = polling_island_merge(item->pi, bag->pi, &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001918 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001919 "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
1920 "bag(%s): %p)",
1921 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1922 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001923 }
1924
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001925 /* At this point, pi_new is the polling island that both item->pi and bag->pi
1926 MUST be pointing to */
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001927
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001928 if (item->pi != pi_new) {
1929 PI_ADD_REF(pi_new, poll_obj_string(item_type));
1930 if (item->pi != NULL) {
1931 PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001932 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001933 item->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001934 }
1935
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001936 if (bag->pi != pi_new) {
1937 PI_ADD_REF(pi_new, poll_obj_string(bag_type));
1938 if (bag->pi != NULL) {
1939 PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001940 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001941 bag->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001942 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001943
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001944 gpr_mu_unlock(&item->mu);
1945 gpr_mu_unlock(&bag->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001946
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001947 GRPC_LOG_IF_ERROR("add_poll_object", error);
1948 GPR_TIMER_END("add_poll_object", 0);
1949}
Craig Tiller57726ca2016-09-12 11:59:45 -07001950
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001951static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1952 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001953 add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001954 POLL_OBJ_FD);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001955}
1956
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001957/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001958 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001959 */
1960
1961static grpc_pollset_set *pollset_set_create(void) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001962 grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001963 gpr_mu_init(&pss->po.mu);
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001964 pss->po.pi = NULL;
1965#ifdef PO_DEBUG
1966 pss->po.obj_type = POLL_OBJ_POLLSET_SET;
1967#endif
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001968 return pss;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001969}
1970
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001971static void pollset_set_destroy(grpc_pollset_set *pss) {
1972 gpr_mu_destroy(&pss->po.mu);
1973
1974 if (pss->po.pi != NULL) {
1975 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1976 PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy");
1977 grpc_exec_ctx_finish(&exec_ctx);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001978 }
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001979
1980 gpr_free(pss);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001981}
1982
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001983static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1984 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001985 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001986 POLL_OBJ_FD);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001987}
1988
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001989static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
1990 grpc_fd *fd) {
1991 /* Nothing to do */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001992}
1993
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001994static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001995 grpc_pollset_set *pss, grpc_pollset *ps) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001996 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001997 POLL_OBJ_POLLSET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001998}
1999
/* Deliberate no-op: polling islands, once merged, are not split apart again,
   so there is nothing to undo when a pollset leaves a pollset_set. */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  /* Nothing to do */
}
2004
2005static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
2006 grpc_pollset_set *bag,
2007 grpc_pollset_set *item) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002008 add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002009 POLL_OBJ_POLLSET_SET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002010}
2011
/* Deliberate no-op: polling islands, once merged, are not split apart again,
   so there is nothing to undo when a pollset_set leaves another. */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  /* Nothing to do */
}
2017
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002018/* Test helper functions
2019 * */
2020void *grpc_fd_get_polling_island(grpc_fd *fd) {
2021 polling_island *pi;
2022
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002023 gpr_mu_lock(&fd->po.mu);
2024 pi = fd->po.pi;
2025 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002026
2027 return pi;
2028}
2029
2030void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
2031 polling_island *pi;
2032
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002033 gpr_mu_lock(&ps->po.mu);
2034 pi = ps->po.pi;
2035 gpr_mu_unlock(&ps->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002036
2037 return pi;
2038}
2039
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002040bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002041 polling_island *p1 = p;
2042 polling_island *p2 = q;
2043
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002044 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
2045 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002046 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002047 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002048
2049 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002050}
2051
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002052/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002053 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002054 */
2055
/* Tear down all module-level state for this polling engine: fd bookkeeping,
   pollset globals and polling-island globals (mirrors the initialization done
   in grpc_init_epoll_linux). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
2061
/* Virtual function table binding this epoll/signal-based implementation to
   the generic event-engine interface. Every entry points at a static
   function defined earlier in this file. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd operations */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    /* workqueue operations */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};
2100
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002101/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
2102 * Create a dummy epoll_fd to make sure epoll support is available */
2103static bool is_epoll_available() {
2104 int fd = epoll_create1(EPOLL_CLOEXEC);
2105 if (fd < 0) {
2106 gpr_log(
2107 GPR_ERROR,
2108 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2109 fd);
2110 return false;
2111 }
2112 close(fd);
2113 return true;
2114}
2115
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002116const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002117 /* If use of signals is disabled, we cannot use epoll engine*/
2118 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
2119 return NULL;
2120 }
2121
Ken Paysoncd7d0472016-10-11 12:24:20 -07002122 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07002123 return NULL;
2124 }
2125
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002126 if (!is_epoll_available()) {
2127 return NULL;
2128 }
2129
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002130 if (!is_grpc_wakeup_signal_initialized) {
Sree Kuchibhotlabd48c912016-09-27 16:48:25 -07002131 grpc_use_signal(SIGRTMIN + 6);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002132 }
2133
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002134 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07002135
2136 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
2137 return NULL;
2138 }
2139
2140 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
2141 polling_island_global_init())) {
2142 return NULL;
2143 }
2144
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002145 return &vtable;
2146}
2147
murgatroid99623dd4f2016-08-08 17:31:27 -07002148#else /* defined(GRPC_LINUX_EPOLL) */
2149#if defined(GRPC_POSIX_SOCKET)
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07002150#include "src/core/lib/iomgr/ev_posix.h"
murgatroid99623dd4f2016-08-08 17:31:27 -07002151/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002152 * NULL */
2153const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
murgatroid99623dd4f2016-08-08 17:31:27 -07002154#endif /* defined(GRPC_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002155
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002156void grpc_use_signal(int signum) {}
murgatroid99623dd4f2016-08-08 17:31:27 -07002157#endif /* !defined(GRPC_LINUX_EPOLL) */