blob: 4358020f9b24ea98c534956dc4fb4fe057edc9b6 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
murgatroid9954070892016-08-08 17:01:18 -070035#include "src/core/lib/iomgr/port.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070036
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
murgatroid99623dd4f2016-08-08 17:31:27 -070038#ifdef GRPC_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
Sree Kuchibhotla4998e302016-08-16 07:26:31 -070045#include <pthread.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070046#include <signal.h>
47#include <string.h>
48#include <sys/epoll.h>
49#include <sys/socket.h>
50#include <unistd.h>
51
52#include <grpc/support/alloc.h>
53#include <grpc/support/log.h>
54#include <grpc/support/string_util.h>
55#include <grpc/support/tls.h>
56#include <grpc/support/useful.h>
57
58#include "src/core/lib/iomgr/ev_posix.h"
59#include "src/core/lib/iomgr/iomgr_internal.h"
60#include "src/core/lib/iomgr/wakeup_fd_posix.h"
Craig Tillerb39307d2016-06-30 15:39:13 -070061#include "src/core/lib/iomgr/workqueue.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070062#include "src/core/lib/profiling/timers.h"
63#include "src/core/lib/support/block_annotate.h"
64
Sree Kuchibhotla34217242016-06-29 00:19:07 -070065/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
66static int grpc_polling_trace = 0; /* Disabled by default */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -070067#define GRPC_POLLING_TRACE(fmt, ...) \
68 if (grpc_polling_trace) { \
69 gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
70 }
71
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070072static int grpc_wakeup_signal = -1;
73static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070074
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070075/* Implements the function defined in grpc_posix.h. This function might be
76 * called before even calling grpc_init() to set either a different signal to
77 * use. If signum == -1, then the use of signals is disabled */
78void grpc_use_signal(int signum) {
79 grpc_wakeup_signal = signum;
80 is_grpc_wakeup_signal_initialized = true;
81
82 if (grpc_wakeup_signal < 0) {
83 gpr_log(GPR_INFO,
84 "Use of signals is disabled. Epoll engine will not be used");
85 } else {
86 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
87 grpc_wakeup_signal);
88 }
89}
90
91struct polling_island;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070092
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070093/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -070094 * Fd Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070095 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  /* NOTE(review): these appear to hold either a pending closure or the
     CLOSURE_NOT_READY / CLOSURE_READY sentinels declared above — confirm
     against the notify_on implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to (protected by mu) */
  struct polling_island *polling_island;

  /* Intrusive link used by the fd freelist (see fd_global_init/shutdown) */
  struct grpc_fd *freelist_next;
  /* presumably invoked once the fd is fully orphaned — confirm with callers */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};
129
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700130/* Reference counting for fds */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700131// #define GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700132#ifdef GRPC_FD_REF_COUNT_DEBUG
133static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
134static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
135 int line);
136#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
137#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
138#else
139static void fd_ref(grpc_fd *fd);
140static void fd_unref(grpc_fd *fd);
141#define GRPC_FD_REF(fd, reason) fd_ref(fd)
142#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
143#endif
144
145static void fd_global_init(void);
146static void fd_global_shutdown(void);
147
148#define CLOSURE_NOT_READY ((grpc_closure *)0)
149#define CLOSURE_READY ((grpc_closure *)1)
150
151/*******************************************************************************
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700152 * Polling island Declarations
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700153 */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700154
Craig Tillerd8a3c042016-09-09 12:42:37 -0700155#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700156
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700157#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
Craig Tillerb39307d2016-06-30 15:39:13 -0700158#define PI_UNREF(exec_ctx, p, r) \
159 pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700160
Craig Tillerd8a3c042016-09-09 12:42:37 -0700161#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700162
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700163#define PI_ADD_REF(p, r) pi_add_ref((p))
Craig Tillerb39307d2016-06-30 15:39:13 -0700164#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700165
#endif /* !defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
167
/* This is also used as a grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;
  /* Mutex guarding the read end of the workqueue (must be held to pop from
   * workqueue_items) */
  gpr_mu workqueue_read_mu;
  /* Queue of closures to be executed */
  gpr_mpscq workqueue_items;
  /* Count of items in workqueue_items */
  gpr_atm workqueue_item_count;
  /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
  grpc_wakeup_fd workqueue_wakeup_fd;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set (dynamic array of fd_cnt entries
     with fd_capacity slots allocated) */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;
} polling_island;
208
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700209/*******************************************************************************
210 * Pollset Declarations
211 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;

  /* Doubly-linked list links (list rooted at the owning pollset's
     root_worker) */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};
221
struct grpc_pollset {
  gpr_mu mu;
  /* Head of the worker list (workers link via next/prev) */
  grpc_pollset_worker root_worker;
  /* NOTE(review): presumably set when a kick arrives while no worker is
     polling, to be consumed by the next worker — confirm with pollset_work */
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};
234
235/*******************************************************************************
236 * Pollset-set Declarations
237 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  /* Dynamic array of member pollsets */
  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  /* Dynamic array of child pollset_sets */
  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  /* Dynamic array of member fds */
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
259
260/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700261 * Common helpers
262 */
263
Craig Tillerf975f742016-07-01 14:56:27 -0700264static bool append_error(grpc_error **composite, grpc_error *error,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700265 const char *desc) {
Craig Tillerf975f742016-07-01 14:56:27 -0700266 if (error == GRPC_ERROR_NONE) return true;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700267 if (*composite == GRPC_ERROR_NONE) {
268 *composite = GRPC_ERROR_CREATE(desc);
269 }
270 *composite = grpc_error_add_child(*composite, error);
Craig Tillerf975f742016-07-01 14:56:27 -0700271 return false;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700272}
273
274/*******************************************************************************
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700275 * Polling island Definitions
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700276 */
277
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700278/* The wakeup fd that is used to wake up all threads in a Polling island. This
279 is useful in the polling island merge operation where we need to wakeup all
280 the threads currently polling the smaller polling island (so that they can
281 start polling the new/merged polling island)
282
283 NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
284 threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
285static grpc_wakeup_fd polling_island_wakeup_fd;
286
Craig Tiller2e620132016-10-10 15:27:44 -0700287/* The polling island being polled right now.
288 See comments in workqueue_maybe_wakeup for why this is tracked. */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700289static __thread polling_island *g_current_thread_polling_island;
290
Craig Tillerb39307d2016-06-30 15:39:13 -0700291/* Forward declaration */
Craig Tiller2b49ea92016-07-01 13:21:27 -0700292static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700293
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700294#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700295/* Currently TSAN may incorrectly flag data races between epoll_ctl and
296 epoll_wait for any grpc_fd structs that are added to the epoll set via
297 epoll_ctl and are returned (within a very short window) via epoll_wait().
298
299 To work-around this race, we establish a happens-before relation between
300 the code just-before epoll_ctl() and the code after epoll_wait() by using
301 this atomic */
302gpr_atm g_epoll_sync;
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -0700303#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700304
Craig Tillerb39307d2016-06-30 15:39:13 -0700305static void pi_add_ref(polling_island *pi);
306static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700307
Craig Tillerd8a3c042016-09-09 12:42:37 -0700308#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
Craig Tillera10b0b12016-09-09 16:20:07 -0700309static void pi_add_ref_dbg(polling_island *pi, const char *reason,
310 const char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700311 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700312 pi_add_ref(pi);
313 gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
314 (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700315}
316
Craig Tillerb39307d2016-06-30 15:39:13 -0700317static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
Craig Tillera10b0b12016-09-09 16:20:07 -0700318 const char *reason, const char *file, int line) {
Craig Tiller15007612016-07-06 09:36:16 -0700319 long old_cnt = gpr_atm_acq_load(&pi->ref_count);
Craig Tillerb39307d2016-06-30 15:39:13 -0700320 pi_unref(exec_ctx, pi);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700321 gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700322 (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700323}
Craig Tillerd8a3c042016-09-09 12:42:37 -0700324
325static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
326 const char *file, int line,
327 const char *reason) {
328 if (workqueue != NULL) {
Craig Tillera10b0b12016-09-09 16:20:07 -0700329 pi_add_ref_dbg((polling_island *)workqueue, reason, file, line);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700330 }
331 return workqueue;
332}
333
334static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
335 const char *file, int line, const char *reason) {
336 if (workqueue != NULL) {
Craig Tillera10b0b12016-09-09 16:20:07 -0700337 pi_unref_dbg(exec_ctx, (polling_island *)workqueue, reason, file, line);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700338 }
339}
340#else
341static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
342 if (workqueue != NULL) {
343 pi_add_ref((polling_island *)workqueue);
344 }
345 return workqueue;
346}
347
348static void workqueue_unref(grpc_exec_ctx *exec_ctx,
349 grpc_workqueue *workqueue) {
350 if (workqueue != NULL) {
351 pi_unref(exec_ctx, (polling_island *)workqueue);
352 }
353}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700354#endif
355
/* Takes a ref on the polling island. Use via the PI_ADD_REF() macro. */
static void pi_add_ref(polling_island *pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700359
/* Drops a ref on the polling island; destroys it when the count hits zero.
   Use via the PI_UNREF() macro. */
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  /* If ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    /* Capture merged_to before deleting pi, since pi's memory is freed */
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(exec_ctx, pi);
    if (next != NULL) {
      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
    }
  }
}
377
/* Adds 'fd_count' fds to both the epoll set and pi->fds, optionally taking a
   ref on each fd. Failures are accumulated in *error but do not stop the
   remaining fds from being added.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered for both read and write readiness */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      /* EEXIST just means the fd is already in this epoll set; only other
         errnos are reported */
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* On any epoll_ctl failure the fd is not tracked in pi->fds */
      continue;
    }

    /* Grow the fds array (at least +8, otherwise x1.5) when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
422
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700423/* The caller is expected to hold pi->mu before calling this */
424static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700425 grpc_wakeup_fd *wakeup_fd,
426 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700427 struct epoll_event ev;
428 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700429 char *err_msg;
430 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700431
432 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
433 ev.data.ptr = wakeup_fd;
434 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
435 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700436 if (err < 0 && errno != EEXIST) {
437 gpr_asprintf(&err_msg,
438 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
439 "error: %d (%s)",
440 pi->epoll_fd,
441 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
442 strerror(errno));
443 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
444 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700445 }
446}
447
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700448/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700449static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700450 bool remove_fd_refs,
451 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700452 int err;
453 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700454 char *err_msg;
455 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700456
457 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700458 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700459 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700460 gpr_asprintf(&err_msg,
461 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
462 "error: %d (%s)",
463 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
464 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
465 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700466 }
467
468 if (remove_fd_refs) {
469 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700470 }
471 }
472
473 pi->fd_cnt = 0;
474}
475
/* Removes a single fd from the epoll set (unless the fd was already closed,
   in which case the kernel removed it automatically) and from pi->fds.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically been removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    /* ENOENT (fd not in the set) is not an error */
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  /* Swap-with-last removal from pi->fds (array order is not preserved) */
  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
507
/* Allocates and initializes a new polling island: its mutexes, workqueue,
   epoll set and wakeup fds, optionally adding 'initial_fd' to the epoll set.
   Might return NULL in case of an error (details accumulated in *error). */
static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
                                             grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  const char *err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = gpr_malloc(sizeof(*pi));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = NULL;
  /* -1 lets polling_island_delete() know epoll_create1 never succeeded */
  pi->epoll_fd = -1;

  gpr_mu_init(&pi->workqueue_read_mu);
  gpr_mpscq_init(&pi->workqueue_items);
  gpr_atm_rel_store(&pi->workqueue_item_count, 0);

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
                    err_desc)) {
    goto done;
  }

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  /* Both wakeup fds are polled by every poller of this island */
  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);

  if (initial_fd != NULL) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    /* NOTE(review): polling_island_delete() unconditionally destroys
       workqueue_wakeup_fd; if grpc_wakeup_fd_init() above failed we destroy a
       wakeup fd that was never fully initialized — confirm this is safe. */
    polling_island_delete(exec_ctx, pi);
    pi = NULL;
  }
  return pi;
}
558
/* Frees all resources owned by 'pi'. All fds must already have been removed
   (fd_cnt == 0) and the workqueue must be empty (both asserted below).
   'exec_ctx' is currently unused in this body. */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  /* epoll_fd stays -1 if polling_island_create() failed before creating it */
  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
  gpr_mu_destroy(&pi->workqueue_read_mu);
  gpr_mpscq_destroy(&pi->workqueue_items);
  gpr_mu_destroy(&pi->mu);
  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
  gpr_free(pi->fds);
  gpr_free(pi);
}
573
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700574/* Attempts to gets the last polling island in the linked list (liked by the
575 * 'merged_to' field). Since this does not lock the polling island, there are no
576 * guarantees that the island returned is the last island */
577static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
578 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
579 while (next != NULL) {
580 pi = next;
581 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
582 }
583
584 return pi;
585}
586
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  /* Returned with pi->mu held */
  return pi;
}
623
/* Gets the locks on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling
   islands).

   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    /* Step 1: lock-free chase to the current tail of each chain */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains may have merged into the same island by now */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Step 2: lock in ascending address order so two threads locking the
       same pair can never deadlock against each other */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Step 3: confirm both are still tails; a merge may have happened while
       we were waiting for the locks */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* At least one island was merged from under us: release both locks and
       retry from the top */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
704
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700705static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
706 if (p == q) {
707 gpr_mu_unlock(&p->mu);
708 } else {
709 gpr_mu_unlock(&p->mu);
710 gpr_mu_unlock(&q->mu);
711 }
712}
713
Craig Tillerd8a3c042016-09-09 12:42:37 -0700714static void workqueue_maybe_wakeup(polling_island *pi) {
Craig Tiller2e620132016-10-10 15:27:44 -0700715 /* If this thread is the current poller, then it may be that it's about to
716 decrement the current poller count, so we need to look past this thread */
Craig Tillerd8a3c042016-09-09 12:42:37 -0700717 bool is_current_poller = (g_current_thread_polling_island == pi);
718 gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
719 gpr_atm current_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
Craig Tiller2e620132016-10-10 15:27:44 -0700720 /* Only issue a wakeup if it's likely that some poller could come in and take
721 it right now. Note that since we do an anticipatory mpscq_pop every poll
722 loop, it's ok if we miss the wakeup here, as we'll get the work item when
723 the next poller enters anyway. */
724 if (current_pollers > min_current_pollers_for_wakeup) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700725 GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
726 grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
727 }
728}
729
730static void workqueue_move_items_to_parent(polling_island *q) {
731 polling_island *p = (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
732 if (p == NULL) {
733 return;
734 }
735 gpr_mu_lock(&q->workqueue_read_mu);
736 int num_added = 0;
737 while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
738 gpr_mpscq_node *n = gpr_mpscq_pop(&q->workqueue_items);
739 if (n != NULL) {
740 gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
741 gpr_atm_no_barrier_fetch_add(&p->workqueue_item_count, 1);
742 gpr_mpscq_push(&p->workqueue_items, n);
743 num_added++;
744 }
745 }
746 gpr_mu_unlock(&q->workqueue_read_mu);
747 if (num_added > 0) {
748 workqueue_maybe_wakeup(p);
749 }
750 workqueue_move_items_to_parent(p);
751}
752
/* Merges polling islands p and q and returns the surviving island. Safe to
   call with p == q (no-op in that case). Errors from the epoll manipulations
   are appended to *error. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

    /* Propagate any pending workqueue items up q's merge chain */
    workqueue_move_items_to_parent(q);
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
788
Craig Tillerd8a3c042016-09-09 12:42:37 -0700789static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
790 grpc_workqueue *workqueue, grpc_closure *closure,
791 grpc_error *error) {
Craig Tillerd8a3c042016-09-09 12:42:37 -0700792 GPR_TIMER_BEGIN("workqueue.enqueue", 0);
Craig Tiller1dc5dbb2016-09-09 17:09:39 -0700793 /* take a ref to the workqueue: otherwise it can happen that whatever events
794 * this kicks off ends up destroying the workqueue before this function
795 * completes */
796 GRPC_WORKQUEUE_REF(workqueue, "enqueue");
797 polling_island *pi = (polling_island *)workqueue;
Craig Tillerd8a3c042016-09-09 12:42:37 -0700798 gpr_atm last = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
799 closure->error_data.error = error;
800 gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
801 if (last == 0) {
802 workqueue_maybe_wakeup(pi);
803 }
Craig Tillerd8a3c042016-09-09 12:42:37 -0700804 workqueue_move_items_to_parent(pi);
Craig Tiller1dc5dbb2016-09-09 17:09:39 -0700805 GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
806 GPR_TIMER_END("workqueue.enqueue", 0);
Craig Tillerd8a3c042016-09-09 12:42:37 -0700807}
808
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700809static grpc_error *polling_island_global_init() {
810 grpc_error *error = GRPC_ERROR_NONE;
811
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700812 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
813 if (error == GRPC_ERROR_NONE) {
814 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
815 }
816
817 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700818}
819
/* Counterpart of polling_island_global_init(): destroys the shared wakeup fd. */
static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
823
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700824/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700825 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700826 */
827
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700828/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700829 * but instead so that implementations with multiple threads in (for example)
830 * epoll_wait deal with the race between pollset removal and incoming poll
831 * notifications.
832 *
833 * The problem is that the poller ultimately holds a reference to this
834 * object, so it is very difficult to know when is safe to free it, at least
835 * without some expensive synchronization.
836 *
837 * If we keep the object freelisted, in the worst case losing this race just
838 * becomes a spurious read notification on a reused fd.
839 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700840
841/* The alarm system needs to be able to wakeup 'some poller' sometimes
842 * (specifically when a new alarm needs to be triggered earlier than the next
843 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
844 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700845
846/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
847 * sure to wake up one polling thread (which can wake up other threads if
848 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700849grpc_wakeup_fd grpc_global_wakeup_fd;
850
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700851static grpc_fd *fd_freelist = NULL;
852static gpr_mu fd_freelist_mu;
853
/* Adds 'n' to the fd's refcount; the fd must still be alive (refcount > 0).
   In debug builds the change is logged together with the call site. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* The previous count must have been positive: ref'ing a dead fd is a bug. */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
869
/* Subtracts 'n' from the fd's refcount; when the count reaches zero the fd
   object is recycled onto the freelist (the struct is never freed here —
   see fd_global_shutdown). Debug builds log the change with the call site. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Refcount hit zero: add the fd to the freelist for reuse */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Count must remain positive after the decrement */
    GPR_ASSERT(old > n);
  }
}
894
/* Increment refcount by two to avoid changing the orphan bit (the lowest bit
   of refst serves as the 'active' flag — see fd_is_orphaned) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
910
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700911static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
912
static void fd_global_shutdown(void) {
  /* NOTE(review): the immediate lock/unlock pair appears to act as a barrier,
     waiting out any thread still inside a freelist critical section before we
     start tearing the list down — confirm against the callers. */
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  /* Free every recycled fd object still sitting on the freelist */
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
924
925static grpc_fd *fd_create(int fd, const char *name) {
926 grpc_fd *new_fd = NULL;
927
928 gpr_mu_lock(&fd_freelist_mu);
929 if (fd_freelist != NULL) {
930 new_fd = fd_freelist;
931 fd_freelist = fd_freelist->freelist_next;
932 }
933 gpr_mu_unlock(&fd_freelist_mu);
934
935 if (new_fd == NULL) {
936 new_fd = gpr_malloc(sizeof(grpc_fd));
937 gpr_mu_init(&new_fd->mu);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700938 }
939
940 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
941 newly created fd (or an fd we got from the freelist), no one else would be
942 holding a lock to it anyway. */
943 gpr_mu_lock(&new_fd->mu);
944
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700945 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700946 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700947 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700948 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700949 new_fd->read_closure = CLOSURE_NOT_READY;
950 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700951 new_fd->polling_island = NULL;
952 new_fd->freelist_next = NULL;
953 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700954 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700955
956 gpr_mu_unlock(&new_fd->mu);
957
958 char *fd_name;
959 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
960 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700961#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700962 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700963#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700964 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700965 return new_fd;
966}
967
968static bool fd_is_orphaned(grpc_fd *fd) {
969 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
970}
971
972static int fd_wrapped_fd(grpc_fd *fd) {
973 int ret_fd = -1;
974 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700975 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700976 ret_fd = fd->fd;
977 }
978 gpr_mu_unlock(&fd->mu);
979
980 return ret_fd;
981}
982
/* Orphans the fd: closes (or hands back via *release_fd) the underlying file
   descriptor, detaches the fd from its polling island, and schedules
   'on_done'. The grpc_fd struct itself is recycled via the freelist once its
   refcount drains. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;
  polling_island *unref_pi = NULL;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->polling_island;
    fd->polling_island = NULL;
  }

  /* Hand 'error' (ref'd) to the completion closure */
  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
                      NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != NULL) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
  }
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
  GRPC_ERROR_UNREF(error);
}
1040
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001041static grpc_error *fd_shutdown_error(bool shutdown) {
1042 if (!shutdown) {
1043 return GRPC_ERROR_NONE;
1044 } else {
1045 return GRPC_ERROR_CREATE("FD shutdown");
1046 }
1047}
1048
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001049static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1050 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001051 if (fd->shutdown) {
1052 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
1053 NULL);
1054 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001055 /* not ready ==> switch to a waiting state by setting the closure */
1056 *st = closure;
1057 } else if (*st == CLOSURE_READY) {
1058 /* already ready ==> queue the closure to run immediately */
1059 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001060 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
1061 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001062 } else {
1063 /* upcallptr was set to a different closure. This is an error! */
1064 gpr_log(GPR_ERROR,
1065 "User called a notify_on function with a previous callback still "
1066 "pending");
1067 abort();
1068 }
1069}
1070
1071/* returns 1 if state becomes not ready */
1072static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1073 grpc_closure **st) {
1074 if (*st == CLOSURE_READY) {
1075 /* duplicate ready ==> ignore */
1076 return 0;
1077 } else if (*st == CLOSURE_NOT_READY) {
1078 /* not ready, and not waiting ==> flag ready */
1079 *st = CLOSURE_READY;
1080 return 0;
1081 } else {
1082 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001083 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001084 *st = CLOSURE_NOT_READY;
1085 return 1;
1086 }
1087}
1088
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001089static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
1090 grpc_fd *fd) {
1091 grpc_pollset *notifier = NULL;
1092
1093 gpr_mu_lock(&fd->mu);
1094 notifier = fd->read_notifier_pollset;
1095 gpr_mu_unlock(&fd->mu);
1096
1097 return notifier;
1098}
1099
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001100static bool fd_is_shutdown(grpc_fd *fd) {
1101 gpr_mu_lock(&fd->mu);
1102 const bool r = fd->shutdown;
1103 gpr_mu_unlock(&fd->mu);
1104 return r;
1105}
1106
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    /* Shut down both directions of the underlying socket */
    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1122
/* Registers 'closure' to run when the fd becomes readable (or immediately if
   it already is, or has been shut down). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1129
/* Registers 'closure' to run when the fd becomes writable (or immediately if
   it already is, or has been shut down). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1136
Craig Tillerd6ba6192016-06-30 15:42:41 -07001137static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001138 gpr_mu_lock(&fd->mu);
Craig Tillera10b0b12016-09-09 16:20:07 -07001139 grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
1140 (grpc_workqueue *)fd->polling_island, "fd_get_workqueue");
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001141 gpr_mu_unlock(&fd->mu);
Craig Tillerd6ba6192016-06-30 15:42:41 -07001142 return workqueue;
1143}
Craig Tiller70bd4832016-06-30 14:20:46 -07001144
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001145/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001146 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001147 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001148GPR_TLS_DECL(g_current_thread_pollset);
1149GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001150static __thread bool g_initialized_sigmask;
1151static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001152
/* Handler for grpc_wakeup_signal (delivered via pthread_kill in
   pollset_worker_kick). The body is deliberately (almost) empty: the
   signal's only job is to interrupt a blocking syscall. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001158
/* Installs sig_handler for grpc_wakeup_signal so the kick signal interrupts
   (rather than terminates) worker threads. */
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001160
/* Global state management */
static grpc_error *pollset_global_init(void) {
  /* Thread-local slots recording which pollset/worker the current thread is
     servicing; consulted by pollset_kick to avoid kicking ourselves. */
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1168
/* Tears down state created by pollset_global_init (the signal handler
   installed by poller_kick_init is left in place). */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1174
/* Kicks 'worker' by signalling its thread. The is_kicked CAS makes the kick
   idempotent: a worker already kicked (and not yet reset) is not signalled
   again. Returns any error from pthread_kill. */
static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}
1190
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001191/* Return 1 if the pollset has active threads in pollset_work (pollset must
1192 * be locked) */
1193static int pollset_has_workers(grpc_pollset *p) {
1194 return p->root_worker.next != &p->root_worker;
1195}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001196
1197static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1198 worker->prev->next = worker->next;
1199 worker->next->prev = worker->prev;
1200}
1201
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001202static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1203 if (pollset_has_workers(p)) {
1204 grpc_pollset_worker *w = p->root_worker.next;
1205 remove_worker(p, w);
1206 return w;
1207 } else {
1208 return NULL;
1209 }
1210}
1211
1212static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1213 worker->next = &p->root_worker;
1214 worker->prev = worker->next->prev;
1215 worker->prev->next = worker->next->prev = worker;
1216}
1217
1218static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1219 worker->prev = &p->root_worker;
1220 worker->next = worker->prev->next;
1221 worker->prev->next = worker->next->prev = worker;
1222}
1223
/* p->mu must be held before calling this function.
   Wakes pollers on 'p': a specific worker, every worker
   (GRPC_POLLSET_KICK_BROADCAST), or — when specific_worker is NULL — any one
   worker. Kicks that find no poller are latched in kicked_without_pollers. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";
  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      /* Broadcast: kick every parked worker except the calling thread's own
         worker, which needs no signal */
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        /* Nobody to kick; remember the kick for the next worker to arrive */
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      /* Kick the requested worker unless it is the calling thread itself */
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Rotate the kicked worker to the back of the list so anonymous kicks
         are spread across workers */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1275
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001276static grpc_error *kick_poller(void) {
1277 return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
1278}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001279
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001280static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
1281 gpr_mu_init(&pollset->mu);
1282 *mu = &pollset->mu;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001283
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001284 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001285 pollset->kicked_without_pollers = false;
1286
1287 pollset->shutting_down = false;
1288 pollset->finish_shutdown_called = false;
1289 pollset->shutdown_done = NULL;
1290
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001291 pollset->polling_island = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001292}
1293
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001294/* Convert a timespec to milliseconds:
1295 - Very small or negative poll times are clamped to zero to do a non-blocking
1296 poll (which becomes spin polling)
1297 - Other small values are rounded up to one millisecond
1298 - Longer than a millisecond polls are rounded up to the next nearest
1299 millisecond to avoid spinning
1300 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001301static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1302 gpr_timespec now) {
1303 gpr_timespec timeout;
1304 static const int64_t max_spin_polling_us = 10;
1305 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1306 return -1;
1307 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001308
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001309 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1310 max_spin_polling_us,
1311 GPR_TIMESPAN))) <= 0) {
1312 return 0;
1313 }
1314 timeout = gpr_time_sub(deadline, now);
1315 return gpr_time_to_millis(gpr_time_add(
1316 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1317}
1318
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001319static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1320 grpc_pollset *notifier) {
1321 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001322 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001323 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1324 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001325 gpr_mu_unlock(&fd->mu);
1326}
1327
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001328static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001329 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1330 gpr_mu_lock(&fd->mu);
1331 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1332 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001333}
1334
Craig Tillerb39307d2016-06-30 15:39:13 -07001335static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
1336 grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001337 if (ps->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001338 PI_UNREF(exec_ctx, ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001339 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001340 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001341}
1342
1343static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1344 grpc_pollset *pollset) {
1345 /* The pollset cannot have any workers if we are at this stage */
1346 GPR_ASSERT(!pollset_has_workers(pollset));
1347
1348 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001349
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001350 /* Release the ref and set pollset->polling_island to NULL */
Craig Tillerb39307d2016-06-30 15:39:13 -07001351 pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001352 grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001353}
1354
1355/* pollset->mu lock must be held by the caller before calling this */
1356static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1357 grpc_closure *closure) {
1358 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1359 GPR_ASSERT(!pollset->shutting_down);
1360 pollset->shutting_down = true;
1361 pollset->shutdown_done = closure;
1362 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1363
1364 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1365 because it would release the underlying polling island. In such a case, we
1366 let the last worker call finish_shutdown_locked() from pollset_work() */
1367 if (!pollset_has_workers(pollset)) {
1368 GPR_ASSERT(!pollset->finish_shutdown_called);
1369 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1370 finish_shutdown_locked(exec_ctx, pollset);
1371 }
1372 GPR_TIMER_END("pollset_shutdown", 0);
1373}
1374
1375/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
1376 * than destroying the mutexes, there is nothing special that needs to be done
1377 * here */
1378static void pollset_destroy(grpc_pollset *pollset) {
1379 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001380 gpr_mu_destroy(&pollset->mu);
1381}
1382
Craig Tiller2b49ea92016-07-01 13:21:27 -07001383static void pollset_reset(grpc_pollset *pollset) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001384 GPR_ASSERT(pollset->shutting_down);
1385 GPR_ASSERT(!pollset_has_workers(pollset));
1386 pollset->shutting_down = false;
1387 pollset->finish_shutdown_called = false;
1388 pollset->kicked_without_pollers = false;
1389 pollset->shutdown_done = NULL;
Craig Tillerb39307d2016-06-30 15:39:13 -07001390 GPR_ASSERT(pollset->polling_island == NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001391}
1392
/* Try to pop and run one closure from the polling island's workqueue.
   Returns true if a closure was run, false otherwise (including when another
   thread already holds the single-consumer lock).
   Note: the MPSC queue allows many producers but only one consumer at a time,
   hence the trylock around gpr_mpscq_pop(). */
static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
                                    polling_island *pi) {
  /* Non-blocking: if someone else is consuming, just report no work done */
  if (gpr_mu_trylock(&pi->workqueue_read_mu)) {
    gpr_mpscq_node *n = gpr_mpscq_pop(&pi->workqueue_items);
    gpr_mu_unlock(&pi->workqueue_read_mu);
    if (n != NULL) {
      /* More items remain after taking ours: wake another poller to drain */
      if (gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1) {
        workqueue_maybe_wakeup(pi);
      }
      grpc_closure *c = (grpc_closure *)n;
      grpc_closure_run(exec_ctx, c, c->error_data.error);
      return true;
    } else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
      /* n == NULL might mean there's work but it's not available to be popped
       * yet - try to ensure another workqueue wakes up to check shortly if so
       */
      workqueue_maybe_wakeup(pi);
    }
  }
  return false;
}
1414
Craig Tiller84ea3412016-09-08 14:57:56 -07001415#define GRPC_EPOLL_MAX_EVENTS 100
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001416/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1417static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001418 grpc_pollset *pollset,
1419 grpc_pollset_worker *worker, int timeout_ms,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001420 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001421 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001422 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001423 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001424 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001425 char *err_msg;
1426 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001427 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1428
1429 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001430 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001431
1432 Since epoll_fd is immutable, we can read it without obtaining the polling
1433 island lock. There is however a possibility that the polling island (from
1434 which we got the epoll_fd) got merged with another island while we are
1435 in this function. This is still okay because in such a case, we will wakeup
1436 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001437 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001438
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001439 if (pollset->polling_island == NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001440 pollset->polling_island = polling_island_create(exec_ctx, NULL, error);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001441 if (pollset->polling_island == NULL) {
1442 GPR_TIMER_END("pollset_work_and_unlock", 0);
1443 return; /* Fatal error. We cannot continue */
1444 }
1445
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001446 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001447 GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
1448 (void *)pollset, (void *)pollset->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001449 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001450
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001451 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001452 epoll_fd = pi->epoll_fd;
1453
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001454 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001455 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001456 if (pollset->polling_island != pi) {
1457 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1458 polling island to be deleted */
1459 PI_ADD_REF(pi, "ps");
Craig Tillerb39307d2016-06-30 15:39:13 -07001460 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001461 pollset->polling_island = pi;
1462 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001463
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001464 /* Add an extra ref so that the island does not get destroyed (which means
1465 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1466 epoll_fd */
1467 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001468 gpr_mu_unlock(&pollset->mu);
1469
Craig Tiller460502e2016-10-13 10:02:08 -07001470 /* If we get some workqueue work to do, it might end up completing an item on
1471 the completion queue, so there's no need to poll... so we skip that and
1472 redo the complete loop to verify */
Craig Tillerd8a3c042016-09-09 12:42:37 -07001473 if (!maybe_do_workqueue_work(exec_ctx, pi)) {
1474 gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
1475 g_current_thread_polling_island = pi;
1476
Vijay Paicef54012016-08-28 23:05:31 -07001477 GRPC_SCHEDULING_START_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001478 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1479 sig_mask);
Vijay Paicef54012016-08-28 23:05:31 -07001480 GRPC_SCHEDULING_END_BLOCKING_REGION;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001481 if (ep_rv < 0) {
1482 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001483 gpr_asprintf(&err_msg,
1484 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1485 epoll_fd, errno, strerror(errno));
1486 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001487 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001488 /* We were interrupted. Save an interation by doing a zero timeout
1489 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001490 GRPC_POLLING_TRACE(
1491 "pollset_work: pollset: %p, worker: %p received kick",
1492 (void *)pollset, (void *)worker);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001493 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001494 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001495 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001496
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001497#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001498 /* See the definition of g_poll_sync for more details */
1499 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001500#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001501
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001502 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001503 void *data_ptr = ep_ev[i].data.ptr;
1504 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001505 append_error(error,
1506 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1507 err_desc);
Craig Tillerd8a3c042016-09-09 12:42:37 -07001508 } else if (data_ptr == &pi->workqueue_wakeup_fd) {
1509 append_error(error,
1510 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1511 err_desc);
1512 maybe_do_workqueue_work(exec_ctx, pi);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001513 } else if (data_ptr == &polling_island_wakeup_fd) {
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001514 GRPC_POLLING_TRACE(
1515 "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
1516 "%d) got merged",
1517 (void *)pollset, (void *)worker, epoll_fd);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001518 /* This means that our polling island is merged with a different
1519 island. We do not have to do anything here since the subsequent call
1520 to the function pollset_work_and_unlock() will pick up the correct
1521 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001522 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001523 grpc_fd *fd = data_ptr;
1524 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1525 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1526 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001527 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001528 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001529 }
1530 if (write_ev || cancel) {
1531 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001532 }
1533 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001534 }
Craig Tillerd8a3c042016-09-09 12:42:37 -07001535
1536 g_current_thread_polling_island = NULL;
1537 gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
1538 }
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001539
1540 GPR_ASSERT(pi != NULL);
1541
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001542 /* Before leaving, release the extra ref we added to the polling island. It
1543 is important to use "pi" here (i.e our old copy of pollset->polling_island
1544 that we got before releasing the polling island lock). This is because
1545 pollset->polling_island pointer might get udpated in other parts of the
1546 code when there is an island merge while we are doing epoll_wait() above */
Craig Tillerb39307d2016-06-30 15:39:13 -07001547 PI_UNREF(exec_ctx, pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001548
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001549 GPR_TIMER_END("pollset_work_and_unlock", 0);
1550}
1551
/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns.

   Registers the calling thread as a worker on 'pollset', polls (via
   pollset_work_and_unlock) until 'deadline' or until kicked, and completes
   pollset shutdown if this was the last worker. *worker_hdl is set to this
   stack-allocated worker for the duration of the call and reset to NULL on
   return. */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

  /* Worker lives on this stack frame; it is only published to other threads
     via *worker_hdl and the pollset's worker list below */
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self(); /* target thread for the kick signal */
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  /* Record which pollset/worker this thread runs so pollset_kick() can avoid
     signalling ourselves */
  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      /* One-time (per thread reaching here first; guarded by the flag) setup
         of the two signal masks used above */
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
         This is the mask used at all times *except during
         epoll_wait()*"
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}
1658
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001659static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1660 grpc_fd *fd) {
Craig Tiller57726ca2016-09-12 11:59:45 -07001661 GPR_TIMER_BEGIN("pollset_add_fd", 0);
1662
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001663 grpc_error *error = GRPC_ERROR_NONE;
1664
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001665 gpr_mu_lock(&pollset->mu);
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001666 gpr_mu_lock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001667
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001668 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001669
Craig Tiller7212c232016-07-06 13:11:09 -07001670retry:
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001671 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1672 * equal, do nothing.
1673 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1674 * a new polling island (with a refcount of 2) and make the polling_island
1675 * fields in both fd and pollset to point to the new island
1676 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1677 * the NULL polling_island field to point to the non-NULL polling_island
1678 * field (ensure that the refcount on the polling island is incremented by
1679 * 1 to account for the newly added reference)
1680 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1681 * and different, merge both the polling islands and update the
1682 * polling_island fields in both fd and pollset to point to the merged
1683 * polling island.
1684 */
Craig Tiller8e8027b2016-07-07 10:42:09 -07001685
Craig Tiller42ac6db2016-07-06 17:13:56 -07001686 if (fd->orphaned) {
1687 gpr_mu_unlock(&fd->mu);
1688 gpr_mu_unlock(&pollset->mu);
1689 /* early out */
1690 return;
1691 }
1692
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001693 if (fd->polling_island == pollset->polling_island) {
1694 pi_new = fd->polling_island;
1695 if (pi_new == NULL) {
Craig Tiller0a06cd72016-07-14 13:21:24 -07001696 /* Unlock before creating a new polling island: the polling island will
1697 create a workqueue which creates a file descriptor, and holding an fd
1698 lock here can eventually cause a loop to appear to TSAN (making it
1699 unhappy). We don't think it's a real loop (there's an epoch point where
1700 that loop possibility disappears), but the advantages of keeping TSAN
1701 happy outweigh any performance advantage we might have by keeping the
1702 lock held. */
Craig Tiller7212c232016-07-06 13:11:09 -07001703 gpr_mu_unlock(&fd->mu);
Craig Tillerb39307d2016-06-30 15:39:13 -07001704 pi_new = polling_island_create(exec_ctx, fd, &error);
Craig Tiller7212c232016-07-06 13:11:09 -07001705 gpr_mu_lock(&fd->mu);
Craig Tiller0a06cd72016-07-14 13:21:24 -07001706 /* Need to reverify any assumptions made between the initial lock and
1707 getting to this branch: if they've changed, we need to throw away our
1708 work and figure things out again. */
Craig Tiller7212c232016-07-06 13:11:09 -07001709 if (fd->polling_island != NULL) {
Craig Tiller27da6422016-07-06 13:14:46 -07001710 GRPC_POLLING_TRACE(
1711 "pollset_add_fd: Raced creating new polling island. pi_new: %p "
1712 "(fd: %d, pollset: %p)",
1713 (void *)pi_new, fd->fd, (void *)pollset);
Sree Kuchibhotlaa129adf2016-10-26 16:44:44 -07001714
1715 /* No need to lock 'pi_new' here since this is a new polling island and
1716 no one has a reference to it yet */
1717 polling_island_remove_all_fds_locked(pi_new, true, &error);
1718
1719 /* Ref and unref so that the polling island gets deleted during unref */
Craig Tiller27da6422016-07-06 13:14:46 -07001720 PI_ADD_REF(pi_new, "dance_of_destruction");
1721 PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
Craig Tiller7212c232016-07-06 13:11:09 -07001722 goto retry;
Craig Tiller27da6422016-07-06 13:14:46 -07001723 } else {
1724 GRPC_POLLING_TRACE(
1725 "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
1726 "pollset: %p)",
1727 (void *)pi_new, fd->fd, (void *)pollset);
Craig Tiller7212c232016-07-06 13:11:09 -07001728 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001729 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001730 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001731 pi_new = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001732 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001733 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001734
1735 GRPC_POLLING_TRACE(
1736 "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
1737 "pollset->pi: %p)",
1738 (void *)pi_new, fd->fd, (void *)pollset,
1739 (void *)pollset->polling_island);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001740 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001741 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001742 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001743
1744 GRPC_POLLING_TRACE(
1745 "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
1746 "%p, fd->pi: %p",
1747 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001748 } else {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001749 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
1750 &error);
Sree Kuchibhotla1e776682016-06-28 14:09:26 -07001751 GRPC_POLLING_TRACE(
1752 "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
1753 "%p, fd->pi: %p, pollset->pi: %p)",
1754 (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
1755 (void *)pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001756 }
1757
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001758 /* At this point, pi_new is the polling island that both fd->polling_island
1759 and pollset->polling_island must be pointing to */
1760
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001761 if (fd->polling_island != pi_new) {
1762 PI_ADD_REF(pi_new, "fd");
1763 if (fd->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001764 PI_UNREF(exec_ctx, fd->polling_island, "fd");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001765 }
1766 fd->polling_island = pi_new;
1767 }
1768
1769 if (pollset->polling_island != pi_new) {
1770 PI_ADD_REF(pi_new, "ps");
1771 if (pollset->polling_island != NULL) {
Craig Tillerb39307d2016-06-30 15:39:13 -07001772 PI_UNREF(exec_ctx, pollset->polling_island, "ps");
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001773 }
1774 pollset->polling_island = pi_new;
1775 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001776
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001777 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001778 gpr_mu_unlock(&pollset->mu);
Craig Tiller15007612016-07-06 09:36:16 -07001779
1780 GRPC_LOG_IF_ERROR("pollset_add_fd", error);
Craig Tiller57726ca2016-09-12 11:59:45 -07001781
1782 GPR_TIMER_END("pollset_add_fd", 0);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001783}
1784
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001785/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001786 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001787 */
1788
1789static grpc_pollset_set *pollset_set_create(void) {
1790 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1791 memset(pollset_set, 0, sizeof(*pollset_set));
1792 gpr_mu_init(&pollset_set->mu);
1793 return pollset_set;
1794}
1795
1796static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1797 size_t i;
1798 gpr_mu_destroy(&pollset_set->mu);
1799 for (i = 0; i < pollset_set->fd_count; i++) {
1800 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1801 }
1802 gpr_free(pollset_set->pollsets);
1803 gpr_free(pollset_set->pollset_sets);
1804 gpr_free(pollset_set->fds);
1805 gpr_free(pollset_set);
1806}
1807
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001808static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1809 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1810 size_t i;
1811 gpr_mu_lock(&pollset_set->mu);
1812 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1813 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1814 pollset_set->fds = gpr_realloc(
1815 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1816 }
1817 GRPC_FD_REF(fd, "pollset_set");
1818 pollset_set->fds[pollset_set->fd_count++] = fd;
1819 for (i = 0; i < pollset_set->pollset_count; i++) {
1820 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1821 }
1822 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1823 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1824 }
1825 gpr_mu_unlock(&pollset_set->mu);
1826}
1827
1828static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1829 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1830 size_t i;
1831 gpr_mu_lock(&pollset_set->mu);
1832 for (i = 0; i < pollset_set->fd_count; i++) {
1833 if (pollset_set->fds[i] == fd) {
1834 pollset_set->fd_count--;
1835 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1836 pollset_set->fds[pollset_set->fd_count]);
1837 GRPC_FD_UNREF(fd, "pollset_set");
1838 break;
1839 }
1840 }
1841 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1842 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1843 }
1844 gpr_mu_unlock(&pollset_set->mu);
1845}
1846
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001847static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1848 grpc_pollset_set *pollset_set,
1849 grpc_pollset *pollset) {
1850 size_t i, j;
1851 gpr_mu_lock(&pollset_set->mu);
1852 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1853 pollset_set->pollset_capacity =
1854 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1855 pollset_set->pollsets =
1856 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1857 sizeof(*pollset_set->pollsets));
1858 }
1859 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1860 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1861 if (fd_is_orphaned(pollset_set->fds[i])) {
1862 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1863 } else {
1864 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1865 pollset_set->fds[j++] = pollset_set->fds[i];
1866 }
1867 }
1868 pollset_set->fd_count = j;
1869 gpr_mu_unlock(&pollset_set->mu);
1870}
1871
1872static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1873 grpc_pollset_set *pollset_set,
1874 grpc_pollset *pollset) {
1875 size_t i;
1876 gpr_mu_lock(&pollset_set->mu);
1877 for (i = 0; i < pollset_set->pollset_count; i++) {
1878 if (pollset_set->pollsets[i] == pollset) {
1879 pollset_set->pollset_count--;
1880 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1881 pollset_set->pollsets[pollset_set->pollset_count]);
1882 break;
1883 }
1884 }
1885 gpr_mu_unlock(&pollset_set->mu);
1886}
1887
1888static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1889 grpc_pollset_set *bag,
1890 grpc_pollset_set *item) {
1891 size_t i, j;
1892 gpr_mu_lock(&bag->mu);
1893 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1894 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1895 bag->pollset_sets =
1896 gpr_realloc(bag->pollset_sets,
1897 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1898 }
1899 bag->pollset_sets[bag->pollset_set_count++] = item;
1900 for (i = 0, j = 0; i < bag->fd_count; i++) {
1901 if (fd_is_orphaned(bag->fds[i])) {
1902 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1903 } else {
1904 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1905 bag->fds[j++] = bag->fds[i];
1906 }
1907 }
1908 bag->fd_count = j;
1909 gpr_mu_unlock(&bag->mu);
1910}
1911
1912static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1913 grpc_pollset_set *bag,
1914 grpc_pollset_set *item) {
1915 size_t i;
1916 gpr_mu_lock(&bag->mu);
1917 for (i = 0; i < bag->pollset_set_count; i++) {
1918 if (bag->pollset_sets[i] == item) {
1919 bag->pollset_set_count--;
1920 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1921 bag->pollset_sets[bag->pollset_set_count]);
1922 break;
1923 }
1924 }
1925 gpr_mu_unlock(&bag->mu);
1926}
1927
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001928/* Test helper functions
1929 * */
1930void *grpc_fd_get_polling_island(grpc_fd *fd) {
1931 polling_island *pi;
1932
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001933 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001934 pi = fd->polling_island;
Craig Tillerf83f8ca2016-07-06 11:34:08 -07001935 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001936
1937 return pi;
1938}
1939
1940void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1941 polling_island *pi;
1942
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001943 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001944 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001945 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001946
1947 return pi;
1948}
1949
/* Test-only helper: returns true iff 'p' and 'q' currently resolve to the
 * same polling island. The lock/unlock pair is what resolves the pointers:
 * polling_island_lock_pair() may rewrite p1/p2 to the latest islands in
 * their linked lists before the comparison. */
bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}
1961
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001962/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001963 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001964 */
1965
/* Tears down the engine-global state (fds, pollsets, polling islands).
 * Registered in the vtable below as .shutdown_engine; the call order here
 * is deliberate and should not be changed casually. */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1971
/* The event-engine vtable for this epoll-based engine: binds the static
 * implementations in this file to the grpc_event_engine_vtable interface.
 * Returned by grpc_init_epoll_linux() when the engine is usable. */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
    .fd_get_workqueue = fd_get_workqueue,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .workqueue_ref = workqueue_ref,
    .workqueue_unref = workqueue_unref,
    .workqueue_enqueue = workqueue_enqueue,

    .shutdown_engine = shutdown_engine,
};
2010
Sree Kuchibhotla72744022016-06-09 09:42:06 -07002011/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
2012 * Create a dummy epoll_fd to make sure epoll support is available */
2013static bool is_epoll_available() {
2014 int fd = epoll_create1(EPOLL_CLOEXEC);
2015 if (fd < 0) {
2016 gpr_log(
2017 GPR_ERROR,
2018 "epoll_create1 failed with error: %d. Not using epoll polling engine",
2019 fd);
2020 return false;
2021 }
2022 close(fd);
2023 return true;
2024}
2025
/* Engine entry point: returns the epoll engine's vtable, or NULL if this
 * environment cannot support it (so ev_posix can fall back to another
 * engine). The capability checks below are ordered cheapest-first and each
 * bails out early. */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use epoll engine*/
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!grpc_has_wakeup_fd()) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  /* No wakeup signal chosen yet: pick a default real-time signal. */
  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 6);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}
2057
#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GRPC_POSIX_SOCKET) */

/* Signal-based kicking is specific to the epoll engine; on platforms without
 * it this is a no-op so callers can invoke it unconditionally. */
void grpc_use_signal(int signum) {}
#endif /* !defined(GRPC_LINUX_EPOLL) */