blob: 400d4057a7cc7c4e7d15cd325ab270b1fbe21ebb [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid9954070892016-08-08 17:01:18 -070034#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070036/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070037#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070038
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070039#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070040
41#include <assert.h>
42#include <errno.h>
43#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070044#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070045#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
Craig Tiller185f6c92017-03-17 08:33:19 -070059#include "src/core/lib/iomgr/timer.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070060#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Logs a printf-style message at GPR_INFO level when grpc_polling_trace is
   non-zero. At least one argument must follow 'fmt'.

   The body is wrapped in do { } while (0) so that the macro behaves like a
   single statement: it can be used as the unbraced body of an if/else and
   must be followed by a semicolon. */
#define GRPC_POLLING_TRACE(fmt, ...)         \
  do {                                       \
    if (grpc_polling_trace) {                \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
    }                                        \
  } while (0)
71
Sree Kuchibhotla82d73412017-02-09 18:27:45 -080072/* Uncomment the following to enable extra checks on poll_object operations */
Sree Kuchibhotlae6f516e2016-12-08 12:20:23 -080073/* #define PO_DEBUG */
74
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075static int grpc_wakeup_signal = -1;
76static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070077
Craig Tillerb4b8e1e2016-11-28 07:33:13 -080078/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
79 * sure to wake up one polling thread (which can wake up other threads if
80 * needed) */
81static grpc_wakeup_fd global_wakeup_fd;
82
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070083/* Implements the function defined in grpc_posix.h. This function might be
84 * called before even calling grpc_init() to set either a different signal to
85 * use. If signum == -1, then the use of signals is disabled */
86void grpc_use_signal(int signum) {
87 grpc_wakeup_signal = signum;
88 is_grpc_wakeup_signal_initialized = true;
89
90 if (grpc_wakeup_signal < 0) {
91 gpr_log(GPR_INFO,
92 "Use of signals is disabled. Epoll engine will not be used");
93 } else {
94 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
95 grpc_wakeup_signal);
96 }
97}
98
99struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700100
/* Discriminator for the kinds of poll_obj used in this file */
typedef enum {
  POLL_OBJ_FD = 0,
  POLL_OBJ_POLLSET = 1,
  POLL_OBJ_POLLSET_SET = 2
} poll_obj_type;
106
107typedef struct poll_obj {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -0800108#ifdef PO_DEBUG
109 poll_obj_type obj_type;
110#endif
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800111 gpr_mu mu;
112 struct polling_island *pi;
113} poll_obj;
114
115const char *poll_obj_string(poll_obj_type po_type) {
116 switch (po_type) {
117 case POLL_OBJ_FD:
118 return "fd";
119 case POLL_OBJ_POLLSET:
120 return "pollset";
121 case POLL_OBJ_POLLSET_SET:
122 return "pollset_set";
123 }
124
125 GPR_UNREACHABLE_CODE(return "UNKNOWN");
126}
127
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700128/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700129 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700130 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800131
132#define FD_FROM_PO(po) ((grpc_fd *)(po))
133
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700134struct grpc_fd {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800135 poll_obj po;
136
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700137 int fd;
138 /* refst format:
Sree Kuchibhotla5098f912016-05-31 10:58:17 -0700139 bit 0 : 1=Active / 0=Orphaned
140 bits 1-n : refcount
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700141 Ref/Unref by two to avoid altering the orphaned bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700142 gpr_atm refst;
143
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -0800144 /* Internally stores data of type (grpc_error *). If the FD is shutdown, this
145 contains reason for shutdown (i.e a pointer to grpc_error) ORed with
146 FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit
147 of (grpc_error *) addresses is guaranteed to be zero. Even if the
148 (grpc_error *), is of special types like GRPC_ERROR_NONE, GRPC_ERROR_OOM
149 etc, the lower bit is guaranteed to be zero.
150
151 Once an fd is shutdown, any pending or future read/write closures on the
152 fd should fail */
Sree Kuchibhotla99983382017-02-12 17:03:27 -0800153 gpr_atm shutdown_error;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700154
Sree Kuchibhotla99983382017-02-12 17:03:27 -0800155 /* The fd is either closed or we relinquished control of it. In either
156 cases, this indicates that the 'fd' on this structure is no longer
157 valid */
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700158 bool orphaned;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700159
Sree Kuchibhotla99983382017-02-12 17:03:27 -0800160 /* Closures to call when the fd is readable or writable respectively. These
161 fields contain one of the following values:
162 CLOSURE_READY : The fd has an I/O event of interest but there is no
163 closure yet to execute
164
165 CLOSURE_NOT_READY : The fd has no I/O event of interest
166
Sree Kuchibhotlaa70ccb62017-02-13 23:16:52 -0800167 closure ptr : The closure to be executed when the fd has an I/O
168 event of interest
169
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -0800170 shutdown_error | FD_SHUTDOWN_BIT :
171 'shutdown_error' field ORed with FD_SHUTDOWN_BIT.
Sree Kuchibhotlaa70ccb62017-02-13 23:16:52 -0800172 This indicates that the fd is shutdown. Since all
173 memory allocations are word-aligned, the lower two
174 bits of the shutdown_error pointer are always 0. So
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -0800175 it is safe to OR these with FD_SHUTDOWN_BIT
Sree Kuchibhotla99983382017-02-12 17:03:27 -0800176
177 Valid state transitions:
178
179 <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY
180 | | ^ | ^ | |
181 | | | | | | |
182 | +--------------4----------+ 6 +---------2---------------+ |
183 | | |
184 | v |
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -0800185 +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+
Sree Kuchibhotla99983382017-02-12 17:03:27 -0800186
187 For 1, 4 : See set_ready() function
188 For 2, 3 : See notify_on() function
189 For 5,6,7: See set_shutdown() function */
Sree Kuchibhotla82d73412017-02-09 18:27:45 -0800190 gpr_atm read_closure;
191 gpr_atm write_closure;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700192
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700193 struct grpc_fd *freelist_next;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700194 grpc_closure *on_done_closure;
195
Sree Kuchibhotla82d73412017-02-09 18:27:45 -0800196 /* The pollset that last noticed that the fd is readable. The actual type
197 * stored in this is (grpc_pollset *) */
198 gpr_atm read_notifier_pollset;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700199
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700200 grpc_iomgr_object iomgr_object;
201};
202
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700203/* Reference counting for fds */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700204// #define GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700205#ifdef GRPC_FD_REF_COUNT_DEBUG
206static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
207static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
208 int line);
209#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
210#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
211#else
212static void fd_ref(grpc_fd *fd);
213static void fd_unref(grpc_fd *fd);
214#define GRPC_FD_REF(fd, reason) fd_ref(fd)
215#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
216#endif
217
218static void fd_global_init(void);
219static void fd_global_shutdown(void);
220
/* Sentinel values stored in grpc_fd's read_closure / write_closure fields */
#define CLOSURE_NOT_READY ((gpr_atm)0)
#define CLOSURE_READY ((gpr_atm)2)

/* Bit ORed into a (grpc_error *) value to mark an fd as shutdown */
#define FD_SHUTDOWN_BIT 1
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700225
226/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700227 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700228 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700229
/* PI_ADD_REF / PI_UNREF: ref-counting entry points for polling islands. With
   GRPC_WORKQUEUE_REFCOUNT_DEBUG defined they forward the reason and the call
   site to the *_dbg variants; otherwise the reason is dropped. */
#if defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG)

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700242
Craig Tiller460502e2016-10-13 10:02:08 -0700243/* This is also used as grpc_workqueue (by directly casing it) */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700244typedef struct polling_island {
Craig Tiller91031da2016-12-28 15:44:25 -0800245 grpc_closure_scheduler workqueue_scheduler;
246
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700247 gpr_mu mu;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700248 /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
249 the refcount.
250 Once the ref count becomes zero, this structure is destroyed which means
251 we should ensure that there is never a scenario where a PI_ADD_REF() is
252 racing with a PI_UNREF() that just made the ref_count zero. */
Craig Tiller15007612016-07-06 09:36:16 -0700253 gpr_atm ref_count;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700254
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700255 /* Pointer to the polling_island this merged into.
256 * merged_to value is only set once in polling_island's lifetime (and that too
257 * only if the island is merged with another island). Because of this, we can
258 * use gpr_atm type here so that we can do atomic access on this and reduce
259 * lock contention on 'mu' mutex.
260 *
261 * Note that if this field is not NULL (i.e not 0), all the remaining fields
262 * (except mu and ref_count) are invalid and must be ignored. */
263 gpr_atm merged_to;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700264
Craig Tiller460502e2016-10-13 10:02:08 -0700265 /* Number of threads currently polling on this island */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700266 gpr_atm poller_count;
Craig Tiller460502e2016-10-13 10:02:08 -0700267 /* Mutex guarding the read end of the workqueue (must be held to pop from
268 * workqueue_items) */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700269 gpr_mu workqueue_read_mu;
Craig Tiller460502e2016-10-13 10:02:08 -0700270 /* Queue of closures to be executed */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700271 gpr_mpscq workqueue_items;
Craig Tiller460502e2016-10-13 10:02:08 -0700272 /* Count of items in workqueue_items */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700273 gpr_atm workqueue_item_count;
Craig Tiller460502e2016-10-13 10:02:08 -0700274 /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700275 grpc_wakeup_fd workqueue_wakeup_fd;
Craig Tillerb39307d2016-06-30 15:39:13 -0700276
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700277 /* The fd of the underlying epoll set */
278 int epoll_fd;
279
280 /* The file descriptors in the epoll set */
281 size_t fd_cnt;
282 size_t fd_capacity;
283 grpc_fd **fds;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700284} polling_island;
285
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700286/*******************************************************************************
287 * Pollset Declarations
288 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700289struct grpc_pollset_worker {
Sree Kuchibhotla34217242016-06-29 00:19:07 -0700290 /* Thread id of this worker */
291 pthread_t pt_id;
292
293 /* Used to prevent a worker from getting kicked multiple times */
294 gpr_atm is_kicked;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700295 struct grpc_pollset_worker *next;
296 struct grpc_pollset_worker *prev;
297};
298
299struct grpc_pollset {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800300 poll_obj po;
301
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700302 grpc_pollset_worker root_worker;
303 bool kicked_without_pollers;
304
305 bool shutting_down; /* Is the pollset shutting down ? */
306 bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
307 grpc_closure *shutdown_done; /* Called after after shutdown is complete */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700308};
309
310/*******************************************************************************
311 * Pollset-set Declarations
312 */
313struct grpc_pollset_set {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800314 poll_obj po;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700315};
316
317/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700318 * Common helpers
319 */
320
Craig Tillerf975f742016-07-01 14:56:27 -0700321static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700322 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700323 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700324 if (*composite == GRPC_ERROR_NONE) {
325 *composite = GRPC_ERROR_CREATE(desc);
326 }
327 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700328 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700329}
330
331/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700332 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700333 */
334
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700335/* The wakeup fd that is used to wake up all threads in a Polling island. This
336 is useful in the polling island merge operation where we need to wakeup all
337 the threads currently polling the smaller polling island (so that they can
338 start polling the new/merged polling island)
339
340 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
341 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
342static grpc_wakeup_fd polling_island_wakeup_fd;
343
Craig Tiller2e620132016-10-10 15:27:44 -0700344/* The polling island being polled right now.
345 See comments in workqueue_maybe_wakeup for why this is tracked. */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700346static __thread polling_island *g_current_thread_polling_island;
347
Craig Tillerb39307d2016-06-30 15:39:13 -0700348/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700349static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Craig Tiller91031da2016-12-28 15:44:25 -0800350static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
351 grpc_error *error);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700352
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700353#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700354/* Currently TSAN may incorrectly flag data races between epoll_ctl and
355 epoll_wait for any grpc_fd structs that are added to the epoll set via
356 epoll_ctl and are returned (within a very short window) via epoll_wait().
357
358 To work-around this race, we establish a happens-before relation between
359 the code just-before epoll_ctl() and the code after epoll_wait() by using
360 this atomic */
361gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700362#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700363
Craig Tiller91031da2016-12-28 15:44:25 -0800364static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800365 workqueue_enqueue, workqueue_enqueue, "workqueue"};
Craig Tiller91031da2016-12-28 15:44:25 -0800366
Craig Tillerb39307d2016-06-30 15:39:13 -0700367static void pi_add_ref(polling_island *pi);
368static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700369
Craig Tillerd8a3c042016-09-09 12:42:37 -0700370#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Craig Tillera10b0b12016-09-09 16:20:07 -0700371static void pi_add_ref_dbg(polling_island *pi, const char *reason,
372 const char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700373 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700374 pi_add_ref(pi);
375 gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
376 (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700377}
378
Craig Tillerb39307d2016-06-30 15:39:13 -0700379static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
Craig Tillera10b0b12016-09-09 16:20:07 -0700380 const char *reason, const char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700381 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Craig Tillerb39307d2016-06-30 15:39:13 -0700382 pi_unref(exec_ctx, pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700383 gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700384 (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700385}
Craig Tillerd8a3c042016-09-09 12:42:37 -0700386
387static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
388 const char *file, int line,
389 const char *reason) {
390 if (workqueue != NULL) {
Craig Tillera10b0b12016-09-09 16:20:07 -0700391 pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700392 }
393 return workqueue;
394}
395
396static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
397 const char *file, int line, const char *reason) {
398 if (workqueue != NULL) {
Craig Tillera10b0b12016-09-09 16:20:07 -0700399 pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700400 }
401}
402#else
403static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
404 if (workqueue != NULL) {
405 pi_add_ref((polling_island *)workqueue);
406 }
407 return workqueue;
408}
409
410static void workqueue_unref(grpc_exec_ctx *exec_ctx,
411 grpc_workqueue *workqueue) {
412 if (workqueue != NULL) {
413 pi_unref(exec_ctx, (polling_island *)workqueue);
414 }
415}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700416#endif
417
Craig Tiller15007612016-07-06 09:36:16 -0700418static void pi_add_ref(polling_island *pi) {
419 gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
420}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700421
Craig Tillerb39307d2016-06-30 15:39:13 -0700422static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700423 /* If ref count went to zero, delete the polling island.
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700424 Note that this deletion not be done under a lock. Once the ref count goes
425 to zero, we are guaranteed that no one else holds a reference to the
426 polling island (and that there is no racing pi_add_ref() call either).
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700427
428 Also, if we are deleting the polling island and the merged_to field is
429 non-empty, we should remove a ref to the merged_to polling island
430 */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700431 if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
432 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
433 polling_island_delete(exec_ctx, pi);
434 if (next != NULL) {
435 PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700436 }
437 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700438}
439
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700440/* The caller is expected to hold pi->mu lock before calling this function */
441static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700442 size_t fd_count, bool add_fd_refs,
443 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700444 int err;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700445 size_t i;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700446 struct epoll_event ev;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700447 char *err_msg;
448 const char *err_desc = "polling_island_add_fds";
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700449
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700450#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700451 /* See the definition of g_epoll_sync for more context */
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700452 gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700453#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700454
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700455 for (i = 0; i < fd_count; i++) {
456 ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
457 ev.data.ptr = fds[i];
458 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700459
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700460 if (err < 0) {
461 if (errno != EEXIST) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700462 gpr_asprintf(
463 &err_msg,
464 "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
465 pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
466 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
467 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700468 }
469
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700470 continue;
471 }
472
473 if (pi->fd_cnt == pi->fd_capacity) {
474 pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
475 pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
476 }
477
478 pi->fds[pi->fd_cnt++] = fds[i];
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700479 if (add_fd_refs) {
480 GRPC_FD_REF(fds[i], "polling_island");
481 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700482 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700483}
484
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700485/* The caller is expected to hold pi->mu before calling this */
486static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700487 grpc_wakeup_fd *wakeup_fd,
488 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700489 struct epoll_event ev;
490 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700491 char *err_msg;
492 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700493
494 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
495 ev.data.ptr = wakeup_fd;
496 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
497 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700498 if (err < 0 && errno != EEXIST) {
499 gpr_asprintf(&err_msg,
500 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
501 "error: %d (%s)",
Craig Tiller1fa9ddb2016-11-28 08:19:37 -0800502 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
503 errno, strerror(errno));
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700504 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
505 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700506 }
507}
508
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700509/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700510static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700511 bool remove_fd_refs,
512 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700513 int err;
514 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700515 char *err_msg;
516 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700517
518 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700519 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700520 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700521 gpr_asprintf(&err_msg,
522 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
523 "error: %d (%s)",
524 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
525 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
526 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700527 }
528
529 if (remove_fd_refs) {
530 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700531 }
532 }
533
534 pi->fd_cnt = 0;
535}
536
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700537/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700538static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700539 bool is_fd_closed,
540 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700541 int err;
542 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700543 char *err_msg;
544 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700545
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700546 /* If fd is already closed, then it would have been automatically been removed
547 from the epoll set */
548 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700549 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
550 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700551 gpr_asprintf(
552 &err_msg,
553 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
554 pi->epoll_fd, fd->fd, errno, strerror(errno));
555 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
556 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700557 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700558 }
559
560 for (i = 0; i < pi->fd_cnt; i++) {
561 if (pi->fds[i] == fd) {
562 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700563 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700564 break;
565 }
566 }
567}
568
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700569/* Might return NULL in case of an error */
Craig Tillerb39307d2016-06-30 15:39:13 -0700570static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
571 grpc_fd *initial_fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700572 grpc_error **error) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700573 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700574 const char *err_desc = "polling_island_create";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700575
Craig Tillerb39307d2016-06-30 15:39:13 -0700576 *error = GRPC_ERROR_NONE;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700577
Craig Tillerb39307d2016-06-30 15:39:13 -0700578 pi = gpr_malloc(sizeof(*pi));
Craig Tiller91031da2016-12-28 15:44:25 -0800579 pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
Craig Tillerb39307d2016-06-30 15:39:13 -0700580 gpr_mu_init(&pi->mu);
581 pi->fd_cnt = 0;
582 pi->fd_capacity = 0;
583 pi->fds = NULL;
584 pi->epoll_fd = -1;
Craig Tillerd8a3c042016-09-09 12:42:37 -0700585
586 gpr_mu_init(&pi->workqueue_read_mu);
587 gpr_mpscq_init(&pi->workqueue_items);
588 gpr_atm_rel_store(&pi->workqueue_item_count, 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700589
Craig Tiller15007612016-07-06 09:36:16 -0700590 gpr_atm_rel_store(&pi->ref_count, 0);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700591 gpr_atm_rel_store(&pi->poller_count, 0);
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700592 gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700593
Craig Tillerd8a3c042016-09-09 12:42:37 -0700594 if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
595 err_desc)) {
596 goto done;
597 }
598
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700599 pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700600
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700601 if (pi->epoll_fd < 0) {
Craig Tillerb39307d2016-06-30 15:39:13 -0700602 append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
603 goto done;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700604 }
605
Craig Tillerb4b8e1e2016-11-28 07:33:13 -0800606 polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700607 polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
Craig Tillerb39307d2016-06-30 15:39:13 -0700608
609 if (initial_fd != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -0700610 polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
Craig Tillerb39307d2016-06-30 15:39:13 -0700611 }
612
Craig Tillerb39307d2016-06-30 15:39:13 -0700613done:
614 if (*error != GRPC_ERROR_NONE) {
Craig Tiller0a06cd72016-07-14 13:21:24 -0700615 polling_island_delete(exec_ctx, pi);
Craig Tillerb39307d2016-06-30 15:39:13 -0700616 pi = NULL;
617 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700618 return pi;
619}
620
Craig Tillerb39307d2016-06-30 15:39:13 -0700621static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700622 GPR_ASSERT(pi->fd_cnt == 0);
623
Craig Tiller0a06cd72016-07-14 13:21:24 -0700624 if (pi->epoll_fd >= 0) {
625 close(pi->epoll_fd);
626 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700627 GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
628 gpr_mu_destroy(&pi->workqueue_read_mu);
629 gpr_mpscq_destroy(&pi->workqueue_items);
Craig Tillerb39307d2016-06-30 15:39:13 -0700630 gpr_mu_destroy(&pi->mu);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700631 grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
Craig Tillerb39307d2016-06-30 15:39:13 -0700632 gpr_free(pi->fds);
633 gpr_free(pi);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700634}
635
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700636/* Attempts to gets the last polling island in the linked list (liked by the
637 * 'merged_to' field). Since this does not lock the polling island, there are no
638 * guarantees that the island returned is the last island */
639static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
640 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
641 while (next != NULL) {
642 pi = next;
643 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
644 }
645
646 return pi;
647}
648
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700649/* Gets the lock on the *latest* polling island i.e the last polling island in
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700650 the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700651 returned polling island's mu.
652 Usage: To lock/unlock polling island "pi", do the following:
653 polling_island *pi_latest = polling_island_lock(pi);
654 ...
655 ... critical section ..
656 ...
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700657 gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
658static polling_island *polling_island_lock(polling_island *pi) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700659 polling_island *next = NULL;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700660
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700661 while (true) {
662 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
663 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700664 /* Looks like 'pi' is the last node in the linked list but unless we check
665 this by holding the pi->mu lock, we cannot be sure (i.e without the
666 pi->mu lock, we don't prevent island merges).
667 To be absolutely sure, check once more by holding the pi->mu lock */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700668 gpr_mu_lock(&pi->mu);
669 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
670 if (next == NULL) {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700671 /* pi is infact the last node and we have the pi->mu lock. we're done */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700672 break;
673 }
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700674
675 /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
676 * isn't the lock we are interested in. Continue traversing the list */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700677 gpr_mu_unlock(&pi->mu);
678 }
679
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700680 pi = next;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700681 }
682
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700683 return pi;
684}
685
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700686/* Gets the lock on the *latest* polling islands in the linked lists pointed by
687 *p and *q (and also updates *p and *q to point to the latest polling islands)
688
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700689 This function is needed because calling the following block of code to obtain
690 locks on polling islands (*p and *q) is prone to deadlocks.
691 {
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700692 polling_island_lock(*p, true);
693 polling_island_lock(*q, true);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700694 }
695
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700696 Usage/example:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700697 polling_island *p1;
698 polling_island *p2;
699 ..
700 polling_island_lock_pair(&p1, &p2);
701 ..
702 .. Critical section with both p1 and p2 locked
703 ..
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700704 // Release locks: Always call polling_island_unlock_pair() to release locks
705 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700706*/
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700707static void polling_island_lock_pair(polling_island **p, polling_island **q) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700708 polling_island *pi_1 = *p;
709 polling_island *pi_2 = *q;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700710 polling_island *next_1 = NULL;
711 polling_island *next_2 = NULL;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700712
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700713 /* The algorithm is simple:
714 - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
715 keep updating pi_1 and pi_2)
716 - Then obtain locks on the islands by following a lock order rule of
717 locking polling_island with lower address first
718 Special case: Before obtaining the locks, check if pi_1 and pi_2 are
719 pointing to the same island. If that is the case, we can just call
720 polling_island_lock()
721 - After obtaining both the locks, double check that the polling islands
722 are still the last polling islands in their respective linked lists
723 (this is because there might have been polling island merges before
724 we got the lock)
725 - If the polling islands are the last islands, we are done. If not,
726 release the locks and continue the process from the first step */
727 while (true) {
728 next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
729 while (next_1 != NULL) {
730 pi_1 = next_1;
731 next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700732 }
733
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700734 next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
735 while (next_2 != NULL) {
736 pi_2 = next_2;
737 next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
738 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700739
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700740 if (pi_1 == pi_2) {
741 pi_1 = pi_2 = polling_island_lock(pi_1);
742 break;
743 }
744
745 if (pi_1 < pi_2) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700746 gpr_mu_lock(&pi_1->mu);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700747 gpr_mu_lock(&pi_2->mu);
748 } else {
749 gpr_mu_lock(&pi_2->mu);
750 gpr_mu_lock(&pi_1->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700751 }
752
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700753 next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
754 next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
755 if (next_1 == NULL && next_2 == NULL) {
756 break;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700757 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700758
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700759 gpr_mu_unlock(&pi_1->mu);
760 gpr_mu_unlock(&pi_2->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700761 }
762
763 *p = pi_1;
764 *q = pi_2;
765}
766
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700767static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
768 if (p == q) {
769 gpr_mu_unlock(&p->mu);
770 } else {
771 gpr_mu_unlock(&p->mu);
772 gpr_mu_unlock(&q->mu);
773 }
774}
775
Craig Tillerd8a3c042016-09-09 12:42:37 -0700776static void workqueue_maybe_wakeup(polling_island *pi) {
Craig Tiller2e620132016-10-10 15:27:44 -0700777 /* If this thread is the current poller, then it may be that it's about to
778 decrement the current poller count, so we need to look past this thread */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700779 bool is_current_poller = (g_current_thread_polling_island == pi);
780 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
781 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
Craig Tiller2e620132016-10-10 15:27:44 -0700782 /* Only issue a wakeup if it's likely that some poller could come in and take
783 it right now. Note that since we do an anticipatory mpscq_pop every poll
784 loop, it's ok if we miss the wakeup here, as we'll get the work item when
785 the next poller enters anyway. */
786 if (current_pollers > min_current_pollers_for_wakeup) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700787 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
788 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
789 }
790}
791
792static void workqueue_move_items_to_parent(polling_island *q) {
793 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
794 if (p == NULL) {
795 return;
796 }
797 gpr_mu_lock(&q->workqueue_read_mu);
798 int num_added = 0;
799 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
800 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
801 if (n != NULL) {
802 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
803 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
804 gpr_mpscq_push(&p->workqueue_items, n);
805 num_added++;
806 }
807 }
808 gpr_mu_unlock(&q->workqueue_read_mu);
809 if (num_added > 0) {
810 workqueue_maybe_wakeup(p);
811 }
812 workqueue_move_items_to_parent(p);
813}
814
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700815static polling_island *polling_island_merge(polling_island *p,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700816 polling_island *q,
817 grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700818 /* Get locks on both the polling islands */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700819 polling_island_lock_pair(&p, &q);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700820
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700821 if (p != q) {
822 /* Make sure that p points to the polling island with fewer fds than q */
823 if (p->fd_cnt > q->fd_cnt) {
824 GPR_SWAP(polling_island *, p, q);
825 }
826
827 /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
828 Note that the refcounts on the fds being moved will not change here.
829 This is why the last param in the following two functions is 'false') */
830 polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
831 polling_island_remove_all_fds_locked(p, false, error);
832
833 /* Wakeup all the pollers (if any) on p so that they pickup this change */
834 polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);
835
836 /* Add the 'merged_to' link from p --> q */
837 gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
838 PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700839
Harvey Tuchdaa9f452016-11-21 15:42:49 -0500840 workqueue_move_items_to_parent(p);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700841 }
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700842 /* else if p == q, nothing needs to be done */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700843
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700844 polling_island_unlock_pair(p, q);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700845
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700846 /* Return the merged polling island (Note that no merge would have happened
847 if p == q which is ok) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700848 return q;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700849}
850
Craig Tiller91031da2016-12-28 15:44:25 -0800851static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
Craig Tillerd8a3c042016-09-09 12:42:37 -0700852 grpc_error *error) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700853 GPR_TIMER_BEGIN("workqueue.enqueue", 0);
Craig Tiller91031da2016-12-28 15:44:25 -0800854 grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
Craig Tiller1dc5dbb2016-09-09 17:09:39 -0700855 /* take a ref to the workqueue: otherwise it can happen that whatever events
856 * this kicks off ends up destroying the workqueue before this function
857 * completes */
858 GRPC_WORKQUEUE_REF(workqueue, "enqueue");
859 polling_island *pi = (polling_island *)workqueue;
Craig Tillerd8a3c042016-09-09 12:42:37 -0700860 gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
861 closure->error_data.error = error;
862 gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
863 if (last == 0) {
864 workqueue_maybe_wakeup(pi);
865 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700866 workqueue_move_items_to_parent(pi);
Craig Tiller1dc5dbb2016-09-09 17:09:39 -0700867 GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
868 GPR_TIMER_END("workqueue.enqueue", 0);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700869}
870
Craig Tiller91031da2016-12-28 15:44:25 -0800871static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
872 polling_island *pi = (polling_island *)workqueue;
Craig Tiller801c6cc2017-01-03 08:13:13 -0800873 return workqueue == NULL ? grpc_schedule_on_exec_ctx
874 : &pi->workqueue_scheduler;
Craig Tiller91031da2016-12-28 15:44:25 -0800875}
876
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700877static grpc_error *polling_island_global_init() {
878 grpc_error *error = GRPC_ERROR_NONE;
879
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700880 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
881 if (error == GRPC_ERROR_NONE) {
882 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
883 }
884
885 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700886}
887
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700888static void polling_island_global_shutdown() {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700889 grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700890}
891
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700892/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700893 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700894 */
895
/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700908
909/* The alarm system needs to be able to wakeup 'some poller' sometimes
910 * (specifically when a new alarm needs to be triggered earlier than the next
911 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
912 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700913
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700914static grpc_fd *fd_freelist = NULL;
915static gpr_mu fd_freelist_mu;
916
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700917#ifdef GRPC_FD_REF_COUNT_DEBUG
918#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
919#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
920static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
921 int line) {
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700922 gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
923 (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700924 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
925#else
926#define REF_BY(fd, n, reason) ref_by(fd, n)
927#define UNREF_BY(fd, n, reason) unref_by(fd, n)
928static void ref_by(grpc_fd *fd, int n) {
929#endif
930 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
931}
932
933#ifdef GRPC_FD_REF_COUNT_DEBUG
934static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
935 int line) {
936 gpr_atm old;
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700937 gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
938 (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700939 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
940#else
941static void unref_by(grpc_fd *fd, int n) {
942 gpr_atm old;
943#endif
944 old = gpr_atm_full_fetch_add(&fd->refst, -n);
945 if (old == n) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700946 /* Add the fd to the freelist */
947 gpr_mu_lock(&fd_freelist_mu);
948 fd->freelist_next = fd_freelist;
949 fd_freelist = fd;
950 grpc_iomgr_unregister_object(&fd->iomgr_object);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -0800951
Sree Kuchibhotlaa70ccb62017-02-13 23:16:52 -0800952 grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -0800953 /* Clear the least significant bit if it set (in case fd was shutdown) */
954 err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT);
Sree Kuchibhotla8b8cbed2017-02-09 21:31:27 -0800955 GRPC_ERROR_UNREF(err);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700956
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700957 gpr_mu_unlock(&fd_freelist_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700958 } else {
959 GPR_ASSERT(old > n);
960 }
961}
962
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700963/* Increment refcount by two to avoid changing the orphan bit */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700964#ifdef GRPC_FD_REF_COUNT_DEBUG
965static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
966 int line) {
967 ref_by(fd, 2, reason, file, line);
968}
969
970static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
971 int line) {
972 unref_by(fd, 2, reason, file, line);
973}
974#else
975static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700976static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
977#endif
978
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700979static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
980
981static void fd_global_shutdown(void) {
982 gpr_mu_lock(&fd_freelist_mu);
983 gpr_mu_unlock(&fd_freelist_mu);
984 while (fd_freelist != NULL) {
985 grpc_fd *fd = fd_freelist;
986 fd_freelist = fd_freelist->freelist_next;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800987 gpr_mu_destroy(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700988 gpr_free(fd);
989 }
990 gpr_mu_destroy(&fd_freelist_mu);
991}
992
993static grpc_fd *fd_create(int fd, const char *name) {
994 grpc_fd *new_fd = NULL;
995
996 gpr_mu_lock(&fd_freelist_mu);
997 if (fd_freelist != NULL) {
998 new_fd = fd_freelist;
999 fd_freelist = fd_freelist->freelist_next;
1000 }
1001 gpr_mu_unlock(&fd_freelist_mu);
1002
1003 if (new_fd == NULL) {
1004 new_fd = gpr_malloc(sizeof(grpc_fd));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001005 gpr_mu_init(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001006 }
1007
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001008 /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
1009 * is a newly created fd (or an fd we got from the freelist), no one else
1010 * would be holding a lock to it anyway. */
1011 gpr_mu_lock(&new_fd->po.mu);
1012 new_fd->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001013#ifdef PO_DEBUG
1014 new_fd->po.obj_type = POLL_OBJ_FD;
1015#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001016
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -07001017 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001018 new_fd->fd = fd;
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001019 gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001020 new_fd->orphaned = false;
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001021 gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY);
1022 gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY);
1023 gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001024
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001025 new_fd->freelist_next = NULL;
1026 new_fd->on_done_closure = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001027
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001028 gpr_mu_unlock(&new_fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001029
1030 char *fd_name;
1031 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
1032 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001033#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -07001034 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001035#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -07001036 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001037 return new_fd;
1038}
1039
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001040static int fd_wrapped_fd(grpc_fd *fd) {
1041 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001042 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001043 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001044 ret_fd = fd->fd;
1045 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001046 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001047
1048 return ret_fd;
1049}
1050
1051static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1052 grpc_closure *on_done, int *release_fd,
1053 const char *reason) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001054 bool is_fd_closed = false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001055 grpc_error *error = GRPC_ERROR_NONE;
Craig Tiller15007612016-07-06 09:36:16 -07001056 polling_island *unref_pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001057
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001058 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001059 fd->on_done_closure = on_done;
1060
1061 /* If release_fd is not NULL, we should be relinquishing control of the file
1062 descriptor fd->fd (but we still own the grpc_fd structure). */
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001063 if (release_fd != NULL) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001064 *release_fd = fd->fd;
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001065 } else {
1066 close(fd->fd);
1067 is_fd_closed = true;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001068 }
1069
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001070 fd->orphaned = true;
1071
1072 /* Remove the active status but keep referenced. We want this grpc_fd struct
1073 to be alive (and not added to freelist) until the end of this function */
1074 REF_BY(fd, 1, reason);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001075
1076 /* Remove the fd from the polling island:
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001077 - Get a lock on the latest polling island (i.e the last island in the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001078 linked list pointed by fd->po.pi). This is the island that
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001079 would actually contain the fd
1080 - Remove the fd from the latest polling island
1081 - Unlock the latest polling island
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001082 - Set fd->po.pi to NULL (but remove the ref on the polling island
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001083 before doing this.) */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001084 if (fd->po.pi != NULL) {
1085 polling_island *pi_latest = polling_island_lock(fd->po.pi);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001086 polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001087 gpr_mu_unlock(&pi_latest->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001088
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001089 unref_pi = fd->po.pi;
1090 fd->po.pi = NULL;
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001091 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001092
Craig Tiller91031da2016-12-28 15:44:25 -08001093 grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001094
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001095 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001096 UNREF_BY(fd, 2, reason); /* Drop the reference */
Craig Tiller15007612016-07-06 09:36:16 -07001097 if (unref_pi != NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001098 /* Unref stale polling island here, outside the fd lock above.
1099 The polling island owns a workqueue which owns an fd, and unreffing
1100 inside the lock can cause an eventual lock loop that makes TSAN very
1101 unhappy. */
Craig Tiller15007612016-07-06 09:36:16 -07001102 PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
1103 }
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001104 GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
Yuchen Zenga0399f22016-08-04 17:52:53 -07001105 GRPC_ERROR_UNREF(error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001106}
1107
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001108static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
1109 grpc_closure *closure) {
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001110 while (true) {
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001111 /* Fast-path: CLOSURE_NOT_READY -> <closure>.
1112 The 'release' cas here matches the 'acquire' load in set_ready and
1113 set_shutdown ensuring that the closure (scheduled by set_ready or
1114 set_shutdown) happens-after the I/O event on the fd */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001115 if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001116 return; /* Fast-path successful. Return */
1117 }
1118
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001119 /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
1120 set_shutdown */
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001121 gpr_atm curr = gpr_atm_acq_load(state);
1122 switch (curr) {
1123 case CLOSURE_NOT_READY: {
1124 break; /* retry */
1125 }
1126
1127 case CLOSURE_READY: {
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001128 /* Change the state to CLOSURE_NOT_READY. Schedule the closure if
1129 successful. If not, the state most likely transitioned to shutdown.
1130 We should retry.
1131
1132 This can be a no-barrier cas since the state is being transitioned to
1133 CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any
1134 closure when transitioning out of CLOSURE_NO_READY state (i.e there
1135 is no other code that needs to 'happen-after' this) */
1136 if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001137 grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
1138 return; /* Slow-path successful. Return */
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001139 }
1140
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001141 break; /* retry */
1142 }
1143
1144 default: {
1145 /* 'curr' is either a closure or the fd is shutdown(in which case 'curr'
1146 contains a pointer to the shutdown-error). If the fd is shutdown,
1147 schedule the closure with the shutdown error */
1148 if ((curr & FD_SHUTDOWN_BIT) > 0) {
1149 grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
1150 grpc_closure_sched(
1151 exec_ctx, closure,
1152 GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
1153 return;
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001154 }
1155
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001156 /* There is already a closure!. This indicates a bug in the code */
1157 gpr_log(GPR_ERROR,
1158 "notify_on called with a previous callback still pending");
1159 abort();
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001160 }
1161 }
1162 }
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001163
1164 GPR_UNREACHABLE_CODE(return );
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001165}
1166
1167static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
1168 grpc_error *shutdown_err) {
1169 /* Try the fast-path first (i.e expect the current value to be
1170 CLOSURE_NOT_READY */
1171 gpr_atm curr = CLOSURE_NOT_READY;
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -08001172 gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001173
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001174 while (true) {
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001175 /* The 'release' cas here matches the 'acquire' load in notify_on to ensure
1176 that the closure it schedules 'happens-after' the set_shutdown is called
1177 on the fd */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001178 if (gpr_atm_rel_cas(state, curr, new_state)) {
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001179 return; /* Fast-path successful. Return */
1180 }
1181
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001182 /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
1183 notify_on and set_ready */
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001184 curr = gpr_atm_acq_load(state);
1185 switch (curr) {
1186 case CLOSURE_READY: {
1187 break; /* retry */
1188 }
1189
1190 case CLOSURE_NOT_READY: {
1191 break; /* retry */
1192 }
1193
1194 default: {
1195 /* 'curr' is either a closure or the fd is already shutdown */
1196
1197 /* If fd is already shutdown, we are done */
1198 if ((curr & FD_SHUTDOWN_BIT) > 0) {
1199 return;
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001200 }
1201
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001202 /* Fd is not shutdown. Schedule the closure and move the state to
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001203 shutdown state. The 'release' cas here matches the 'acquire' load in
1204 notify_on to ensure that the closure it schedules 'happens-after'
1205 the set_shutdown is called on the fd */
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001206 if (gpr_atm_rel_cas(state, curr, new_state)) {
1207 grpc_closure_sched(
1208 exec_ctx, (grpc_closure *)curr,
1209 GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
1210 return;
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001211 }
1212
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001213 /* 'curr' was a closure but now changed to a different state. We will
1214 have to retry */
1215 break;
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001216 }
1217 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001218 }
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001219
1220 GPR_UNREACHABLE_CODE(return );
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001221}
1222
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001223static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001224 /* Try an optimistic case first (i.e assume current state is
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001225 CLOSURE_NOT_READY).
1226
1227 This 'release' cas matches the 'acquire' load in notify_on ensuring that
1228 any closure (scheduled by notify_on) 'happens-after' the return from
1229 epoll_pwait */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001230 if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001231 return; /* early out */
1232 }
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001233
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001234 /* The 'acquire' load here matches the 'release' cas in notify_on and
1235 set_shutdown */
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001236 gpr_atm curr = gpr_atm_acq_load(state);
1237 switch (curr) {
1238 case CLOSURE_READY: {
1239 /* Already ready. We are done here */
1240 break;
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001241 }
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001242
1243 case CLOSURE_NOT_READY: {
1244 /* The state was not CLOSURE_NOT_READY when we checked initially at the
1245 beginning of this function but now it is CLOSURE_NOT_READY again.
1246 This is only possible if the state transitioned out of
1247 CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
1248 back to CLOSURE_NOT_READY again (i.e after we entered this function,
1249 the fd became "ready" and the necessary actions were already done).
1250 So there is no need to make the state CLOSURE_READY now */
1251 break;
1252 }
1253
1254 default: {
1255 /* 'curr' is either a closure or the fd is shutdown */
1256 if ((curr & FD_SHUTDOWN_BIT) > 0) {
1257 /* The fd is shutdown. Do nothing */
Sree Kuchibhotla97e3ecc2017-02-23 14:51:11 -08001258 } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) {
1259 /* The cas above was no-barrier since the state is being transitioned to
1260 CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any
1261 closures when transitioning out of CLOSURE_NO_READY state (i.e there
1262 is no other code that needs to 'happen-after' this) */
1263
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001264 grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
1265 }
1266 /* else the state changed again (only possible by either a racing
1267 set_ready or set_shutdown functions. In both these cases, the closure
1268 would have been scheduled for execution. So we are done here */
1269 break;
1270 }
1271 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001272}
1273
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001274static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1275 grpc_fd *fd) {
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001276 gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001277 return (grpc_pollset *)notifier;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001278}
1279
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001280static bool fd_is_shutdown(grpc_fd *fd) {
Sree Kuchibhotla99983382017-02-12 17:03:27 -08001281 grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
Sree Kuchibhotla61fe0942017-02-14 12:26:56 -08001282 return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0);
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001283}
1284
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001285/* Might be called multiple times */
Craig Tillercda759d2017-01-27 11:37:37 -08001286static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -08001287 /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001288 if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE,
Sree Kuchibhotla2fc2b3e2017-02-14 10:05:14 -08001289 (gpr_atm)why | FD_SHUTDOWN_BIT)) {
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001290 shutdown(fd->fd, SHUT_RDWR);
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001291
Sree Kuchibhotla91c4da32017-02-12 14:00:39 -08001292 set_shutdown(exec_ctx, fd, &fd->read_closure, why);
1293 set_shutdown(exec_ctx, fd, &fd->write_closure, why);
Craig Tillercda759d2017-01-27 11:37:37 -08001294 } else {
Sree Kuchibhotla4db8c822017-02-12 17:07:31 -08001295 /* Shutdown already called */
Craig Tillercda759d2017-01-27 11:37:37 -08001296 GRPC_ERROR_UNREF(why);
Sree Kuchibhotla0100b2f2016-06-21 17:38:13 -07001297 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001298}
1299
1300static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1301 grpc_closure *closure) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001302 notify_on(exec_ctx, fd, &fd->read_closure, closure);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001303}
1304
1305static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1306 grpc_closure *closure) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001307 notify_on(exec_ctx, fd, &fd->write_closure, closure);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001308}
1309
Craig Tillerd6ba6192016-06-30 15:42:41 -07001310static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001311 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001312 grpc_workqueue *workqueue =
1313 GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001314 gpr_mu_unlock(&fd->po.mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001315 return workqueue;
1316}
Craig Tiller70bd4832016-06-30 14:20:46 -07001317
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001318/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001319 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001320 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001321GPR_TLS_DECL(g_current_thread_pollset);
1322GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001323static __thread bool g_initialized_sigmask;
1324static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001325
/* Handler installed for grpc_wakeup_signal by poller_kick_init(). It does no
   work of its own; when GRPC_EPOLL_DEBUG is defined it logs the signal
   number. */
static void sig_handler(int sig_num) {
#if defined(GRPC_EPOLL_DEBUG)
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif /* defined(GRPC_EPOLL_DEBUG) */
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001331
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001332static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001333
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001334/* Global state management */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001335static grpc_error *pollset_global_init(void) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001336 gpr_tls_init(&g_current_thread_pollset);
1337 gpr_tls_init(&g_current_thread_worker);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001338 poller_kick_init();
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001339 return grpc_wakeup_fd_init(&global_wakeup_fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001340}
1341
1342static void pollset_global_shutdown(void) {
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001343 grpc_wakeup_fd_destroy(&global_wakeup_fd);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001344 gpr_tls_destroy(&g_current_thread_pollset);
1345 gpr_tls_destroy(&g_current_thread_worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001346}
1347
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001348static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
1349 grpc_error *err = GRPC_ERROR_NONE;
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001350
1351 /* Kick the worker only if it was not already kicked */
1352 if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
1353 GRPC_POLLING_TRACE(
1354 "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
1355 (void *)worker, worker->pt_id);
1356 int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
1357 if (err_num != 0) {
1358 err = GRPC_OS_ERROR(err_num, "pthread_kill");
1359 }
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001360 }
1361 return err;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001362}
1363
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001364/* Return 1 if the pollset has active threads in pollset_work (pollset must
1365 * be locked) */
1366static int pollset_has_workers(grpc_pollset *p) {
1367 return p->root_worker.next != &p->root_worker;
1368}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001369
1370static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1371 worker->prev->next = worker->next;
1372 worker->next->prev = worker->prev;
1373}
1374
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001375static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1376 if (pollset_has_workers(p)) {
1377 grpc_pollset_worker *w = p->root_worker.next;
1378 remove_worker(p, w);
1379 return w;
1380 } else {
1381 return NULL;
1382 }
1383}
1384
1385static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1386 worker->next = &p->root_worker;
1387 worker->prev = worker->next->prev;
1388 worker->prev->next = worker->next->prev = worker;
1389}
1390
1391static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1392 worker->prev = &p->root_worker;
1393 worker->next = worker->prev->next;
1394 worker->prev->next = worker->next->prev = worker;
1395}
1396
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001397/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001398static grpc_error *pollset_kick(grpc_pollset *p,
1399 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001400 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001401 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001402 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001403 grpc_pollset_worker *worker = specific_worker;
1404 if (worker != NULL) {
1405 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001406 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001407 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001408 for (worker = p->root_worker.next; worker != &p->root_worker;
1409 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001410 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001411 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001412 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001413 }
Craig Tillera218a062016-06-26 09:58:37 -07001414 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001415 } else {
1416 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001417 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001418 } else {
1419 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001420 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001421 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001422 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001423 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001424 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1425 /* Since worker == NULL, it means that we can kick "any" worker on this
1426 pollset 'p'. If 'p' happens to be the same pollset this thread is
1427 currently polling (i.e in pollset_work() function), then there is no need
1428 to kick any other worker since the current thread can just absorb the
1429 kick. This is the reason why we enter this case only when
1430 g_current_thread_pollset is != p */
1431
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001432 GPR_TIMER_MARK("kick_anonymous", 0);
1433 worker = pop_front_worker(p);
1434 if (worker != NULL) {
1435 GPR_TIMER_MARK("finally_kick", 0);
1436 push_back_worker(p, worker);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001437 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001438 } else {
1439 GPR_TIMER_MARK("kicked_no_pollers", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001440 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001441 }
1442 }
1443
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001444 GPR_TIMER_END("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001445 GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
1446 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001447}
1448
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001449static grpc_error *kick_poller(void) {
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001450 return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001451}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001452
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001453static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001454 gpr_mu_init(&pollset->po.mu);
1455 *mu = &pollset->po.mu;
1456 pollset->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001457#ifdef PO_DEBUG
1458 pollset->po.obj_type = POLL_OBJ_POLLSET;
1459#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001460
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001461 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001462 pollset->kicked_without_pollers = false;
1463
1464 pollset->shutting_down = false;
1465 pollset->finish_shutdown_called = false;
1466 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001467}
1468
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001469/* Convert a timespec to milliseconds:
1470 - Very small or negative poll times are clamped to zero to do a non-blocking
1471 poll (which becomes spin polling)
1472 - Other small values are rounded up to one millisecond
1473 - Longer than a millisecond polls are rounded up to the next nearest
1474 millisecond to avoid spinning
1475 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001476static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1477 gpr_timespec now) {
1478 gpr_timespec timeout;
1479 static const int64_t max_spin_polling_us = 10;
1480 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1481 return -1;
1482 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001483
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001484 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1485 max_spin_polling_us,
1486 GPR_TIMESPAN))) <= 0) {
1487 return 0;
1488 }
1489 timeout = gpr_time_sub(deadline, now);
1490 return gpr_time_to_millis(gpr_time_add(
1491 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1492}
1493
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001494static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1495 grpc_pollset *notifier) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001496 set_ready(exec_ctx, fd, &fd->read_closure);
1497
Sree Kuchibhotla4db8c822017-02-12 17:07:31 -08001498 /* Note, it is possible that fd_become_readable might be called twice with
Sree Kuchibhotla4c60d0d2017-02-12 17:09:08 -08001499 different 'notifier's when an fd becomes readable and it is in two epoll
1500 sets (This can happen briefly during polling island merges). In such cases
1501 it does not really matter which notifer is set as the read_notifier_pollset
1502 (They would both point to the same polling island anyway) */
Sree Kuchibhotlafb7ced62017-02-15 18:31:57 -08001503 /* Use release store to match with acquire load in fd_get_read_notifier */
1504 gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001505}
1506
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001507static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla82d73412017-02-09 18:27:45 -08001508 set_ready(exec_ctx, fd, &fd->write_closure);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001509}
1510
Craig Tillerb39307d2016-06-30 15:39:13 -07001511static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1512 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001513 if (ps->po.pi != NULL) {
1514 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001515 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001516 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001517}
1518
1519static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1520 grpc_pollset *pollset) {
1521 /* The pollset cannot have any workers if we are at this stage */
1522 GPR_ASSERT(!pollset_has_workers(pollset));
1523
1524 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001525
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001526 /* Release the ref and set pollset->po.pi to NULL */
Craig Tillerb39307d2016-06-30 15:39:13 -07001527 pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
Craig Tiller91031da2016-12-28 15:44:25 -08001528 grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001529}
1530
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001531/* pollset->po.mu lock must be held by the caller before calling this */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001532static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1533 grpc_closure *closure) {
1534 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1535 GPR_ASSERT(!pollset->shutting_down);
1536 pollset->shutting_down = true;
1537 pollset->shutdown_done = closure;
1538 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1539
1540 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1541 because it would release the underlying polling island. In such a case, we
1542 let the last worker call finish_shutdown_locked() from pollset_work() */
1543 if (!pollset_has_workers(pollset)) {
1544 GPR_ASSERT(!pollset->finish_shutdown_called);
1545 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1546 finish_shutdown_locked(exec_ctx, pollset);
1547 }
1548 GPR_TIMER_END("pollset_shutdown", 0);
1549}
1550
1551/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1552 * than destroying the mutexes, there is nothing special that needs to be done
1553 * here */
1554static void pollset_destroy(grpc_pollset *pollset) {
1555 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001556 gpr_mu_destroy(&pollset->po.mu);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001557}
1558
Craig Tillerd8a3c042016-09-09 12:42:37 -07001559static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
1560 polling_island *pi) {
1561 if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
1562 gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
1563 gpr_mu_unlock(&pi->workqueue_read_mu);
1564 if (n != NULL) {
1565 if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
1566 workqueue_maybe_wakeup(pi);
1567 }
1568 grpc_closure *c = (grpc_closure *)n;
Craig Tiller061ef742016-12-29 10:54:09 -08001569 grpc_error *error = c->error_data.error;
1570 c->cb(exec_ctx, c->cb_arg, error);
1571 GRPC_ERROR_UNREF(error);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001572 return true;
1573 } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
Craig Tiller460502e2016-10-13 10:02:08 -07001574 /* n == NULL might mean there's work but it's not available to be popped
1575 * yet - try to ensure another workqueue wakes up to check shortly if so
1576 */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001577 workqueue_maybe_wakeup(pi);
1578 }
1579 }
1580 return false;
1581}
1582
Craig Tiller84ea3412016-09-08 14:57:56 -07001583#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001584/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1585static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001586 grpc_pollset *pollset,
1587 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001588 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001589 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001590 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001591 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001592 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001593 char *err_msg;
1594 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001595 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1596
1597 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001598 latest polling island pointed by pollset->po.pi
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001599
1600 Since epoll_fd is immutable, we can read it without obtaining the polling
1601 island lock. There is however a possibility that the polling island (from
1602 which we got the epoll_fd) got merged with another island while we are
1603 in this function. This is still okay because in such a case, we will wakeup
1604 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001605 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001606
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001607 if (pollset->po.pi == NULL) {
1608 pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
1609 if (pollset->po.pi == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001610 GPR_TIMER_END("pollset_work_and_unlock", 0);
1611 return; /* Fatal error. We cannot continue */
1612 }
1613
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001614 PI_ADD_REF(pollset->po.pi, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001615 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001616 (void *)pollset, (void *)pollset->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001617 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001618
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001619 pi = polling_island_maybe_get_latest(pollset->po.pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001620 epoll_fd = pi->epoll_fd;
1621
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001622 /* Update the pollset->po.pi since the island being pointed by
1623 pollset->po.pi maybe older than the one pointed by pi) */
1624 if (pollset->po.pi != pi) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001625 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1626 polling island to be deleted */
1627 PI_ADD_REF(pi, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001628 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
1629 pollset->po.pi = pi;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001630 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001631
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001632 /* Add an extra ref so that the island does not get destroyed (which means
1633 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1634 epoll_fd */
1635 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001636 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001637
Craig Tiller460502e2016-10-13 10:02:08 -07001638 /* If we get some workqueue work to do, it might end up completing an item on
1639 the completion queue, so there's no need to poll... so we skip that and
1640 redo the complete loop to verify */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001641 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1642 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1643 g_current_thread_polling_island = pi;
1644
Vijay Paicef54012016-08-28 23:05:31 -07001645 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001646 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1647 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001648 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001649 if (ep_rv < 0) {
1650 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001651 gpr_asprintf(&err_msg,
1652 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1653 epoll_fd, errno, strerror(errno));
1654 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001655 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001656 /* We were interrupted. Save an interation by doing a zero timeout
1657 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001658 GRPC_POLLING_TRACE(
1659 "pollset_work: pollset: %p, worker: %p received kick",
1660 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001661 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001662 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001663 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001664
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001665#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001666 /* See the definition of g_poll_sync for more details */
1667 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001668#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001669
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001670 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001671 void *data_ptr = ep_ev[i].data.ptr;
Craig Tillerb4b8e1e2016-11-28 07:33:13 -08001672 if (data_ptr == &global_wakeup_fd) {
Craig Tiller185f6c92017-03-17 08:33:19 -07001673 grpc_timer_consume_kick();
Craig Tiller1fa9ddb2016-11-28 08:19:37 -08001674 append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001675 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001676 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
Craig Tillere49959d2017-01-26 08:39:38 -08001677 append_error(error,
1678 grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
Craig Tillerd8a3c042016-09-09 12:42:37 -07001679 err_desc);
1680 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001681 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001682 GRPC_POLLING_TRACE(
1683 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1684 "%d) got merged",
1685 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001686 /* This means that our polling island is merged with a different
1687 island. We do not have to do anything here since the subsequent call
1688 to the function pollset_work_and_unlock() will pick up the correct
1689 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001690 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001691 grpc_fd *fd = data_ptr;
1692 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1693 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1694 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001695 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001696 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001697 }
1698 if (write_ev || cancel) {
1699 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001700 }
1701 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001702 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001703
1704 g_current_thread_polling_island = NULL;
1705 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1706 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001707
1708 GPR_ASSERT(pi != NULL);
1709
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001710 /* Before leaving, release the extra ref we added to the polling island. It
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001711 is important to use "pi" here (i.e our old copy of pollset->po.pi
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001712 that we got before releasing the polling island lock). This is because
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001713 pollset->po.pi pointer might get udpated in other parts of the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001714 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001715 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001716
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001717 GPR_TIMER_END("pollset_work_and_unlock", 0);
1718}
1719
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001720/* pollset->po.mu lock must be held by the caller before calling this.
1721 The function pollset_work() may temporarily release the lock (pollset->po.mu)
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001722 during the course of its execution but it will always re-acquire the lock and
1723 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001724static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1725 grpc_pollset_worker **worker_hdl,
1726 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001727 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001728 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001729 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1730
1731 sigset_t new_mask;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001732
1733 grpc_pollset_worker worker;
1734 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001735 worker.pt_id = pthread_self();
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001736 gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001737
1738 *worker_hdl = &worker;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001739
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001740 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1741 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001742
1743 if (pollset->kicked_without_pollers) {
1744 /* If the pollset was kicked without pollers, pretend that the current
1745 worker got the kick and skip polling. A kick indicates that there is some
1746 work that needs attention like an event on the completion queue or an
1747 alarm */
1748 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1749 pollset->kicked_without_pollers = 0;
1750 } else if (!pollset->shutting_down) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001751 /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001752 (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
1753 worker that there is some pending work that needs immediate attention
1754 (like an event on the completion queue, or a polling island merge that
1755 results in a new epoll-fd to wait on) and that the worker should not
1756 spend time waiting in epoll_pwait().
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001757
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001758 A worker can be kicked anytime from the point it is added to the pollset
1759 via push_front_worker() (or push_back_worker()) to the point it is
1760 removed via remove_worker().
1761 If the worker is kicked before/during it calls epoll_pwait(), it should
1762 immediately exit from epoll_wait(). If the worker is kicked after it
1763 returns from epoll_wait(), then nothing really needs to be done.
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001764
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001765 To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001766 times *except* when it is in epoll_pwait(). This way, the worker never
1767 misses acting on a kick */
1768
Craig Tiller19196992016-06-27 18:45:56 -07001769 if (!g_initialized_sigmask) {
1770 sigemptyset(&new_mask);
1771 sigaddset(&new_mask, grpc_wakeup_signal);
1772 pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
1773 sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
1774 g_initialized_sigmask = true;
1775 /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
1776 This is the mask used at all times *except during
1777 epoll_wait()*"
1778 g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
Craig Tiller510ff692016-06-27 20:31:49 -07001779 this is the mask to use *during epoll_wait()*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001780
Craig Tiller19196992016-06-27 18:45:56 -07001781 The new_mask is set on the worker before it is added to the pollset
Craig Tiller510ff692016-06-27 20:31:49 -07001782 (i.e before it can be kicked) */
Craig Tiller19196992016-06-27 18:45:56 -07001783 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001784
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001785 push_front_worker(pollset, &worker); /* Add worker to pollset */
1786
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001787 pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
1788 &g_orig_sigmask, &error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001789 grpc_exec_ctx_flush(exec_ctx);
1790
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001791 gpr_mu_lock(&pollset->po.mu);
Sree Kuchibhotla34217242016-06-29 00:19:07 -07001792
1793 /* Note: There is no need to reset worker.is_kicked to 0 since we are no
1794 longer going to use this worker */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001795 remove_worker(pollset, &worker);
1796 }
1797
1798 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1799 false at this point) and the pollset is shutting down, we may have to
1800 finish the shutdown process by calling finish_shutdown_locked().
1801 See pollset_shutdown() for more details.
1802
1803 Note: Continuing to access pollset here is safe; it is the caller's
1804 responsibility to not destroy a pollset when it has outstanding calls to
1805 pollset_work() */
1806 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1807 !pollset->finish_shutdown_called) {
1808 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1809 finish_shutdown_locked(exec_ctx, pollset);
1810
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001811 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001812 grpc_exec_ctx_flush(exec_ctx);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001813 gpr_mu_lock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001814 }
1815
1816 *worker_hdl = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001817
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001818 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1819 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001820
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001821 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001822
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001823 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1824 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001825}
1826
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001827static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001828 poll_obj_type bag_type, poll_obj *item,
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001829 poll_obj_type item_type) {
1830 GPR_TIMER_BEGIN("add_poll_object", 0);
1831
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001832#ifdef PO_DEBUG
1833 GPR_ASSERT(item->obj_type == item_type);
1834 GPR_ASSERT(bag->obj_type == bag_type);
1835#endif
Craig Tiller57726ca2016-09-12 11:59:45 -07001836
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001837 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001838 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001839
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001840 gpr_mu_lock(&bag->mu);
1841 gpr_mu_lock(&item->mu);
1842
Craig Tiller7212c232016-07-06 13:11:09 -07001843retry:
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001844 /*
1845 * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
1846 * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
1847 * a refcount of 2) and point item->pi and bag->pi to the new island
1848 * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
1849 * the other's non-NULL pi
1850 * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
1851 * polling islands and update item->pi and bag->pi to point to the new
1852 * island
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001853 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001854
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001855 /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
1856 * orphaned */
1857 if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
1858 gpr_mu_unlock(&item->mu);
1859 gpr_mu_unlock(&bag->mu);
Craig Tiller42ac6db2016-07-06 17:13:56 -07001860 return;
1861 }
1862
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001863 if (item->pi == bag->pi) {
1864 pi_new = item->pi;
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001865 if (pi_new == NULL) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001866 /* GPR_ASSERT(item->pi == bag->pi == NULL) */
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001867
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001868 /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
1869 * we need to do some extra work to make TSAN happy */
1870 if (item_type == POLL_OBJ_FD) {
1871 /* Unlock before creating a new polling island: the polling island will
1872 create a workqueue which creates a file descriptor, and holding an fd
1873 lock here can eventually cause a loop to appear to TSAN (making it
1874 unhappy). We don't think it's a real loop (there's an epoch point
1875 where that loop possibility disappears), but the advantages of
1876 keeping TSAN happy outweigh any performance advantage we might have
1877 by keeping the lock held. */
1878 gpr_mu_unlock(&item->mu);
1879 pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
1880 gpr_mu_lock(&item->mu);
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001881
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001882 /* Need to reverify any assumptions made between the initial lock and
1883 getting to this branch: if they've changed, we need to throw away our
1884 work and figure things out again. */
1885 if (item->pi != NULL) {
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001886 GRPC_POLLING_TRACE(
1887 "add_poll_object: Raced creating new polling island. pi_new: %p "
1888 "(fd: %d, %s: %p)",
1889 (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
1890 (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001891 /* No need to lock 'pi_new' here since this is a new polling island
Sree Kuchibhotla8214b872017-02-23 12:49:48 -08001892 and no one has a reference to it yet */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001893 polling_island_remove_all_fds_locked(pi_new, true, &error);
1894
1895 /* Ref and unref so that the polling island gets deleted during unref
1896 */
1897 PI_ADD_REF(pi_new, "dance_of_destruction");
1898 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
1899 goto retry;
1900 }
Craig Tiller27da6422016-07-06 13:14:46 -07001901 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001902 pi_new = polling_island_create(exec_ctx, NULL, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001903 }
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001904
1905 GRPC_POLLING_TRACE(
1906 "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
1907 "%s: %p)",
1908 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1909 poll_obj_string(bag_type), (void *)bag);
1910 } else {
1911 GRPC_POLLING_TRACE(
1912 "add_poll_object: Same polling island. pi: %p (%s, %s)",
1913 (void *)pi_new, poll_obj_string(item_type),
1914 poll_obj_string(bag_type));
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001915 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001916 } else if (item->pi == NULL) {
1917 /* GPR_ASSERT(bag->pi != NULL) */
1918 /* Make pi_new point to latest pi*/
1919 pi_new = polling_island_lock(bag->pi);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001920
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001921 if (item_type == POLL_OBJ_FD) {
1922 grpc_fd *fd = FD_FROM_PO(item);
1923 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
1924 }
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001925
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001926 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001927 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001928 "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
1929 "bag(%s): %p)",
1930 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1931 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001932 } else if (bag->pi == NULL) {
1933 /* GPR_ASSERT(item->pi != NULL) */
1934 /* Make pi_new to point to latest pi */
1935 pi_new = polling_island_lock(item->pi);
1936 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001937 GRPC_POLLING_TRACE(
1938 "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
1939 "bag(%s): %p)",
1940 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1941 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001942 } else {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001943 pi_new = polling_island_merge(item->pi, bag->pi, &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001944 GRPC_POLLING_TRACE(
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001945 "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
1946 "bag(%s): %p)",
1947 (void *)pi_new, poll_obj_string(item_type), (void *)item,
1948 poll_obj_string(bag_type), (void *)bag);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001949 }
1950
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001951 /* At this point, pi_new is the polling island that both item->pi and bag->pi
1952 MUST be pointing to */
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001953
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001954 if (item->pi != pi_new) {
1955 PI_ADD_REF(pi_new, poll_obj_string(item_type));
1956 if (item->pi != NULL) {
1957 PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001958 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001959 item->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001960 }
1961
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001962 if (bag->pi != pi_new) {
1963 PI_ADD_REF(pi_new, poll_obj_string(bag_type));
1964 if (bag->pi != NULL) {
1965 PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001966 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001967 bag->pi = pi_new;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001968 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001969
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001970 gpr_mu_unlock(&item->mu);
1971 gpr_mu_unlock(&bag->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001972
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001973 GRPC_LOG_IF_ERROR("add_poll_object", error);
1974 GPR_TIMER_END("add_poll_object", 0);
1975}
Craig Tiller57726ca2016-09-12 11:59:45 -07001976
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001977static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1978 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001979 add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001980 POLL_OBJ_FD);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001981}
1982
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001983/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001984 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001985 */
1986
1987static grpc_pollset_set *pollset_set_create(void) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001988 grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001989 gpr_mu_init(&pss->po.mu);
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001990 pss->po.pi = NULL;
1991#ifdef PO_DEBUG
1992 pss->po.obj_type = POLL_OBJ_POLLSET_SET;
1993#endif
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001994 return pss;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001995}
1996
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08001997static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
1998 grpc_pollset_set *pss) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001999 gpr_mu_destroy(&pss->po.mu);
2000
2001 if (pss->po.pi != NULL) {
Craig Tiller9e5ac1b2017-02-14 22:25:50 -08002002 PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002003 }
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002004
2005 gpr_free(pss);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002006}
2007
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002008static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
2009 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002010 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002011 POLL_OBJ_FD);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002012}
2013
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002014static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
2015 grpc_fd *fd) {
2016 /* Nothing to do */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002017}
2018
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002019static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002020 grpc_pollset_set *pss, grpc_pollset *ps) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002021 add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002022 POLL_OBJ_POLLSET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002023}
2024
2025static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002026 grpc_pollset_set *pss, grpc_pollset *ps) {
2027 /* Nothing to do */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002028}
2029
2030static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
2031 grpc_pollset_set *bag,
2032 grpc_pollset_set *item) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08002033 add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002034 POLL_OBJ_POLLSET_SET);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002035}
2036
2037static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
2038 grpc_pollset_set *bag,
2039 grpc_pollset_set *item) {
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08002040 /* Nothing to do */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002041}
2042
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002043/* Test helper functions
2044 * */
2045void *grpc_fd_get_polling_island(grpc_fd *fd) {
2046 polling_island *pi;
2047
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002048 gpr_mu_lock(&fd->po.mu);
2049 pi = fd->po.pi;
2050 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002051
2052 return pi;
2053}
2054
2055void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
2056 polling_island *pi;
2057
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08002058 gpr_mu_lock(&ps->po.mu);
2059 pi = ps->po.pi;
2060 gpr_mu_unlock(&ps->po.mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002061
2062 return pi;
2063}
2064
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002065bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002066 polling_island *p1 = p;
2067 polling_island *p2 = q;
2068
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002069 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
2070 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002071 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07002072 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07002073
2074 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07002075}
2076
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002077/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07002078 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002079 */
2080
/* Shuts the engine down by running the global shutdown routine of each
   subsystem: fds, then pollsets, then polling islands */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
2086
2087static const grpc_event_engine_vtable vtable = {
2088 .pollset_size = sizeof(grpc_pollset),
2089
2090 .fd_create = fd_create,
2091 .fd_wrapped_fd = fd_wrapped_fd,
2092 .fd_orphan = fd_orphan,
2093 .fd_shutdown = fd_shutdown,
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07002094 .fd_is_shutdown = fd_is_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002095 .fd_notify_on_read = fd_notify_on_read,
2096 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002097 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Craig Tiller70bd4832016-06-30 14:20:46 -07002098 .fd_get_workqueue = fd_get_workqueue,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002099
2100 .pollset_init = pollset_init,
2101 .pollset_shutdown = pollset_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002102 .pollset_destroy = pollset_destroy,
2103 .pollset_work = pollset_work,
2104 .pollset_kick = pollset_kick,
2105 .pollset_add_fd = pollset_add_fd,
2106
2107 .pollset_set_create = pollset_set_create,
2108 .pollset_set_destroy = pollset_set_destroy,
2109 .pollset_set_add_pollset = pollset_set_add_pollset,
2110 .pollset_set_del_pollset = pollset_set_del_pollset,
2111 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
2112 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
2113 .pollset_set_add_fd = pollset_set_add_fd,
2114 .pollset_set_del_fd = pollset_set_del_fd,
2115
2116 .kick_poller = kick_poller,
2117
Craig Tillerd8a3c042016-09-09 12:42:37 -07002118 .workqueue_ref = workqueue_ref,
2119 .workqueue_unref = workqueue_unref,
Craig Tiller91031da2016-12-28 15:44:25 -08002120 .workqueue_scheduler = workqueue_scheduler,
Craig Tillerd8a3c042016-09-09 12:42:37 -07002121
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002122 .shutdown_engine = shutdown_engine,
2123};
2124
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002125/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
2126 * Create a dummy epoll_fd to make sure epoll support is available */
2127static bool is_epoll_available() {
2128 int fd = epoll_create1(EPOLL_CLOEXEC);
2129 if (fd < 0) {
2130 gpr_log(
2131 GPR_ERROR,
2132 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2133 fd);
2134 return false;
2135 }
2136 close(fd);
2137 return true;
2138}
2139
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002140const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002141 /* If use of signals is disabled, we cannot use epoll engine*/
2142 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
2143 return NULL;
2144 }
2145
Ken Paysoncd7d0472016-10-11 12:24:20 -07002146 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07002147 return NULL;
2148 }
2149
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002150 if (!is_epoll_available()) {
2151 return NULL;
2152 }
2153
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002154 if (!is_grpc_wakeup_signal_initialized) {
Sree Kuchibhotlabd48c912016-09-27 16:48:25 -07002155 grpc_use_signal(SIGRTMIN + 6);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002156 }
2157
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002158 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07002159
2160 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
2161 return NULL;
2162 }
2163
2164 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
2165 polling_island_global_init())) {
2166 return NULL;
2167 }
2168
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002169 return &vtable;
2170}
2171
murgatroid99623dd4f2016-08-08 17:31:27 -07002172#else /* defined(GRPC_LINUX_EPOLL) */
2173#if defined(GRPC_POSIX_SOCKET)
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07002174#include "src/core/lib/iomgr/ev_posix.h"
murgatroid99623dd4f2016-08-08 17:31:27 -07002175/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002176 * NULL */
2177const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
murgatroid99623dd4f2016-08-08 17:31:27 -07002178#endif /* defined(GRPC_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002179
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002180void grpc_use_signal(int signum) {}
murgatroid99623dd4f2016-08-08 17:31:27 -07002181#endif /* !defined(GRPC_LINUX_EPOLL) */