blob: 1924e76f13a471f4809b186a9430c098e82f03b5 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid9954070892016-08-08 17:01:18 -070034#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070036/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070037#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070038
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070039#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070044#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070045#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070060#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070061#include "src/core/lib/profiling/timers.h"
62#include "src/core/lib/support/block_annotate.h"
63
Sree Kuchibhotla34217242016-06-29 00:19:07 -070064/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Log a polling trace message when tracing is enabled.
   Wrapped in do { } while (0) so the macro expands to a single statement:
   the previous bare-'if' form would silently capture a dangling 'else' at
   call sites such as `if (x) GRPC_POLLING_TRACE(...); else ...;` */
#define GRPC_POLLING_TRACE(fmt, ...)           \
  do {                                         \
    if (grpc_polling_trace) {                  \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__);   \
    }                                          \
  } while (0)
70
/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */

/* Signal number used to kick threads out of epoll_wait(); -1 means signals
   are disabled (set via grpc_use_signal() below) */
static int grpc_wakeup_signal = -1;
/* True once grpc_use_signal() has been called (even with signum == -1) */
static bool is_grpc_wakeup_signal_initialized = false;

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
static grpc_wakeup_fd global_wakeup_fd;
81
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070082/* Implements the function defined in grpc_posix.h. This function might be
83 * called before even calling grpc_init() to set either a different signal to
84 * use. If signum == -1, then the use of signals is disabled */
85void grpc_use_signal(int signum) {
86 grpc_wakeup_signal = signum;
87 is_grpc_wakeup_signal_initialized = true;
88
89 if (grpc_wakeup_signal < 0) {
90 gpr_log(GPR_INFO,
91 "Use of signals is disabled. Epoll engine will not be used");
92 } else {
93 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
94 grpc_wakeup_signal);
95 }
96}
97
98struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070099
/* Tag identifying which concrete type a poll_obj is embedded in */
typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

/* Common header shared by grpc_fd, grpc_pollset and grpc_pollset_set.
   It is the first member of each of those structs (see FD_FROM_PO below),
   so a poll_obj* can be cast back to the containing type. */
typedef struct poll_obj {
#ifdef PO_DEBUG
  /* Only tracked when PO_DEBUG is defined: lets helpers assert they were
     handed the poll_obj kind they expect */
  poll_obj_type obj_type;
#endif
  gpr_mu mu;                /* Guards this object's polling-island linkage */
  struct polling_island *pi; /* Island this object currently belongs to */
} poll_obj;
113
114const char *poll_obj_string(poll_obj_type po_type) {
115 switch (po_type) {
116 case POLL_OBJ_FD:
117 return "fd";
118 case POLL_OBJ_POLLSET:
119 return "pollset";
120 case POLL_OBJ_POLLSET_SET:
121 return "pollset_set";
122 }
123
124 GPR_UNREACHABLE_CODE(return "UNKNOWN");
125}
126
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700127/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700128 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700129 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800130
/* Recover the grpc_fd from its embedded poll_obj header (poll_obj is the
   first member of grpc_fd, so the cast is valid) */
#define FD_FROM_PO(po) ((grpc_fd *)(po))

struct grpc_fd {
  /* Common poll_obj header; MUST remain the first field (see FD_FROM_PO) */
  poll_obj po;

  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Internally stores data of type (grpc_error *). If the FD is shutdown,
     this contains the reason for shutdown (i.e. a pointer to grpc_error) ORed
     with FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the
     lower bit of (grpc_error *) addresses is guaranteed to be zero. Even if
     the (grpc_error *) is of special types like GRPC_ERROR_NONE,
     GRPC_ERROR_OOM etc, the lower bit is guaranteed to be zero.

     Once an fd is shutdown, any pending or future read/write closures on the
     fd should fail */
  gpr_atm shutdown_error;

  /* The fd is either closed or we relinquished control of it. In either
     case, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  /* Closures to call when the fd is readable or writable respectively. These
     fields contain one of the following values:
       CLOSURE_READY     : The fd has an I/O event of interest but there is no
                           closure yet to execute

       CLOSURE_NOT_READY : The fd has no I/O event of interest

       closure ptr       : The closure to be executed when the fd has an I/O
                           event of interest

       shutdown_error | FD_SHUTDOWN_BIT :
                           'shutdown_error' field ORed with FD_SHUTDOWN_BIT.
                           This indicates that the fd is shutdown. Since all
                           memory allocations are word-aligned, the lower two
                           bits of the shutdown_error pointer are always 0. So
                           it is safe to OR these with FD_SHUTDOWN_BIT

     Valid state transitions:

       <closure ptr> <-----3------ CLOSURE_NOT_READY ----1---->  CLOSURE_READY
         |  |                         ^   |    ^                         |  |
         |  |                         |   |    |                         |  |
         |  +--------------4----------+   6    +---------2---------------+  |
         |                                |                                 |
         |                                v                                 |
         +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+

      For 1, 4 : See set_ready() function
      For 2, 3 : See notify_on() function
      For 5,6,7: See set_shutdown() function */
  gpr_atm read_closure;
  gpr_atm write_closure;

  /* Links orphaned fd structs into the freelist for reuse */
  struct grpc_fd *freelist_next;
  /* Closure to invoke once the fd has been fully released */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable. The actual type
   * stored in this is (grpc_pollset *) */
  gpr_atm read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
201
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants record the reason and call site of every ref/unref */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd::read_closure / write_closure. Real
   closure pointers are word-aligned, so they can never equal 0 or 2. */
#define CLOSURE_NOT_READY ((gpr_atm)0)
#define CLOSURE_READY ((gpr_atm)2)

/* ORed into the shutdown_error pointer stored in the closure fields to flag
   that the fd is shut down (bit 0 of a word-aligned pointer is always 0) */
#define FD_SHUTDOWN_BIT 1
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700224
225/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700226 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700227 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700228
Craig Tillerd8a3c042016-09-09 12:42:37 -0700229#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700230
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700231#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
Craig Tillerb39307d2016-06-30 15:39:13 -0700232#define PI_UNREF(exec_ctx, p, r) \
233 pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700234
Craig Tillerd8a3c042016-09-09 12:42:37 -0700235#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700236
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700237#define PI_ADD_REF(p, r) pi_add_ref((p))
Craig Tillerb39307d2016-06-30 15:39:13 -0700238#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700239
Yuchen Zeng362ac1b2016-09-13 16:01:31 -0700240#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700241
/* This is also used as grpc_workqueue (by directly casting it); for that cast
   to work, workqueue_scheduler must remain the first field */
typedef struct polling_island {
  grpc_closure_scheduler workqueue_scheduler;

  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that
   * too only if the island is merged with another island). Because of this,
   * we can use gpr_atm type here so that we can do atomic access on this and
   * reduce lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
284
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700285/*******************************************************************************
286 * Pollset Declarations
287 */
struct grpc_pollset_worker {
  /* Thread id of this worker (target for the wakeup signal) */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  /* Doubly-linked list of workers rooted at grpc_pollset::root_worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  /* Common poll_obj header; MUST remain the first field */
  poll_obj po;

  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {
  /* Common poll_obj header; MUST remain the first field */
  poll_obj po;
};
315
316/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700317 * Common helpers
318 */
319
Craig Tillerf975f742016-07-01 14:56:27 -0700320static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700321 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700322 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700323 if (*composite == GRPC_ERROR_NONE) {
Noah Eisen3005ce82017-03-14 13:38:41 -0700324 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700325 }
326 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700327 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700328}
329
330/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700331 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700332 */
333
/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island *g_current_thread_polling_island;

/* Forward declarations */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

/* Closure scheduler backing a polling island's workqueue: both the run and
   run-with-error entry points route through workqueue_enqueue */
static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
    workqueue_enqueue, workqueue_enqueue, "workqueue"};
Craig Tiller91031da2016-12-28 15:44:25 -0800365
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);

#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Debug wrapper: logs old/new ref counts plus the reason and call site.
   NOTE(review): the logged old count is loaded separately from the actual
   increment, so concurrent refs can make the logged values approximate. */
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper for pi_unref; same approximate-logging caveat as above */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

/* grpc_workqueue is a polling_island in disguise (direct cast); ref/unref
   simply forward to the island's ref count. NULL workqueues are ignored. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
/* Non-debug variants: forward straight to the island ref count */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif
416
/* Increment the island's ref count (no barrier needed: a valid caller
   already holds a reference, so the count cannot concurrently reach zero) */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    /* Load merged_to BEFORE deleting pi, since delete frees the struct */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
438
/* Add 'fd_count' fds to the island's epoll set and to its fds[] tracking
   array, optionally taking a ref on each fd. Failures (other than EEXIST,
   which is benign) are accumulated into '*error' and the fd is skipped.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered interest in both readability and writability */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* On EEXIST (already in the set) or failure, do not track the fd */
      continue;
    }

    /* Grow fds[] geometrically (at least +8) when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
483
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700484/* The caller is expected to hold pi->mu before calling this */
485static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700486 grpc_wakeup_fd *wakeup_fd,
487 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700488 struct epoll_event ev;
489 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700490 char *err_msg;
491 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700492
493 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
494 ev.data.ptr = wakeup_fd;
495 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
496 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700497 if (err < 0 && errno != EEXIST) {
498 gpr_asprintf(&err_msg,
499 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
500 "error: %d (%s)",
Craig Tiller1fa9ddb2016-11-28 08:19:37 -0800501 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
502 errno, strerror(errno));
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700503 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
504 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700505 }
506}
507
/* Remove every tracked fd from the island's epoll set and reset fd_cnt to 0,
   optionally dropping the per-fd refs taken when they were added. ENOENT
   (already gone from the set) is benign; other failures accumulate in
   '*error'. The caller is expected to hold pi->mu lock before calling this
   function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    /* Unref even when epoll_ctl failed: the fd is being dropped from
       tracking regardless */
    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}
535
/* Remove a single fd from the island's epoll set and from the fds[] tracking
   array (dropping the island's ref on it). The caller is expected to hold
   pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If the fd is already closed, the kernel has already removed it from the
     epoll set automatically, so skip the explicit EPOLL_CTL_DEL */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* Swap-remove from the (unordered) tracking array */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
567
/* Allocate and initialize a polling island: its mutex, workqueue, wakeup fds
   and epoll set, optionally seeding it with 'initial_fd'.
   Might return NULL in case of an error (with details in '*error', and any
   partially-built island torn down). */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1; /* sentinel so the error path knows not to close() */

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  /* NOTE(review): if grpc_wakeup_fd_init fails here, the 'done' path calls
     polling_island_delete which destroys workqueue_wakeup_fd even though it
     was never initialized — confirm grpc_wakeup_fd_destroy tolerates that */
  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  /* Register both the process-wide wakeup fd and this island's workqueue
     wakeup fd so pollers can be interrupted */
  polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
619
/* Destroys a polling island. The island must no longer contain any fds
   (asserted) and its workqueue must be fully drained (asserted). Tolerates a
   partially-constructed island from polling_island_create (epoll_fd == -1). */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}
634
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700635/* Attempts to gets the last polling island in the linked list (liked by the
636 * 'merged_to' field). Since this does not lock the polling island, there are no
637 * guarantees that the island returned is the last island */
638static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
639 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
640 while (next != NULL) {
641 pi = next;
642 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
643 }
644
645 return pi;
646}
647
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
   polling_island *pi_latest = polling_island_lock(pi);
   ...
   ... critical section ..
   ...
   gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    /* Advance towards the tail of the list and try again */
    pi = next;
  }

  return pi;
}
684
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
   {
     polling_island_lock(*p, true);
     polling_island_lock(*q, true);
   }

   Usage/example:
   polling_island *p1;
   polling_island *p2;
   ..
   polling_island_lock_pair(&p1, &p2);
   ..
   .. Critical section with both p1 and p2 locked
   ..
   // Release locks: Always call polling_island_unlock_pair() to release locks
   polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Chase the 'merged_to' chain of the first island to its current tail */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    /* Likewise for the second island */
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains converged on the same island: a single lock suffices */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Lock-order rule: always lock the island at the lower address first to
       avoid deadlock against a concurrent caller locking the same pair */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Double-check under the locks that neither island got merged meanwhile */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* At least one island was merged while we were acquiring locks; release
       both and retry from the top */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
765
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700766static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
767 if (p == q) {
768 gpr_mu_unlock(&p->mu);
769 } else {
770 gpr_mu_unlock(&p->mu);
771 gpr_mu_unlock(&q->mu);
772 }
773}
774
Craig Tillerd8a3c042016-09-09 12:42:37 -0700775static void workqueue_maybe_wakeup(polling_island *pi) {
Craig Tiller2e620132016-10-10 15:27:44 -0700776 /* If this thread is the current poller, then it may be that it's about to
777 decrement the current poller count, so we need to look past this thread */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700778 bool is_current_poller = (g_current_thread_polling_island == pi);
779 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
780 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
Craig Tiller2e620132016-10-10 15:27:44 -0700781 /* Only issue a wakeup if it's likely that some poller could come in and take
782 it right now. Note that since we do an anticipatory mpscq_pop every poll
783 loop, it's ok if we miss the wakeup here, as we'll get the work item when
784 the next poller enters anyway. */
785 if (current_pollers > min_current_pollers_for_wakeup) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700786 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
787 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
788 }
789}
790
/* If island 'q' has been merged into another island, drain q's workqueue into
   its parent's workqueue, waking the parent's pollers if anything was moved.
   Recurses up the 'merged_to' chain so items always end up on the live tail
   island. Takes q's workqueue_read_mu while draining. */
static void workqueue_move_items_to_parent(polling_island *q) {
  polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
  if (p == NULL) {
    /* 'q' has not been merged into anything; nothing to move */
    return;
  }
  gpr_mu_lock(&q->workqueue_read_mu);
  int num_added = 0;
  while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
    /* gpr_mpscq_pop may transiently return NULL even when the count is
       positive (a push is in flight); simply retry in that case */
    if (n != NULL) {
      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
      gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
      gpr_mpscq_push(&p->workqueue_items, n);
      num_added++;
    }
  }
  gpr_mu_unlock(&q->workqueue_read_mu);
  if (num_added > 0) {
    workqueue_maybe_wakeup(p);
  }
  /* The parent itself may have been merged; keep pushing items up the chain */
  workqueue_move_items_to_parent(p);
}
813
/* Merges polling islands 'p' and 'q' into one island and returns the surviving
   island (always the one that started with more fds). The smaller island's
   fds and workqueue items are moved to the survivor and its 'merged_to' link
   is set, so future lookups chase the chain to the survivor. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Any work items queued on p must now be serviced via q */
    workqueue_move_items_to_parent(p);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
849
/* Closure-scheduler entry point: pushes 'closure' (whose scheduler field
   identifies the target workqueue/polling island) onto that island's
   workqueue, waking a poller if the queue transitioned from empty, then
   forwards items up the merge chain. */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  /* Only the push that makes the queue non-empty needs to wake a poller */
  if (last == 0) {
    workqueue_maybe_wakeup(pi);
  }
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}
869
Craig Tiller91031da2016-12-28 15:44:25 -0800870static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
871 polling_island *pi = (polling_island *)workqueue;
Craig Tiller801c6cc2017-01-03 08:13:13 -0800872 return workqueue == NULL ? grpc_schedule_on_exec_ctx
873 : &pi->workqueue_scheduler;
Craig Tiller91031da2016-12-28 15:44:25 -0800874}
875
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700876static grpc_error *polling_island_global_init() {
877 grpc_error *error = GRPC_ERROR_NONE;
878
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700879 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
880 if (error == GRPC_ERROR_NONE) {
881 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
882 }
883
884 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700885}
886
/* Counterpart of polling_island_global_init: tears down the global
   polling_island_wakeup_fd. */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
890
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700891/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700892 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700893 */
894
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700895/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700896 * but instead so that implementations with multiple threads in (for example)
897 * epoll_wait deal with the race between pollset removal and incoming poll
898 * notifications.
899 *
900 * The problem is that the poller ultimately holds a reference to this
901 * object, so it is very difficult to know when is safe to free it, at least
902 * without some expensive synchronization.
903 *
904 * If we keep the object freelisted, in the worst case losing this race just
905 * becomes a spurious read notification on a reused fd.
906 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700907
908/* The alarm system needs to be able to wakeup 'some poller' sometimes
909 * (specifically when a new alarm needs to be triggered earlier than the next
910 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
911 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700912
/* Freelist of grpc_fd structs (see the comment above on why fds are
   freelisted rather than freed) and the mutex that guards it. */
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
915
/* When GRPC_FD_REF_COUNT_DEBUG is defined, REF_BY/UNREF_BY forward the call
   site (__FILE__/__LINE__) and a reason string to the ref-count helpers so
   every ref/unref is logged; otherwise they compile down to plain counter
   updates. Note the function signature itself changes across the #ifdef. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The count must already be positive: taking a ref on an fd whose count
     reached zero would resurrect a freelisted fd */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
931
/* Drops 'n' refs from fd->refst; when the count reaches zero the grpc_fd is
   returned to the freelist (not freed — see the freelist rationale above),
   its iomgr object is unregistered and any stored shutdown error released. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    /* shutdown_error packs a grpc_error* with FD_SHUTDOWN_BIT in the low bit */
    grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
    /* Clear the least significant bit if it set (in case fd was shutdown) */
    err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT);
    GRPC_ERROR_UNREF(err);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}
961
/* Public ref/unref wrappers for grpc_fd. Increment refcount by two to avoid
   changing the orphan bit (presumably the low bit of refst doubles as the
   orphan flag — TODO(review): confirm against the grpc_fd declaration). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
977
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700978static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
979
/* Frees every grpc_fd still on the freelist and destroys the freelist mutex. */
static void fd_global_shutdown(void) {
  /* NOTE(review): this lock/unlock pair appears to act purely as a barrier so
     that any thread still inside the freelist critical section (e.g. in
     unref_by) has exited before teardown begins — confirm */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->po.mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
991
/* Wraps OS file descriptor 'fd' in a grpc_fd, reusing a struct from the
   freelist when available. The new grpc_fd starts with refcount 1, no polling
   island, and both read/write closure states set to CLOSURE_NOT_READY.
   'name' is only used for iomgr registration/debug output. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* Try to reuse a previously freelisted grpc_fd first */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->po.mu);
  }

  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
   * is a newly created fd (or an fd we got from the freelist), no one else
   * would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = NULL;
#ifdef PO_DEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE);
  new_fd->orphaned = false;
  gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY);
  gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY);
  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);

  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;

  gpr_mu_unlock(&new_fd->po.mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}
1038
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001039static int fd_wrapped_fd(grpc_fd *fd) {
1040 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001041 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001042 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001043 ret_fd = fd->fd;
1044 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001045 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001046
1047 return ret_fd;
1048}
1049
/* Orphans the grpc_fd: removes it from its polling island, either hands the
   raw descriptor back via *release_fd or close()s it, and schedules 'on_done'
   once cleanup is complete. The grpc_fd struct itself stays alive until its
   refcount drops (it is then freelisted, not freed). */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1106
/* Registers 'closure' to be scheduled when the fd becomes ready (or is shut
   down). 'state' is the fd's lock-free read/write closure slot holding one of:
   CLOSURE_NOT_READY, CLOSURE_READY, a closure pointer, or a shutdown error
   tagged with FD_SHUTDOWN_BIT. If the fd is already ready/shutdown the
   closure is scheduled immediately; registering while another closure is
   still pending is a caller bug and aborts. */
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
                      grpc_closure *closure) {
  while (true) {
    /* Fast-path: CLOSURE_NOT_READY -> <closure>.
       The 'release' cas here matches the 'acquire' load in set_ready and
       set_shutdown ensuring that the closure (scheduled by set_ready or
       set_shutdown) happens-after the I/O event on the fd */
    if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
      return; /* Fast-path successful. Return */
    }

    /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
       set_shutdown */
    gpr_atm curr = gpr_atm_acq_load(state);
    switch (curr) {
      case CLOSURE_NOT_READY: {
        break; /* retry */
      }

      case CLOSURE_READY: {
        /* Change the state to CLOSURE_NOT_READY. Schedule the closure if
           successful. If not, the state most likely transitioned to shutdown.
           We should retry.

           This can be a no-barrier cas since the state is being transitioned to
           CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any
           closure when transitioning out of CLOSURE_NOT_READY state (i.e there
           is no other code that needs to 'happen-after' this) */
        if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
          grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
          return; /* Slow-path successful. Return */
        }

        break; /* retry */
      }

      default: {
        /* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
           contains a pointer to the shutdown-error). If the fd is shutdown,
           schedule the closure with the shutdown error */
        if ((curr & FD_SHUTDOWN_BIT) > 0) {
          grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
          grpc_closure_sched(exec_ctx, closure,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "FD Shutdown", &shutdown_err, 1));
          return;
        }

        /* There is already a closure!. This indicates a bug in the code */
        gpr_log(GPR_ERROR,
                "notify_on called with a previous callback still pending");
        abort();
      }
    }
  }

  GPR_UNREACHABLE_CODE(return );
}
1165
/* Moves the fd's closure slot 'state' into the shutdown state: the slot ends
   up holding 'shutdown_err' tagged with FD_SHUTDOWN_BIT. If a closure was
   pending in the slot, it is scheduled with a "FD Shutdown" error. A slot
   that is already shutdown is left untouched. */
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
                         grpc_error *shutdown_err) {
  /* Try the fast-path first (i.e expect the current value to be
     CLOSURE_NOT_READY */
  gpr_atm curr = CLOSURE_NOT_READY;
  gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;

  while (true) {
    /* The 'release' cas here matches the 'acquire' load in notify_on to ensure
       that the closure it schedules 'happens-after' the set_shutdown is called
       on the fd */
    if (gpr_atm_rel_cas(state, curr, new_state)) {
      return; /* Fast-path successful. Return */
    }

    /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
       notify_on and set_ready */
    curr = gpr_atm_acq_load(state);
    switch (curr) {
      case CLOSURE_READY: {
        break; /* retry */
      }

      case CLOSURE_NOT_READY: {
        break; /* retry */
      }

      default: {
        /* 'curr' is either a closure or the fd is already shutdown */

        /* If fd is already shutdown, we are done */
        if ((curr & FD_SHUTDOWN_BIT) > 0) {
          return;
        }

        /* Fd is not shutdown. Schedule the closure and move the state to
           shutdown state. The 'release' cas here matches the 'acquire' load in
           notify_on to ensure that the closure it schedules 'happens-after'
           the set_shutdown is called on the fd */
        if (gpr_atm_rel_cas(state, curr, new_state)) {
          grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "FD Shutdown", &shutdown_err, 1));
          return;
        }

        /* 'curr' was a closure but now changed to a different state. We will
           have to retry */
        break;
      }
    }
  }

  GPR_UNREACHABLE_CODE(return );
}
1221
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001222static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001223 /* Try an optimistic case first (i.e assume current state is
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001224 CLOSURE_NOT_READY).
1225
1226 This 'release' cas matches the 'acquire' load in notify_on ensuring that
1227 any closure (scheduled by notify_on) 'happens-after' the return from
1228 epoll_pwait */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001229 if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001230 return; /* early out */
1231 }
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001232
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001233 /* The 'acquire' load here matches the 'release' cas in notify_on and
1234 set_shutdown */
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001235 gpr_atm curr = gpr_atm_acq_load(state);
1236 switch (curr) {
1237 case CLOSURE_READY: {
1238 /* Already ready. We are done here */
1239 break;
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001240 }
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001241
1242 case CLOSURE_NOT_READY: {
1243 /* The state was not CLOSURE_NOT_READY when we checked initially at the
1244 beginning of this function but now it is CLOSURE_NOT_READY again.
1245 This is only possible if the state transitioned out of
1246 CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
1247 back to CLOSURE_NOT_READY again (i.e after we entered this function,
1248 the fd became "ready" and the necessary actions were already done).
1249 So there is no need to make the state CLOSURE_READY now */
1250 break;
1251 }
1252
1253 default: {
1254 /* 'curr' is either a closure or the fd is shutdown */
1255 if ((curr & FD_SHUTDOWN_BIT) > 0) {
1256 /* The fd is shutdown. Do nothing */
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001257 } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) {
1258 /* The cas above was no-barrier since the state is being transitioned to
1259 CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any
1260 closures when transitioning out of CLOSURE_NO_READY state (i.e there
1261 is no other code that needs to 'happen-after' this) */
1262
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001263 grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
1264 }
1265 /* else the state changed again (only possible by either a racing
1266 set_ready or set_shutdown functions. In both these cases, the closure
1267 would have been scheduled for execution. So we are done here */
1268 break;
1269 }
1270 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001271}
1272
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001273static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1274 grpc_fd *fd) {
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001275 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001276 return (grpc_pollset *)notifier;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001277}
1278
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001279static bool fd_is_shutdown(grpc_fd *fd) {
Sree Kuchibhotla99983382017-02-12 17:03:27 -08001280 grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001281 return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001282}
1283
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001284/* Might be called multiple times */
Craig Tillercda759d2017-01-27 11:37:37 -08001285static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -08001286 /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001287 if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE,
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -08001288 (gpr_atm)why | FD_SHUTDOWN_BIT)) {
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001289 shutdown(fd->fd, SHUT_RDWR);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001290
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001291 set_shutdown(exec_ctx, fd, &fd->read_closure, why);
1292 set_shutdown(exec_ctx, fd, &fd->write_closure, why);
Craig Tillercda759d2017-01-27 11:37:37 -08001293 } else {
Sree Kuchibhotla4db8c822017-02-12 17:07:31 -08001294 /* Shutdown already called */
Craig Tillercda759d2017-01-27 11:37:37 -08001295 GRPC_ERROR_UNREF(why);
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001296 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001297}
1298
/* Register 'closure' with the fd's read-closure state machine (see
   notify_on) so it runs when the fd becomes readable */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  notify_on(exec_ctx, fd, &fd->read_closure, closure);
}
1303
/* Register 'closure' with the fd's write-closure state machine (see
   notify_on) so it runs when the fd becomes writable */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  notify_on(exec_ctx, fd, &fd->write_closure, closure);
}
1308
Craig Tillerd6ba6192016-06-30 15:42:41 -07001309static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001310 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001311 grpc_workqueue *workqueue =
1312 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001313 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001314 return workqueue;
1315}
Craig Tiller70bd4832016-06-30 14:20:46 -07001316
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001317/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001318 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001319 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001320GPR_TLS_DECL(g_current_thread_pollset);
1321GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001322static __thread bool g_initialized_sigmask;
1323static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001324
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001325static void sig_handler(int sig_num) {
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001326#ifdef GRPC_EPOLL_DEBUG
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001327 gpr_log(GPR_INFO, "Received signal %d", sig_num);
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -07001328#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001329}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001330
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001331static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001332
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001333/* Global state management */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001334static grpc_error *pollset_global_init(void) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001335 gpr_tls_init(&g_current_thread_pollset);
1336 gpr_tls_init(&g_current_thread_worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001337 poller_kick_init();
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001338 return grpc_wakeup_fd_init(&global_wakeup_fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001339}
1340
1341static void pollset_global_shutdown(void) {
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001342 grpc_wakeup_fd_destroy(&global_wakeup_fd);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001343 gpr_tls_destroy(&g_current_thread_pollset);
1344 gpr_tls_destroy(&g_current_thread_worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001345}
1346
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001347static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
1348 grpc_error *err = GRPC_ERROR_NONE;
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001349
1350 /* Kick the worker only if it was not already kicked */
1351 if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
1352 GRPC_POLLING_TRACE(
1353 "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
1354 (void *)worker, worker->pt_id);
1355 int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
1356 if (err_num != 0) {
1357 err = GRPC_OS_ERROR(err_num, "pthread_kill");
1358 }
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001359 }
1360 return err;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001361}
1362
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001363/* Return 1 if the pollset has active threads in pollset_work (pollset must
1364 * be locked) */
1365static int pollset_has_workers(grpc_pollset *p) {
1366 return p->root_worker.next != &p->root_worker;
1367}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001368
1369static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1370 worker->prev->next = worker->next;
1371 worker->next->prev = worker->prev;
1372}
1373
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001374static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1375 if (pollset_has_workers(p)) {
1376 grpc_pollset_worker *w = p->root_worker.next;
1377 remove_worker(p, w);
1378 return w;
1379 } else {
1380 return NULL;
1381 }
1382}
1383
1384static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1385 worker->next = &p->root_worker;
1386 worker->prev = worker->next->prev;
1387 worker->prev->next = worker->next->prev = worker;
1388}
1389
1390static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1391 worker->prev = &p->root_worker;
1392 worker->next = worker->prev->next;
1393 worker->prev->next = worker->next->prev = worker;
1394}
1395
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001396/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001397static grpc_error *pollset_kick(grpc_pollset *p,
1398 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001399 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001400 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001401 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001402 grpc_pollset_worker *worker = specific_worker;
1403 if (worker != NULL) {
1404 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001405 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001406 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001407 for (worker = p->root_worker.next; worker != &p->root_worker;
1408 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001409 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001410 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001411 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001412 }
Craig Tillera218a062016-06-26 09:58:37 -07001413 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001414 } else {
1415 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001416 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001417 } else {
1418 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001419 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001420 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001421 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001422 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001423 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1424 /* Since worker == NULL, it means that we can kick "any" worker on this
1425 pollset 'p'. If 'p' happens to be the same pollset this thread is
1426 currently polling (i.e in pollset_work() function), then there is no need
1427 to kick any other worker since the current thread can just absorb the
1428 kick. This is the reason why we enter this case only when
1429 g_current_thread_pollset is != p */
1430
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001431 GPR_TIMER_MARK("kick_anonymous", 0);
1432 worker = pop_front_worker(p);
1433 if (worker != NULL) {
1434 GPR_TIMER_MARK("finally_kick", 0);
1435 push_back_worker(p, worker);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001436 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001437 } else {
1438 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001439 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001440 }
1441 }
1442
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001443 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001444 GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
1445 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001446}
1447
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001448static grpc_error *kick_poller(void) {
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001449 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001450}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001451
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001452static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001453 gpr_mu_init(&pollset->po.mu);
1454 *mu = &pollset->po.mu;
1455 pollset->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001456#ifdef PO_DEBUG
1457 pollset->po.obj_type = POLL_OBJ_POLLSET;
1458#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001459
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001460 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001461 pollset->kicked_without_pollers = false;
1462
1463 pollset->shutting_down = false;
1464 pollset->finish_shutdown_called = false;
1465 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001466}
1467
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001468/* Convert a timespec to milliseconds:
1469 - Very small or negative poll times are clamped to zero to do a non-blocking
1470 poll (which becomes spin polling)
1471 - Other small values are rounded up to one millisecond
1472 - Longer than a millisecond polls are rounded up to the next nearest
1473 millisecond to avoid spinning
1474 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001475static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1476 gpr_timespec now) {
1477 gpr_timespec timeout;
1478 static const int64_t max_spin_polling_us = 10;
1479 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1480 return -1;
1481 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001482
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001483 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1484 max_spin_polling_us,
1485 GPR_TIMESPAN))) <= 0) {
1486 return 0;
1487 }
1488 timeout = gpr_time_sub(deadline, now);
1489 return gpr_time_to_millis(gpr_time_add(
1490 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1491}
1492
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001493static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1494 grpc_pollset *notifier) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001495 set_ready(exec_ctx, fd, &fd->read_closure);
1496
Sree Kuchibhotla4db8c822017-02-12 17:07:31 -08001497 /* Note, it is possible that fd_become_readable might be called twice with
Sree Kuchibhotla4c60d0d2017-02-12 17:09:08 -08001498 different 'notifier's when an fd becomes readable and it is in two epoll
1499 sets (This can happen briefly during polling island merges). In such cases
1500 it does not really matter which notifer is set as the read_notifier_pollset
1501 (They would both point to the same polling island anyway) */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001502 /* Use release store to match with acquire load in fd_get_read_notifier */
1503 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001504}
1505
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001506static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001507 set_ready(exec_ctx, fd, &fd->write_closure);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001508}
1509
Craig Tillerb39307d2016-06-30 15:39:13 -07001510static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1511 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001512 if (ps->po.pi != NULL) {
1513 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001514 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001515 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001516}
1517
/* Final stage of pollset shutdown: release the polling island and schedule
   the user's shutdown_done closure. Called either from pollset_shutdown (no
   workers) or by the last worker leaving pollset_work. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
1529
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001530/* pollset->po.mu lock must be held by the caller before calling this */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001531static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1532 grpc_closure *closure) {
1533 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1534 GPR_ASSERT(!pollset->shutting_down);
1535 pollset->shutting_down = true;
1536 pollset->shutdown_done = closure;
1537 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1538
1539 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1540 because it would release the underlying polling island. In such a case, we
1541 let the last worker call finish_shutdown_locked() from pollset_work() */
1542 if (!pollset_has_workers(pollset)) {
1543 GPR_ASSERT(!pollset->finish_shutdown_called);
1544 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1545 finish_shutdown_locked(exec_ctx, pollset);
1546 }
1547 GPR_TIMER_END("pollset_shutdown", 0);
1548}
1549
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here (the polling island ref was already dropped during shutdown) */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}
1557
Craig Tillerd8a3c042016-09-09 12:42:37 -07001558static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
1559 polling_island *pi) {
1560 if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
1561 gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
1562 gpr_mu_unlock(&pi->workqueue_read_mu);
1563 if (n != NULL) {
1564 if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
1565 workqueue_maybe_wakeup(pi);
1566 }
1567 grpc_closure *c = (grpc_closure *)n;
Craig Tiller061ef742016-12-29 10:54:09 -08001568 grpc_error *error = c->error_data.error;
1569 c->cb(exec_ctx, c->cb_arg, error);
1570 GRPC_ERROR_UNREF(error);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001571 return true;
1572 } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
Craig Tiller460502e2016-10-13 10:02:08 -07001573 /* n == NULL might mean there's work but it's not available to be popped
1574 * yet - try to ensure another workqueue wakes up to check shortly if so
1575 */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001576 workqueue_maybe_wakeup(pi);
1577 }
1578 }
1579 return false;
1580}
1581
Craig Tiller84ea3412016-09-08 14:57:56 -07001582#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001583/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1584static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001585 grpc_pollset *pollset,
1586 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001587 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001588 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001589 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001590 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001591 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001592 char *err_msg;
1593 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001594 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1595
1596 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001597 latest polling island pointed by pollset->po.pi
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001598
1599 Since epoll_fd is immutable, we can read it without obtaining the polling
1600 island lock. There is however a possibility that the polling island (from
1601 which we got the epoll_fd) got merged with another island while we are
1602 in this function. This is still okay because in such a case, we will wakeup
1603 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001604 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001605
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001606 if (pollset->po.pi == NULL) {
1607 pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
1608 if (pollset->po.pi == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001609 GPR_TIMER_END("pollset_work_and_unlock", 0);
1610 return; /* Fatal error. We cannot continue */
1611 }
1612
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001613 PI_ADD_REF(pollset->po.pi, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001614 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001615 (void *)pollset, (void *)pollset->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001616 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001617
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001618 pi = polling_island_maybe_get_latest(pollset->po.pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001619 epoll_fd = pi->epoll_fd;
1620
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001621 /* Update the pollset->po.pi since the island being pointed by
1622 pollset->po.pi maybe older than the one pointed by pi) */
1623 if (pollset->po.pi != pi) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001624 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1625 polling island to be deleted */
1626 PI_ADD_REF(pi, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001627 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
1628 pollset->po.pi = pi;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001629 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001630
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001631 /* Add an extra ref so that the island does not get destroyed (which means
1632 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1633 epoll_fd */
1634 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001635 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001636
Craig Tiller460502e2016-10-13 10:02:08 -07001637 /* If we get some workqueue work to do, it might end up completing an item on
1638 the completion queue, so there's no need to poll... so we skip that and
1639 redo the complete loop to verify */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001640 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1641 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1642 g_current_thread_polling_island = pi;
1643
Vijay Paicef54012016-08-28 23:05:31 -07001644 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001645 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1646 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001647 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001648 if (ep_rv < 0) {
1649 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001650 gpr_asprintf(&err_msg,
1651 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1652 epoll_fd, errno, strerror(errno));
1653 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001654 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001655 /* We were interrupted. Save an interation by doing a zero timeout
1656 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001657 GRPC_POLLING_TRACE(
1658 "pollset_work: pollset: %p, worker: %p received kick",
1659 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001660 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001661 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001662 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001663
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001664#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001665 /* See the definition of g_poll_sync for more details */
1666 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001667#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001668
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001669 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001670 void *data_ptr = ep_ev[i].data.ptr;
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001671 if (data_ptr == &global_wakeup_fd) {
Craig Tiller1fa9ddb2016-11-28 08:19:37 -08001672 append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001673 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001674 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
Craig Tillere49959d2017-01-26 08:39:38 -08001675 append_error(error,
1676 grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
Craig Tillerd8a3c042016-09-09 12:42:37 -07001677 err_desc);
1678 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001679 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001680 GRPC_POLLING_TRACE(
1681 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1682 "%d) got merged",
1683 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001684 /* This means that our polling island is merged with a different
1685 island. We do not have to do anything here since the subsequent call
1686 to the function pollset_work_and_unlock() will pick up the correct
1687 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001688 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001689 grpc_fd *fd = data_ptr;
1690 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1691 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1692 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001693 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001694 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001695 }
1696 if (write_ev || cancel) {
1697 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001698 }
1699 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001700 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001701
1702 g_current_thread_polling_island = NULL;
1703 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1704 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001705
1706 GPR_ASSERT(pi != NULL);
1707
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001708 /* Before leaving, release the extra ref we added to the polling island. It
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001709 is important to use "pi" here (i.e our old copy of pollset->po.pi
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001710 that we got before releasing the polling island lock). This is because
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001711 pollset->po.pi pointer might get udpated in other parts of the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001712 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001713 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001714
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001715 GPR_TIMER_END("pollset_work_and_unlock", 0);
1716}
1717
/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns.

   On return, *worker_hdl is reset to NULL and any accumulated error from the
   polling machinery is both logged and returned (caller owns the ref). */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* The worker lives on this thread's stack; it is only published to other
     threads via *worker_hdl and the pollset's worker list, and is removed
     from both before this function returns. */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      /* One-time (per thread) setup of the two signal masks used above.
         g_initialized_sigmask / g_orig_sigmask are thread-local, so there is
         no race here despite the unsynchronized check. */
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* pollset_work_and_unlock() releases pollset->po.mu before blocking in
       epoll_pwait(); we re-acquire it below before touching the worker list */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    /* Drop the lock while flushing the exec_ctx so closures scheduled by
       finish_shutdown_locked() can run without deadlocking on po.mu */
    gpr_mu_unlock(&pollset->po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->po.mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1824
/* Associate 'item' (an fd, pollset or pollset_set) with 'bag' (a pollset or
   pollset_set) by making both point to a common polling island.

   Locking order: bag->mu is always acquired before item->mu; both are held
   for most of the function (see the TSAN note below for the one exception).
   Any error from the polling-island operations is accumulated in 'error' and
   logged at the end. */
static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
                            poll_obj_type bag_type, poll_obj *item,
                            poll_obj_type item_type) {
  GPR_TIMER_BEGIN("add_poll_object", 0);

#ifdef PO_DEBUG
  GPR_ASSERT(item->obj_type == item_type);
  GPR_ASSERT(bag->obj_type == bag_type);
#endif

  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *pi_new = NULL;

  gpr_mu_lock(&bag->mu);
  gpr_mu_lock(&item->mu);

retry:
  /*
   * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
   * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
   *    a refcount of 2) and point item->pi and bag->pi to the new island
   * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
   *    the other's non-NULL pi
   * 4) Finally if item->pi and bag->pi are non-NULL and not-equal, merge the
   *    polling islands and update item->pi and bag->pi to point to the new
   *    island
   */

  /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
   * orphaned */
  if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
    gpr_mu_unlock(&item->mu);
    gpr_mu_unlock(&bag->mu);
    return;
  }

  if (item->pi == bag->pi) {
    pi_new = item->pi;
    if (pi_new == NULL) {
      /* GPR_ASSERT(item->pi == bag->pi == NULL) */

      /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
       * we need to do some extra work to make TSAN happy */
      if (item_type == POLL_OBJ_FD) {
        /* Unlock before creating a new polling island: the polling island will
           create a workqueue which creates a file descriptor, and holding an fd
           lock here can eventually cause a loop to appear to TSAN (making it
           unhappy). We don't think it's a real loop (there's an epoch point
           where that loop possibility disappears), but the advantages of
           keeping TSAN happy outweigh any performance advantage we might have
           by keeping the lock held. */
        gpr_mu_unlock(&item->mu);
        pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
        gpr_mu_lock(&item->mu);

        /* Need to reverify any assumptions made between the initial lock and
           getting to this branch: if they've changed, we need to throw away our
           work and figure things out again. */
        if (item->pi != NULL) {
          GRPC_POLLING_TRACE(
              "add_poll_object: Raced creating new polling island. pi_new: %p "
              "(fd: %d, %s: %p)",
              (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
              (void *)bag);
          /* No need to lock 'pi_new' here since this is a new polling island
             and no one has a reference to it yet */
          polling_island_remove_all_fds_locked(pi_new, true, &error);

          /* Ref and unref so that the polling island gets deleted during unref
           */
          PI_ADD_REF(pi_new, "dance_of_destruction");
          PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
          goto retry;
        }
      } else {
        pi_new = polling_island_create(exec_ctx, NULL, &error);
      }

      GRPC_POLLING_TRACE(
          "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
          "%s: %p)",
          (void *)pi_new, poll_obj_string(item_type), (void *)item,
          poll_obj_string(bag_type), (void *)bag);
    } else {
      GRPC_POLLING_TRACE(
          "add_poll_object: Same polling island. pi: %p (%s, %s)",
          (void *)pi_new, poll_obj_string(item_type),
          poll_obj_string(bag_type));
    }
  } else if (item->pi == NULL) {
    /* GPR_ASSERT(bag->pi != NULL) */
    /* Make pi_new point to latest pi */
    pi_new = polling_island_lock(bag->pi);

    if (item_type == POLL_OBJ_FD) {
      grpc_fd *fd = FD_FROM_PO(item);
      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    }

    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else if (bag->pi == NULL) {
    /* GPR_ASSERT(item->pi != NULL) */
    /* Make pi_new point to latest pi */
    pi_new = polling_island_lock(item->pi);
    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else {
    /* Case 4: both islands exist and differ -- merge them */
    pi_new = polling_island_merge(item->pi, bag->pi, &error);
    GRPC_POLLING_TRACE(
        "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  }

  /* At this point, pi_new is the polling island that both item->pi and bag->pi
     MUST be pointing to */

  if (item->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(item_type));
    if (item->pi != NULL) {
      PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
    }
    item->pi = pi_new;
  }

  if (bag->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(bag_type));
    if (bag->pi != NULL) {
      PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
    }
    bag->pi = pi_new;
  }

  gpr_mu_unlock(&item->mu);
  gpr_mu_unlock(&bag->mu);

  GRPC_LOG_IF_ERROR("add_poll_object", error);
  GPR_TIMER_END("add_poll_object", 0);
}
Craig Tiller57726ca2016-09-12 11:59:45 -07001974
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001975static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1976 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001977 add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001978 POLL_OBJ_FD);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001979}
1980
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001981/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001982 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001983 */
1984
1985static grpc_pollset_set *pollset_set_create(void) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001986 grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001987 gpr_mu_init(&pss->po.mu);
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001988 pss->po.pi = NULL;
1989#ifdef PO_DEBUG
1990 pss->po.obj_type = POLL_OBJ_POLLSET_SET;
1991#endif
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001992 return pss;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001993}
1994
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001995static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
1996 grpc_pollset_set *pss) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001997 gpr_mu_destroy(&pss->po.mu);
1998
1999 if (pss->po.pi != NULL) {
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08002000 PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002001 }
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002002
2003 gpr_free(pss);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002004}
2005
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002006static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
2007 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002008 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002009 POLL_OBJ_FD);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002010}
2011
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002012static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
2013 grpc_fd *fd) {
2014 /* Nothing to do */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002015}
2016
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002017static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002018 grpc_pollset_set *pss, grpc_pollset *ps) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002019 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002020 POLL_OBJ_POLLSET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002021}
2022
2023static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002024 grpc_pollset_set *pss, grpc_pollset *ps) {
2025 /* Nothing to do */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002026}
2027
2028static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
2029 grpc_pollset_set *bag,
2030 grpc_pollset_set *item) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002031 add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002032 POLL_OBJ_POLLSET_SET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002033}
2034
2035static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
2036 grpc_pollset_set *bag,
2037 grpc_pollset_set *item) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002038 /* Nothing to do */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002039}
2040
/* Test helper functions */
2043void *grpc_fd_get_polling_island(grpc_fd *fd) {
2044 polling_island *pi;
2045
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002046 gpr_mu_lock(&fd->po.mu);
2047 pi = fd->po.pi;
2048 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002049
2050 return pi;
2051}
2052
2053void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
2054 polling_island *pi;
2055
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002056 gpr_mu_lock(&ps->po.mu);
2057 pi = ps->po.pi;
2058 gpr_mu_unlock(&ps->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002059
2060 return pi;
2061}
2062
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002063bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002064 polling_island *p1 = p;
2065 polling_island *p2 = q;
2066
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002067 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
2068 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002069 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002070 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002071
2072 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002073}
2074
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002075/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002076 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002077 */
2078
/* Global teardown for the epoll engine, run when the event engine is shut
   down. Tears down the fd, pollset and polling-island global state, in the
   reverse order of their initialization in grpc_init_epoll_linux(). */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
2084
/* The event-engine vtable exported by this polling engine; returned by
   grpc_init_epoll_linux() and dispatched through by the iomgr layer. Every
   entry maps an engine-interface operation to the static implementation
   defined in this file. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_scheduler = workqueue_scheduler,

    .shutdown_engine = shutdown_engine,
};
2122
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002123/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
2124 * Create a dummy epoll_fd to make sure epoll support is available */
2125static bool is_epoll_available() {
2126 int fd = epoll_create1(EPOLL_CLOEXEC);
2127 if (fd < 0) {
2128 gpr_log(
2129 GPR_ERROR,
2130 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2131 fd);
2132 return false;
2133 }
2134 close(fd);
2135 return true;
2136}
2137
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002138const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002139 /* If use of signals is disabled, we cannot use epoll engine*/
2140 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
2141 return NULL;
2142 }
2143
Ken Paysoncd7d0472016-10-11 12:24:20 -07002144 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07002145 return NULL;
2146 }
2147
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002148 if (!is_epoll_available()) {
2149 return NULL;
2150 }
2151
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002152 if (!is_grpc_wakeup_signal_initialized) {
Sree Kuchibhotlabd48c912016-09-27 16:48:25 -07002153 grpc_use_signal(SIGRTMIN + 6);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002154 }
2155
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002156 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07002157
2158 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
2159 return NULL;
2160 }
2161
2162 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
2163 polling_island_global_init())) {
2164 return NULL;
2165 }
2166
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002167 return &vtable;
2168}
2169
#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GRPC_POSIX_SOCKET) */

/* No-op stub: without epoll there is no signal-based kicking to configure */
void grpc_use_signal(int signum) {}
#endif /* !defined(GRPC_LINUX_EPOLL) */