blob: 16a22b7b9be1e455dfe244c2e5bc4fe8320cee2c [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
murgatroid9954070892016-08-08 17:01:18 -070035#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070036
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070038#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */

/* Log a polling event when tracing is enabled.
   Wrapped in do { } while (0) so the macro expands to exactly one statement;
   the previous bare `if (...) { ... }` form silently mis-binds a following
   `else` when used un-braced inside an if/else. */
#define GRPC_POLLING_TRACE(fmt, ...)           \
  do {                                         \
    if (grpc_polling_trace) {                  \
      gpr_log(GPR_INFO, (fmt), __VA_ARGS__);   \
    }                                          \
  } while (0)
71
/* Signal number used to kick polling threads out of epoll_wait(); -1 until
   configured. Settable via grpc_use_signal() below. */
static int grpc_wakeup_signal = -1;
/* True once grpc_use_signal() has been called (even if it disabled signals) */
static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
struct polling_island;

/* Tag identifying the concrete type behind a poll_obj header */
typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

/* Common header embedded (as the first field) in grpc_fd, grpc_pollset and
   grpc_pollset_set, so all three can share polling-island plumbing */
typedef struct poll_obj {
#ifdef PO_DEBUG
  /* Runtime type tag, tracked only in PO_DEBUG builds for sanity checks */
  poll_obj_type obj_type;
#endif
  /* Mutex for this poll object */
  gpr_mu mu;
  /* The polling island this object is currently associated with */
  struct polling_island *pi;
} poll_obj;
106
107const char *poll_obj_string(poll_obj_type po_type) {
108 switch (po_type) {
109 case POLL_OBJ_FD:
110 return "fd";
111 case POLL_OBJ_POLLSET:
112 return "pollset";
113 case POLL_OBJ_POLLSET_SET:
114 return "pollset_set";
115 }
116
117 GPR_UNREACHABLE_CODE(return "UNKNOWN");
118}
119
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700120/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700121 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700122 */
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800123
/* A grpc_fd begins with a poll_obj header, so a poll_obj* that is known to be
   an fd can be cast back directly. Valid only because 'po' is the FIRST field
   of struct grpc_fd. */
#define FD_FROM_PO(po) ((grpc_fd *)(po))

struct grpc_fd {
  /* Common poll-object header. MUST remain the first field so that the
     FD_FROM_PO() cast above is valid. */
  poll_obj po;

  /* The underlying OS file descriptor */
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  /* Pending notification closures; presumably these also hold the
     CLOSURE_NOT_READY / CLOSURE_READY sentinel values defined below —
     confirm against the fd notify code later in this file. */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* Link used when this struct sits on the fd freelist */
  struct grpc_fd *freelist_next;
  /* Closure scheduled once the fd is orphaned/released */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  /* iomgr bookkeeping handle for leak tracking at shutdown */
  grpc_iomgr_object iomgr_object;
};
156
/* Reference counting for fds. When GRPC_FD_REF_COUNT_DEBUG is defined, every
   ref/unref records the reason and the call site for leak debugging. */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

/* One-time init/teardown of the fd freelist machinery */
static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel closure pointers (never dereferenced, only compared) used to mark
   the state of an fd's pending-notification slots */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
177
178/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700179 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700180 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700181
/* Polling-island ref-count macros. With GRPC_WORKQUEUE_REFCOUNT_DEBUG defined,
   every ref change is logged together with a reason string and call site. */
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(exec_ctx, p, r) \
  pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))

#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700194
/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed (multi-producer, single-consumer) */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set (dynamic array: count/capacity) */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
235
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700236/*******************************************************************************
237 * Pollset Declarations
238 */
/* One thread currently blocked in (or about to enter) pollset_work() */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;

  /* Links in the owning pollset's doubly-linked worker list */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
248
struct grpc_pollset {
  /* Common poll-object header (mutex + polling island); first field by
     convention, matching grpc_fd and grpc_pollset_set */
  poll_obj po;

  /* Anchor of the worker list — presumably a sentinel head; confirm against
     the worker list-manipulation code later in this file */
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */
};
259
260/*******************************************************************************
261 * Pollset-set Declarations
262 */
struct grpc_pollset_set {
  /* A pollset_set carries no state beyond the common poll-object header */
  poll_obj po;
};
266
267/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700268 * Common helpers
269 */
270
Craig Tillerf975f742016-07-01 14:56:27 -0700271static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700272 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700273 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700274 if (*composite == GRPC_ERROR_NONE) {
275 *composite = GRPC_ERROR_CREATE(desc);
276 }
277 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700278 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700279}
280
281/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700282 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700283 */
284
/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island *g_current_thread_polling_island;

/* Forward declaration (needed by pi_unref below) */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

/* Polling-island ref-count primitives (normally invoked through the
   PI_ADD_REF / PI_UNREF macros) */
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700314
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
/* Debug wrapper: logs each ref with reason and call site before delegating.
   NOTE(review): old_cnt is read with a separate atomic load, so the logged
   old->new values may be stale under concurrent ref traffic — debug aid only. */
static void pi_add_ref_dbg(polling_island *pi, const char *reason,
                           const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper: logs each unref (same staleness caveat as above) */
static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
                         const char *reason, const char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&pi->ref_count);
  pi_unref(exec_ctx, pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}

/* grpc_workqueue is a polling_island in disguise (direct cast); these adapt
   the workqueue ref API onto the polling-island ref counters. NULL is a no-op. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
                                     const char *file, int line,
                                     const char *reason) {
  if (workqueue != NULL) {
    pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
                            const char *file, int line, const char *reason) {
  if (workqueue != NULL) {
    pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
  }
}
#else
/* Non-debug variants: same cast-and-delegate, no logging. NULL is a no-op. */
static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_add_ref((polling_island *)workqueue);
  }
  return workqueue;
}

static void workqueue_unref(grpc_exec_ctx *exec_ctx,
                            grpc_workqueue *workqueue) {
  if (workqueue != NULL) {
    pi_unref(exec_ctx, (polling_island *)workqueue);
  }
}
#endif
362
/* Takes a ref on the polling island (no memory-order guarantees needed for a
   pure increment) */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If the ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    /* Must load merged_to BEFORE deleting pi — the field lives inside pi */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
384
/* Adds 'fd_count' fds to pi's epoll set (edge-triggered, read+write) and to
   pi's fds[] tracking array, optionally taking a ref on each fd. epoll_ctl
   EEXIST errors are silently tolerated (fd already in the set); other epoll
   failures are appended to *error and the fd is skipped.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i]; /* epoll_wait hands this pointer back to us */
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* Do not track an fd that epoll did not accept (or already has) */
      continue;
    }

    /* Grow fds[] geometrically (at least +8, otherwise 1.5x) */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
429
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700430/* The caller is expected to hold pi->mu before calling this */
431static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700432 grpc_wakeup_fd *wakeup_fd,
433 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700434 struct epoll_event ev;
435 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700436 char *err_msg;
437 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700438
439 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
440 ev.data.ptr = wakeup_fd;
441 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
442 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700443 if (err < 0 && errno != EEXIST) {
444 gpr_asprintf(&err_msg,
445 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
446 "error: %d (%s)",
447 pi->epoll_fd,
448 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
449 strerror(errno));
450 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
451 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700452 }
453}
454
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700455/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700456static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700457 bool remove_fd_refs,
458 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700459 int err;
460 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700461 char *err_msg;
462 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700463
464 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700465 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700466 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700467 gpr_asprintf(&err_msg,
468 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
469 "error: %d (%s)",
470 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
471 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
472 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700473 }
474
475 if (remove_fd_refs) {
476 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700477 }
478 }
479
480 pi->fd_cnt = 0;
481}
482
/* Removes a single fd from pi's epoll set and from pi->fds[], dropping the
   ref taken when it was added. ENOENT from epoll_ctl is tolerated; other
   failures are appended to *error.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* Swap-remove from the tracking array (order is not preserved) */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
514
/* Creates a new polling island: initializes its mutexes, workqueue and
   atomics, creates the epoll fd, registers the global and per-island
   workqueue wakeup fds, and optionally adds 'initial_fd'.
   Might return NULL in case of an error (with the failure recorded in *error;
   partially-constructed state is torn down via polling_island_delete). */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  pi->epoll_fd = -1; /* sentinel checked by polling_island_delete */

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  /* goto-style cleanup: on any recorded error, destroy whatever was built */
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
565
/* Destroys a polling island. Preconditions (asserted): all fds have already
   been removed and the workqueue is empty. Safe to call on the partially
   constructed island from polling_island_create (epoll_fd may still be -1). */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}
580
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700581/* Attempts to gets the last polling island in the linked list (liked by the
582 * 'merged_to' field). Since this does not lock the polling island, there are no
583 * guarantees that the island returned is the last island */
584static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
585 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
586 while (next != NULL) {
587 pi = next;
588 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
589 }
590
591 return pi;
592}
593
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more while holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we hold pi->mu: done. The lock is
           intentionally left held for the caller's critical section. */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}
630
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because naively locking the two islands one after
   the other, i.e:
   {
     polling_island_lock(*p);
     polling_island_lock(*q);
   }
   is prone to deadlocks (two threads locking the same pair in opposite order).

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
      - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
        keep updating pi_1 and pi_2)
      - Then obtain locks on the islands by following a lock order rule of
        locking polling_island with lower address first
           Special case: Before obtaining the locks, check if pi_1 and pi_2 are
           pointing to the same island. If that is the case, we can just call
           polling_island_lock()
      - After obtaining both the locks, double check that the polling islands
        are still the last polling islands in their respective linked lists
        (this is because there might have been polling island merges before
        we got the lock)
      - If the polling islands are the last islands, we are done. If not,
        release the locks and continue the process from the first step */
  while (true) {
    /* Chase the 'merged_to' chains to the (currently) last islands. */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains ended at the same island: a single lock suffices. */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Address-ordered locking prevents deadlock between concurrent callers. */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Re-validate: a merge may have happened before we acquired the locks. */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* At least one island was merged meanwhile; drop both locks and retry. */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
711
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700712static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
713 if (p == q) {
714 gpr_mu_unlock(&p->mu);
715 } else {
716 gpr_mu_unlock(&p->mu);
717 gpr_mu_unlock(&q->mu);
718 }
719}
720
/* Wakes a poller on island 'pi' (via its workqueue wakeup fd) so that a newly
   queued work item gets picked up promptly — but only when some poller is
   actually available to take it. */
static void workqueue_maybe_wakeup(polling_island *pi) {
  /* If this thread is the current poller, then it may be that it's about to
     decrement the current poller count, so we need to look past this thread */
  bool is_current_poller = (g_current_thread_polling_island == pi);
  gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
  gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
  /* Only issue a wakeup if it's likely that some poller could come in and take
     it right now. Note that since we do an anticipatory mpscq_pop every poll
     loop, it's ok if we miss the wakeup here, as we'll get the work item when
     the next poller enters anyway. */
  if (current_pollers > min_current_pollers_for_wakeup) {
    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
  }
}
736
737static void workqueue_move_items_to_parent(polling_island *q) {
738 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
739 if (p == NULL) {
740 return;
741 }
742 gpr_mu_lock(&q->workqueue_read_mu);
743 int num_added = 0;
744 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
745 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
746 if (n != NULL) {
747 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
748 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
749 gpr_mpscq_push(&p->workqueue_items, n);
750 num_added++;
751 }
752 }
753 gpr_mu_unlock(&q->workqueue_read_mu);
754 if (num_added > 0) {
755 workqueue_maybe_wakeup(p);
756 }
757 workqueue_move_items_to_parent(p);
758}
759
/* Merges the polling islands 'p' and 'q' and returns the resulting (merged)
   island. The smaller island's fds are moved into the larger one and the
   smaller island is linked to the larger via its 'merged_to' field. Any
   errors encountered along the way are accumulated into *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pick up this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Hand any work items already queued on p's side over to q. */
    workqueue_move_items_to_parent(q);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
795
/* Enqueues 'closure' (with its associated 'error') on the given workqueue.
   The workqueue is a polling_island in disguise; the closure is pushed onto
   the island's MPSC queue and a poller is woken if this is the first item. */
static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
                              grpc_workqueue *workqueue, grpc_closure *closure,
                              grpc_error *error) {
  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
  /* take a ref to the workqueue: otherwise it can happen that whatever events
   * this kicks off ends up destroying the workqueue before this function
   * completes */
  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
  polling_island *pi = (polling_island *)workqueue;
  gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
  closure->error_data.error = error;
  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
  /* 'last' was the item count before our add: 0 means the queue was empty,
     so a poller may be asleep and needs a wakeup. */
  if (last == 0) {
    workqueue_maybe_wakeup(pi);
  }
  /* If this island was merged into another, forward the item(s) upward. */
  workqueue_move_items_to_parent(pi);
  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
  GPR_TIMER_END("workqueue.enqueue", 0);
}
815
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700816static grpc_error *polling_island_global_init() {
817 grpc_error *error = GRPC_ERROR_NONE;
818
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700819 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
820 if (error == GRPC_ERROR_NONE) {
821 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
822 }
823
824 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700825}
826
/* Tears down the global polling-island wakeup fd set up by
   polling_island_global_init(). */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
830
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700831/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700832 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700833 */
834
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700835/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700836 * but instead so that implementations with multiple threads in (for example)
837 * epoll_wait deal with the race between pollset removal and incoming poll
838 * notifications.
839 *
840 * The problem is that the poller ultimately holds a reference to this
841 * object, so it is very difficult to know when is safe to free it, at least
842 * without some expensive synchronization.
843 *
844 * If we keep the object freelisted, in the worst case losing this race just
845 * becomes a spurious read notification on a reused fd.
846 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700847
848/* The alarm system needs to be able to wakeup 'some poller' sometimes
849 * (specifically when a new alarm needs to be triggered earlier than the next
850 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
851 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700852
853/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
854 * sure to wake up one polling thread (which can wake up other threads if
855 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700856grpc_wakeup_fd grpc_global_wakeup_fd;
857
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700858static grpc_fd *fd_freelist = NULL;
859static gpr_mu fd_freelist_mu;
860
/* Adds 'n' to fd->refst. The #ifdef splices one of two signatures onto a
   shared body: with GRPC_FD_REF_COUNT_DEBUG the function also logs the
   ref transition with the caller's file/line. The assert guarantees we never
   resurrect an fd whose refcount already hit zero. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
876
/* Subtracts 'n' from fd->refst; when the count drops to zero the fd struct is
   returned to the global freelist (not freed — see the freelist rationale
   above). Same #ifdef signature-splicing trick as ref_by(). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  /* old == n means this unref took the count to exactly zero. */
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Going negative would mean a double-unref somewhere. */
    GPR_ASSERT(old > n);
  }
}
901
/* Increment refcount by two to avoid changing the orphan bit (the comment
   implies the low bit of refst carries orphan state — NOTE(review): confirm
   against the refst initialization elsewhere in this file). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
917
/* Initializes the mutex protecting the global fd freelist. */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
919
920static void fd_global_shutdown(void) {
921 gpr_mu_lock(&fd_freelist_mu);
922 gpr_mu_unlock(&fd_freelist_mu);
923 while (fd_freelist != NULL) {
924 grpc_fd *fd = fd_freelist;
925 fd_freelist = fd_freelist->freelist_next;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800926 gpr_mu_destroy(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700927 gpr_free(fd);
928 }
929 gpr_mu_destroy(&fd_freelist_mu);
930}
931
/* Creates (or recycles from the freelist) a grpc_fd wrapping the OS file
   descriptor 'fd'. The new fd starts with no polling island, refcount 1
   (stored as-is in refst), and no pending closures. 'name' is used only for
   iomgr object registration / debug logging. */
static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  /* Try to reuse a previously-orphaned fd struct from the freelist. */
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->po.mu);
  }

  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
   * is a newly created fd (or an fd we got from the freelist), no one else
   * would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = NULL;
#ifdef PO_DEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->po.mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}
977
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700978static int fd_wrapped_fd(grpc_fd *fd) {
979 int ret_fd = -1;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800980 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700981 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700982 ret_fd = fd->fd;
983 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -0800984 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700985
986 return ret_fd;
987}
988
/* Orphans the grpc_fd: closes (or relinquishes via *release_fd) the OS
   descriptor, removes the fd from its polling island, schedules 'on_done',
   and drops the caller's reference. The grpc_fd struct itself survives on the
   freelist until its refcount drains (see unref_by). */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = NULL;
  }

  /* Notify the caller that the orphan is complete (error is ref'd because we
     still log it below). */
  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1046
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001047static grpc_error *fd_shutdown_error(bool shutdown) {
1048 if (!shutdown) {
1049 return GRPC_ERROR_NONE;
1050 } else {
1051 return GRPC_ERROR_CREATE("FD shutdown");
1052 }
1053}
1054
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001055static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1056 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001057 if (fd->shutdown) {
1058 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
1059 NULL);
1060 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001061 /* not ready ==> switch to a waiting state by setting the closure */
1062 *st = closure;
1063 } else if (*st == CLOSURE_READY) {
1064 /* already ready ==> queue the closure to run immediately */
1065 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001066 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
1067 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001068 } else {
1069 /* upcallptr was set to a different closure. This is an error! */
1070 gpr_log(GPR_ERROR,
1071 "User called a notify_on function with a previous callback still "
1072 "pending");
1073 abort();
1074 }
1075}
1076
1077/* returns 1 if state becomes not ready */
1078static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1079 grpc_closure **st) {
1080 if (*st == CLOSURE_READY) {
1081 /* duplicate ready ==> ignore */
1082 return 0;
1083 } else if (*st == CLOSURE_NOT_READY) {
1084 /* not ready, and not waiting ==> flag ready */
1085 *st = CLOSURE_READY;
1086 return 0;
1087 } else {
1088 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001089 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001090 *st = CLOSURE_NOT_READY;
1091 return 1;
1092 }
1093}
1094
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001095static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1096 grpc_fd *fd) {
1097 grpc_pollset *notifier = NULL;
1098
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001099 gpr_mu_lock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001100 notifier = fd->read_notifier_pollset;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001101 gpr_mu_unlock(&fd->po.mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001102
1103 return notifier;
1104}
1105
/* Returns true if fd_shutdown() has been called on this fd. */
static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->po.mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->po.mu);
  return r;
}
1112
/* Might be called multiple times; only the first call performs the shutdown. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->po.mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    /* shutdown(2) both directions of the underlying socket. */
    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->po.mu);
}
1128
/* Registers 'closure' to run when the fd becomes readable (or immediately if
   it already is / has been shut down). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->po.mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->po.mu);
}
1135
/* Registers 'closure' to run when the fd becomes writable (or immediately if
   it already is / has been shut down). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->po.mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->po.mu);
}
1142
/* Returns a new ref to the workqueue backing this fd's polling island.
   NOTE(review): fd->po.pi may be NULL here (fd_create initializes it to NULL);
   whether GRPC_WORKQUEUE_REF tolerates NULL is not visible in this file —
   confirm before relying on this path for island-less fds. */
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
  gpr_mu_lock(&fd->po.mu);
  grpc_workqueue *workqueue =
      GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue");
  gpr_mu_unlock(&fd->po.mu);
  return workqueue;
}
Craig Tiller70bd4832016-06-30 14:20:46 -07001150
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001151/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001152 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001153 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001154GPR_TLS_DECL(g_current_thread_pollset);
1155GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001156static __thread bool g_initialized_sigmask;
1157static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001158
/* Handler for the poller-kick signal. Deliberately (almost) a no-op: the
   signal's only purpose is to interrupt a blocking poll/epoll call. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001164
/* Installs sig_handler for the wakeup signal used to kick pollers. */
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001166
/* Global state management */

/* One-time pollset subsystem init: thread-local slots for the current
   pollset/worker, the kick signal handler, and the global wakeup fd. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1174
/* Tears down the state created by pollset_global_init(). */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1180
/* Interrupts the given worker's blocking poll by delivering the wakeup signal
   to its thread. The CAS on worker->is_kicked makes the kick idempotent. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1196
/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked). The root_worker is a sentinel in a circular doubly-linked list:
 * the list is empty exactly when the sentinel points at itself. */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001202
/* Unlinks 'worker' from the pollset's circular worker list (p->mu held). */
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}
1207
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001208static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1209 if (pollset_has_workers(p)) {
1210 grpc_pollset_worker *w = p->root_worker.next;
1211 remove_worker(p, w);
1212 return w;
1213 } else {
1214 return NULL;
1215 }
1216}
1217
/* Inserts 'worker' just before the sentinel, i.e. at the back of the
   circular worker list (p->mu held). */
static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}
1223
/* Inserts 'worker' just after the sentinel, i.e. at the front of the
   circular worker list (p->mu held). */
static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
1229
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001230/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        /* Broadcast: kick every worker on this pollset except the calling
           thread's own worker (the caller can absorb the kick itself) */
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        /* No workers to broadcast to: remember the kick so the next worker
           entering pollset_work() skips polling and returns promptly */
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      /* Kick the designated worker unless it is the calling thread itself */
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    /* Pop the front worker and push it to the back so that successive
       anonymous kicks are spread across the pollset's workers */
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1281
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001282static grpc_error *kick_poller(void) {
1283 return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
1284}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001285
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001286static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001287 gpr_mu_init(&pollset->po.mu);
1288 *mu = &pollset->po.mu;
1289 pollset->po.pi = NULL;
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001290#ifdef PO_DEBUG
1291 pollset->po.obj_type = POLL_OBJ_POLLSET;
1292#endif
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001293
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001294 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001295 pollset->kicked_without_pollers = false;
1296
1297 pollset->shutting_down = false;
1298 pollset->finish_shutdown_called = false;
1299 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001300}
1301
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001302/* Convert a timespec to milliseconds:
1303 - Very small or negative poll times are clamped to zero to do a non-blocking
1304 poll (which becomes spin polling)
1305 - Other small values are rounded up to one millisecond
1306 - Longer than a millisecond polls are rounded up to the next nearest
1307 millisecond to avoid spinning
1308 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001309static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1310 gpr_timespec now) {
1311 gpr_timespec timeout;
1312 static const int64_t max_spin_polling_us = 10;
1313 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1314 return -1;
1315 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001316
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001317 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1318 max_spin_polling_us,
1319 GPR_TIMESPAN))) <= 0) {
1320 return 0;
1321 }
1322 timeout = gpr_time_sub(deadline, now);
1323 return gpr_time_to_millis(gpr_time_add(
1324 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1325}
1326
/* Mark 'fd' readable (firing any pending read closure) and record 'notifier'
   as the pollset on which the readability event was observed */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->po.mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  /* Updated under the same lock so readers see a consistent pairing of
     "readable" state and the notifying pollset */
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->po.mu);
}
1335
/* Mark 'fd' writable, firing any pending write closure */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->po.mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->po.mu);
}
1342
Craig Tillerb39307d2016-06-30 15:39:13 -07001343static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1344 grpc_pollset *ps, char *reason) {
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001345 if (ps->po.pi != NULL) {
1346 PI_UNREF(exec_ctx, ps->po.pi, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001347 }
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001348 ps->po.pi = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001349}
1350
/* Complete pollset shutdown: release the polling-island reference and
   schedule the shutdown_done closure. Caller must hold pollset->po.mu and
   the pollset must have no workers */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1362
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001363/* pollset->po.mu lock must be held by the caller before calling this */
/* Begin shutting the pollset down: mark it shutting down, broadcast a kick so
   all workers return from polling, and (if no workers remain) finish the
   shutdown immediately. 'closure' is scheduled once shutdown completes */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down); /* shutdown may only be requested once */
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1382
1383/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1384 * than destroying the mutexes, there is nothing special that needs to be done
1385 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  /* pollset_shutdown() is guaranteed to have run first, so no workers remain;
     only the mutex needs tearing down here */
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}
1390
Craig Tiller2b49ea92016-07-01 13:21:27 -07001391static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001392 GPR_ASSERT(pollset->shutting_down);
1393 GPR_ASSERT(!pollset_has_workers(pollset));
1394 pollset->shutting_down = false;
1395 pollset->finish_shutdown_called = false;
1396 pollset->kicked_without_pollers = false;
1397 pollset->shutdown_done = NULL;
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001398 GPR_ASSERT(pollset->po.pi == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001399}
1400
/* Try to drain one closure from the polling island's workqueue. Returns true
   if a closure was popped and run; false if another thread holds the read
   lock or no item could be popped. Only one thread pops at a time (the read
   mutex is try-locked, never blocked on) */
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* More items remain after ours: wake another poller to keep draining */
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      grpc_closure *c = (grpc_closure *)n;
      grpc_closure_run(exec_ctx, c, c->error_data.error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* n == NULL might mean there's work but it's not available to be popped
       * yet - try to ensure another workqueue wakes up to check shortly if so
       */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}
1422
Craig Tiller84ea3412016-09-08 14:57:56 -07001423#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001424/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1425static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001426 grpc_pollset *pollset,
1427 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001428 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001429 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001430 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001431 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001432 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001433 char *err_msg;
1434 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001435 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1436
1437 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001438 latest polling island pointed by pollset->po.pi
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001439
1440 Since epoll_fd is immutable, we can read it without obtaining the polling
1441 island lock. There is however a possibility that the polling island (from
1442 which we got the epoll_fd) got merged with another island while we are
1443 in this function. This is still okay because in such a case, we will wakeup
1444 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001445 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001446
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001447 if (pollset->po.pi == NULL) {
1448 pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
1449 if (pollset->po.pi == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001450 GPR_TIMER_END("pollset_work_and_unlock", 0);
1451 return; /* Fatal error. We cannot continue */
1452 }
1453
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001454 PI_ADD_REF(pollset->po.pi, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001455 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001456 (void *)pollset, (void *)pollset->po.pi);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001457 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001458
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001459 pi = polling_island_maybe_get_latest(pollset->po.pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001460 epoll_fd = pi->epoll_fd;
1461
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001462 /* Update the pollset->po.pi since the island being pointed by
1463 pollset->po.pi maybe older than the one pointed by pi) */
1464 if (pollset->po.pi != pi) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001465 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1466 polling island to be deleted */
1467 PI_ADD_REF(pi, "ps");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001468 PI_UNREF(exec_ctx, pollset->po.pi, "ps");
1469 pollset->po.pi = pi;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001470 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001471
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001472 /* Add an extra ref so that the island does not get destroyed (which means
1473 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1474 epoll_fd */
1475 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001476 gpr_mu_unlock(&pollset->po.mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001477
Craig Tiller460502e2016-10-13 10:02:08 -07001478 /* If we get some workqueue work to do, it might end up completing an item on
1479 the completion queue, so there's no need to poll... so we skip that and
1480 redo the complete loop to verify */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001481 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1482 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1483 g_current_thread_polling_island = pi;
1484
Vijay Paicef54012016-08-28 23:05:31 -07001485 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001486 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1487 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001488 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001489 if (ep_rv < 0) {
1490 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001491 gpr_asprintf(&err_msg,
1492 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1493 epoll_fd, errno, strerror(errno));
1494 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001495 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001496 /* We were interrupted. Save an interation by doing a zero timeout
1497 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001498 GRPC_POLLING_TRACE(
1499 "pollset_work: pollset: %p, worker: %p received kick",
1500 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001501 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001502 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001503 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001504
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001505#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001506 /* See the definition of g_poll_sync for more details */
1507 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001508#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001509
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001510 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001511 void *data_ptr = ep_ev[i].data.ptr;
1512 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001513 append_error(error,
1514 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1515 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001516 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
1517 append_error(error,
1518 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1519 err_desc);
1520 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001521 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001522 GRPC_POLLING_TRACE(
1523 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1524 "%d) got merged",
1525 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001526 /* This means that our polling island is merged with a different
1527 island. We do not have to do anything here since the subsequent call
1528 to the function pollset_work_and_unlock() will pick up the correct
1529 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001530 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001531 grpc_fd *fd = data_ptr;
1532 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1533 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1534 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001535 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001536 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001537 }
1538 if (write_ev || cancel) {
1539 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001540 }
1541 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001542 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001543
1544 g_current_thread_polling_island = NULL;
1545 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1546 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001547
1548 GPR_ASSERT(pi != NULL);
1549
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001550 /* Before leaving, release the extra ref we added to the polling island. It
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001551 is important to use "pi" here (i.e our old copy of pollset->po.pi
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001552 that we got before releasing the polling island lock). This is because
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001553 pollset->po.pi pointer might get udpated in other parts of the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001554 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001555 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001556
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001557 GPR_TIMER_END("pollset_work_and_unlock", 0);
1558}
1559
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001560/* pollset->po.mu lock must be held by the caller before calling this.
1561 The function pollset_work() may temporarily release the lock (pollset->po.mu)
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001562 during the course of its execution but it will always re-acquire the lock and
1563 ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  /* -1 means "block indefinitely" (see poll_deadline_to_millis_timeout) */
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* Stack-allocated worker record; published via *worker_hdl so other threads
     can kick this worker while we poll */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  /* Record in TLS which pollset/worker this thread is servicing so that
     pollset_kick() can avoid kicking the calling thread */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*"
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    /* Releases pollset->po.mu while blocked in epoll_pwait(); we re-lock
       below */
    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->po.mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->po.mu);
  }

  /* The stack-allocated worker is about to go out of scope: invalidate the
     caller-visible handle and clear the TLS markers */
  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1666
/* Associate 'item' (an fd, pollset or pollset_set) with 'bag' (a pollset or
   pollset_set) by making both point to a common polling island, creating or
   merging islands as needed. Lock order: bag->mu before item->mu */
static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
                            poll_obj_type bag_type, poll_obj *item,
                            poll_obj_type item_type) {
  GPR_TIMER_BEGIN("add_poll_object", 0);

#ifdef PO_DEBUG
  GPR_ASSERT(item->obj_type == item_type);
  GPR_ASSERT(bag->obj_type == bag_type);
#endif

  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *pi_new = NULL;

  gpr_mu_lock(&bag->mu);
  gpr_mu_lock(&item->mu);

retry:
  /*
   * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
   * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
   *    a refcount of 2) and point item->pi and bag->pi to the new island
   * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
   *    the other's non-NULL pi
   * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
   *    polling islands and update item->pi and bag->pi to point to the new
   *    island
   */

  /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
   * orphaned */
  if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
    gpr_mu_unlock(&item->mu);
    gpr_mu_unlock(&bag->mu);
    return;
  }

  if (item->pi == bag->pi) {
    pi_new = item->pi;
    if (pi_new == NULL) {
      /* GPR_ASSERT(item->pi == bag->pi == NULL) */

      /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
       * we need to do some extra work to make TSAN happy */
      if (item_type == POLL_OBJ_FD) {
        /* Unlock before creating a new polling island: the polling island will
           create a workqueue which creates a file descriptor, and holding an fd
           lock here can eventually cause a loop to appear to TSAN (making it
           unhappy). We don't think it's a real loop (there's an epoch point
           where that loop possibility disappears), but the advantages of
           keeping TSAN happy outweigh any performance advantage we might have
           by keeping the lock held. */
        gpr_mu_unlock(&item->mu);
        pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
        gpr_mu_lock(&item->mu);

        /* Need to reverify any assumptions made between the initial lock and
           getting to this branch: if they've changed, we need to throw away our
           work and figure things out again. */
        if (item->pi != NULL) {
          GRPC_POLLING_TRACE(
              "add_poll_object: Raced creating new polling island. pi_new: %p "
              "(fd: %d, %s: %p)",
              (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
              (void *)bag);
          /* No need to lock 'pi_new' here since this is a new polling island
           * and no one has a reference to it yet */
          polling_island_remove_all_fds_locked(pi_new, true, &error);

          /* Ref and unref so that the polling island gets deleted during unref
           */
          PI_ADD_REF(pi_new, "dance_of_destruction");
          PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
          goto retry;
        }
      } else {
        pi_new = polling_island_create(exec_ctx, NULL, &error);
      }

      GRPC_POLLING_TRACE(
          "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
          "%s: %p)",
          (void *)pi_new, poll_obj_string(item_type), (void *)item,
          poll_obj_string(bag_type), (void *)bag);
    } else {
      GRPC_POLLING_TRACE(
          "add_poll_object: Same polling island. pi: %p (%s, %s)",
          (void *)pi_new, poll_obj_string(item_type),
          poll_obj_string(bag_type));
    }
  } else if (item->pi == NULL) {
    /* GPR_ASSERT(bag->pi != NULL) */
    /* Make pi_new point to latest pi*/
    pi_new = polling_island_lock(bag->pi);

    if (item_type == POLL_OBJ_FD) {
      grpc_fd *fd = FD_FROM_PO(item);
      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    }

    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else if (bag->pi == NULL) {
    /* GPR_ASSERT(item->pi != NULL) */
    /* Make pi_new to point to latest pi */
    pi_new = polling_island_lock(item->pi);
    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  } else {
    pi_new = polling_island_merge(item->pi, bag->pi, &error);
    GRPC_POLLING_TRACE(
        "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void *)pi_new, poll_obj_string(item_type), (void *)item,
        poll_obj_string(bag_type), (void *)bag);
  }

  /* At this point, pi_new is the polling island that both item->pi and bag->pi
     MUST be pointing to */

  /* Re-point item/bag at pi_new, transferring refs (add-ref before unref so
     the island cannot be destroyed mid-swap) */
  if (item->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(item_type));
    if (item->pi != NULL) {
      PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
    }
    item->pi = pi_new;
  }

  if (bag->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(bag_type));
    if (bag->pi != NULL) {
      PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
    }
    bag->pi = pi_new;
  }

  gpr_mu_unlock(&item->mu);
  gpr_mu_unlock(&bag->mu);

  GRPC_LOG_IF_ERROR("add_poll_object", error);
  GPR_TIMER_END("add_poll_object", 0);
}
Sree Kuchibhotlaf6f33d72016-11-18 11:38:52 -08001816
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001817static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1818 grpc_fd *fd) {
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001819 add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
Sree Kuchibhotla499b94b2016-11-18 14:35:47 -08001820 POLL_OBJ_FD);
1821}
1822
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001823/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001824 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001825 */
1826
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001827static grpc_pollset_set *pollset_set_create(void) {
1828 grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001829 gpr_mu_init(&pss->po.mu);
Sree Kuchibhotlaa0749a62016-11-18 20:22:09 -08001830 pss->po.pi = NULL;
1831#ifdef PO_DEBUG
1832 pss->po.obj_type = POLL_OBJ_POLLSET_SET;
1833#endif
Sree Kuchibhotla2385fd72016-11-18 16:30:41 -08001834 return pss;
1835}
1836
/* Destroys 'pss': releases its mutex, drops its reference on its polling
   island (if it acquired one), and frees the memory */
static void pollset_set_destroy(grpc_pollset_set *pss) {
  gpr_mu_destroy(&pss->po.mu);

  if (pss->po.pi != NULL) {
    /* PI_UNREF needs an exec_ctx to run any resulting cleanup work; use a
       local one and flush it before returning */
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy");
    grpc_exec_ctx_finish(&exec_ctx);
  }

  gpr_free(pss);
}
1848
/* Adds 'fd' to pollset_set 'pss' by delegating to add_poll_object(), which
   links/merges their polling islands as needed */
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
                  POLL_OBJ_FD);
}
1854
/* Removing an fd from a pollset_set is intentionally a no-op in this engine;
   the fd's polling-island reference is released elsewhere (presumably when
   the fd is orphaned — NOTE(review): confirm against fd_orphan) */
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
                               grpc_fd *fd) {
  /* Nothing to do */
}
1859
/* Adds pollset 'ps' to pollset_set 'pss' by delegating to add_poll_object(),
   which links/merges their polling islands as needed */
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
                  POLL_OBJ_POLLSET);
}
1865
/* Removing a pollset from a pollset_set is intentionally a no-op in this
   engine; polling-island references are cleaned up through the objects' own
   destruction paths */
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pss, grpc_pollset *ps) {
  /* Nothing to do */
}
1870
/* Adds pollset_set 'item' to pollset_set 'bag' by delegating to
   add_poll_object(), which links/merges their polling islands as needed */
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
                  POLL_OBJ_POLLSET_SET);
}
1877
/* Removing a pollset_set from another pollset_set is intentionally a no-op
   in this engine; polling-island references are cleaned up through the
   objects' own destruction paths */
static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  /* Nothing to do */
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001883
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001884/* Test helper functions
1885 * */
/* Test-only helper: returns the polling island currently associated with
   'fd' (NULL if none). The pointer is read under fd->po.mu, but may be stale
   by the time the caller inspects it */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->po.mu);
  pi = fd->po.pi;
  gpr_mu_unlock(&fd->po.mu);

  return pi;
}
1895
/* Test-only helper: returns the polling island currently associated with
   pollset 'ps' (NULL if none). The pointer is read under ps->po.mu, but may
   be stale by the time the caller inspects it */
void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->po.mu);
  pi = ps->po.pi;
  gpr_mu_unlock(&ps->po.mu);

  return pi;
}
1905
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001906bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001907 polling_island *p1 = p;
1908 polling_island *p2 = q;
1909
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001910 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1911 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001912 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001913 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001914
1915 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001916}
1917
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001918/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001919 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001920 */
1921
/* Tears down the global state of this polling engine: fds, pollsets, and
   polling islands (the inverse of the global init calls performed in
   grpc_init_epoll_linux) */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1927
/* Function table through which iomgr drives this epoll-based polling engine.
   Every entry points at one of the static implementations above */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    /* fd lifecycle and readiness notification */
    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    /* pollset operations */
    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    /* pollset_set operations */
    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    /* workqueue operations */
    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_enqueue = workqueue_enqueue,

    .shutdown_engine = shutdown_engine,
};
1966
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001967/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1968 * Create a dummy epoll_fd to make sure epoll support is available */
1969static bool is_epoll_available() {
1970 int fd = epoll_create1(EPOLL_CLOEXEC);
1971 if (fd < 0) {
1972 gpr_log(
1973 GPR_ERROR,
1974 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1975 fd);
1976 return false;
1977 }
1978 close(fd);
1979 return true;
1980}
1981
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001982const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001983 /* If use of signals is disabled, we cannot use epoll engine*/
1984 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1985 return NULL;
1986 }
1987
Ken Paysoncd7d0472016-10-11 12:24:20 -07001988 if (!grpc_has_wakeup_fd()) {
Ken Paysonbc544be2016-10-06 19:23:47 -07001989 return NULL;
1990 }
1991
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001992 if (!is_epoll_available()) {
1993 return NULL;
1994 }
1995
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001996 if (!is_grpc_wakeup_signal_initialized) {
Sree Kuchibhotlabd48c912016-09-27 16:48:25 -07001997 grpc_use_signal(SIGRTMIN + 6);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001998 }
1999
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002000 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07002001
2002 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
2003 return NULL;
2004 }
2005
2006 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
2007 polling_island_global_init())) {
2008 return NULL;
2009 }
2010
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07002011 return &vtable;
2012}
2013
murgatroid99623dd4f2016-08-08 17:31:27 -07002014#else /* defined(GRPC_LINUX_EPOLL) */
2015#if defined(GRPC_POSIX_SOCKET)
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07002016#include "src/core/lib/iomgr/ev_posix.h"
murgatroid99623dd4f2016-08-08 17:31:27 -07002017/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002018 * NULL */
2019const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
murgatroid99623dd4f2016-08-08 17:31:27 -07002020#endif /* defined(GRPC_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07002021
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07002022void grpc_use_signal(int signum) {}
murgatroid99623dd4f2016-08-08 17:31:27 -07002023#endif /* !defined(GRPC_LINUX_EPOLL) */