/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
static int grpc_polling_trace = 0; /* Disabled by default */
#define GRPC_POLLING_TRACE(fmt, ...)       \
  if (grpc_polling_trace) {                \
    gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
  }

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be
 * called even before grpc_init() to set a different signal to use. If signum
 * is negative (e.g. -1), the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
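
/* Illustrative usage (a sketch, not part of the original sources): an
   application that wants this engine to kick pollers with a dedicated,
   otherwise-unused signal could call, before grpc_init():

       grpc_use_signal(SIGRTMIN + 6);   (the exact signal number is arbitrary)

   Calling grpc_use_signal(-1) instead disables the use of signals, which
   also means this epoll engine will not be used. */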

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd has been shut down and that any pending read/write
     closures should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)

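/* Illustration (a sketch, based on notify_on_locked() and set_ready_locked()
   further below): each of read_closure/write_closure is a one-pointer state
   machine:

     CLOSURE_NOT_READY  -- event fires -->         CLOSURE_READY
     CLOSURE_NOT_READY  -- notify_on_* called -->  <caller's closure>
     CLOSURE_READY      -- notify_on_* called -->  closure scheduled now,
                                                   state reset to NOT_READY
     <caller's closure> -- event fires -->         closure scheduled now,
                                                   state reset to NOT_READY

   In other words, whichever of "the event happened" and "someone asked to be
   notified" arrives second causes the closure to run. */
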
/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(p, r) pi_unref((p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed, which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_refcount ref_count;

  /* Pointer to the polling_island this island merged into.
   * merged_to is only set once in a polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use the gpr_atm type here so that we can do atomic access on this and
   * reduce lock contention on the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;

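/* Illustration (a sketch, not definitive): after two merges, islands form a
   chain through merged_to and only the tail of the chain owns any fds:

       island A            island B            island C
       merged_to = B  -->  merged_to = C  -->  merged_to = NULL
       fds: (none)         fds: (none)         fds: everything from A, B, C

   Code that still holds a pointer to A or B chases merged_to links (see
   polling_island_maybe_get_latest() / polling_island_lock() below) until it
   reaches the island whose merged_to is NULL; that island's epoll_fd is the
   one actually being polled. */
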
/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Common helpers
 */

static void append_error(grpc_error **composite, grpc_error *error,
                         const char *desc) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
}

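/* Illustration (a sketch of how append_error() is used throughout this file):
   callers accumulate multiple failures into one composite error rather than
   bailing out at the first failure, e.g.

       grpc_error *error = GRPC_ERROR_NONE;
       append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), "adding fds");
       append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), "adding fds");

   leaves 'error' as a single "adding fds" error carrying both OS errors as
   children, or as GRPC_ERROR_NONE if every call passed GRPC_ERROR_NONE. */
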
/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wake up all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
void pi_add_ref(polling_island *pi);
void pi_unref(polling_island *pi);

void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_unref(pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif

void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }

void pi_unref(polling_island *pi) {
  /* If the ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (gpr_unref(&pi->ref_count)) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  }
}

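/* Illustration (a sketch of the recursive unref above): if island A was merged
   into island B, the merge took one ref on B on A's behalf (see
   polling_island_merge() below). When the last reference to A is dropped,
   polling_island_delete(A) returns A to the freelist and PI_UNREF(next, ...)
   releases A's ref on B; if that was in turn B's last ref and B had itself
   merged into C, the same step repeats down the chain. */
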
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd,
                                                grpc_error **error) {
  struct epoll_event ev;
  int err;
  char *err_msg;
  const char *err_desc = "polling_island_add_wakeup_fd";

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd,
                 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs,
                                                 grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed,
                                            grpc_error **error) {
  int err;
  size_t i;
  char *err_msg;
  const char *err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(
          &err_msg,
          "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
          pi->epoll_fd, fd->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "polling_island_create";

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_ref_init(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  } else {
    polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
    pi->next_free = NULL;

    if (initial_fd != NULL) {
      /* Lock the polling island here just in case we got this structure from
         the freelist and the polling island lock was not released yet (by the
         code that adds the polling island to the freelist) */
      gpr_mu_lock(&pi->mu);
      polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
      gpr_mu_unlock(&pi->mu);
    }
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (the one with fewer fds) to q.
       Note that the refcounts on the fds being moved will not change here.
       (This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p   ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
     newly created fd (or an fd we got from the freelist), no one else would be
     holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void *)worker, worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

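/* Illustration (a sketch of the worker list above): root_worker is a sentinel
   embedded in the pollset, and workers form a doubly-linked ring around it:

       empty:        root_worker.next == root_worker.prev == &root_worker
       two workers:  root -> W1 -> W2 -> root   (following next pointers)

   pop_front_worker() takes root_worker.next (W1); push_back_worker() inserts
   just before the sentinel, so popping the front and pushing to the back
   rotates through the workers in round-robin order (as pollset_kick() does
   when it kicks an "anonymous" worker). */
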
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001119/* p->mu must be held before calling this function */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001120static grpc_error *pollset_kick(grpc_pollset *p,
1121 grpc_pollset_worker *specific_worker) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001122 GPR_TIMER_BEGIN("pollset_kick", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001123 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001124 const char *err_desc = "Kick Failure";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001125 grpc_pollset_worker *worker = specific_worker;
1126 if (worker != NULL) {
1127 if (worker == GRPC_POLLSET_KICK_BROADCAST) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001128 if (pollset_has_workers(p)) {
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001129 GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001130 for (worker = p->root_worker.next; worker != &p->root_worker;
1131 worker = worker->next) {
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001132 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001133 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001134 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001135 }
Craig Tillera218a062016-06-26 09:58:37 -07001136 GPR_TIMER_END("pollset_kick.broadcast", 0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001137 } else {
1138 p->kicked_without_pollers = true;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001139 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001140 } else {
1141 GPR_TIMER_MARK("kicked_specifically", 0);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001142 if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001143 append_error(&error, pollset_worker_kick(worker), err_desc);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001144 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001145 }
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001146 } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
1147 /* Since worker == NULL, it means that we can kick "any" worker on this
1148 pollset 'p'. If 'p' happens to be the same pollset this thread is
1149 currently polling (i.e in pollset_work() function), then there is no need
1150 to kick any other worker since the current thread can just absorb the
1151 kick. This is the reason why we enter this case only when
1152 g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

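/* Wake up pollers blocked in epoll by signalling the global wakeup fd; the
   wakeup is consumed in pollset_work_and_unlock() when epoll reports
   grpc_global_wakeup_fd as readable. */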
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
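/* For example, with the rounding above a deadline 3.2ms from 'now' yields a
   4ms timeout, while a deadline only 5us away (i.e. within
   max_spin_polling_us) yields 0, a non-blocking poll. */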
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                              max_spin_polling_us,
                                              GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}

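/* Called from the epoll loop when 'fd' is reported readable: marks the fd
   ready for reading (scheduling any closure registered via fd_notify_on_read)
   and records the pollset that observed the event. */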
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

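/* Drop the pollset's ref on its polling island (if it holds one) and clear
   the pointer; 'reason' is forwarded to PI_UNREF(). */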
static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
  if (ps->polling_island != NULL) {
    PI_UNREF(ps->polling_island, reason);
  }
  ps->polling_island = NULL;
}

static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}

/* pollset->mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_reset(grpc_pollset *pollset) {
  GPR_ASSERT(pollset->shutting_down);
  GPR_ASSERT(!pollset_has_workers(pollset));
  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->kicked_without_pollers = false;
  pollset->shutdown_done = NULL;
  pollset_release_polling_island(pollset, "ps_reset");
}

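/* Maximum number of events fetched by a single epoll_pwait()/epoll_wait()
   call; pollset_work_and_unlock() keeps polling as long as a full batch of
   this size is returned. */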
#define GRPC_EPOLL_MAX_EVENTS 1000
/* Note: sig_mask contains the signal mask to use *during* epoll_pwait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset *pollset,
                                    grpc_pollset_worker *worker, int timeout_ms,
                                    sigset_t *sig_mask, grpc_error **error) {
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "pollset_work_and_unlock";
  GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the latest
     polling island pointed to by pollset->polling_island.

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are in
     this function. This is still okay because in such a case, we will wake up
     right away from epoll_pwait() and pick up the latest polling_island the
     next time this function (i.e. pollset_work_and_unlock()) is called */

  if (pollset->polling_island == NULL) {
    pollset->polling_island = polling_island_create(NULL, error);
    if (pollset->polling_island == NULL) {
      GPR_TIMER_END("pollset_work_and_unlock", 0);
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->polling_island, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void *)pollset, (void *)pollset->polling_island);
  }

  pi = polling_island_maybe_get_latest(pollset->polling_island);
  epoll_fd = pi->epoll_fd;

  /* Update pollset->polling_island since the island pointed to by
     pollset->polling_island may be older than the one pointed to by pi */
  if (pollset->polling_island != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(pollset->polling_island, "ps");
    pollset->polling_island = pi;
  }

  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_pwait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->mu);

  do {
    ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                        sig_mask);
    if (ep_rv < 0) {
      if (errno != EINTR) {
        gpr_asprintf(&err_msg,
                     "epoll_pwait() epoll fd: %d failed with error: %d (%s)",
                     epoll_fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      } else {
        /* We were interrupted. Save an iteration by doing a zero timeout
           epoll_wait to see if there are any other events of interest */
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p received kick",
            (void *)pollset, (void *)worker);
        ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
      }
    }

#ifdef GRPC_TSAN
    /* See the definition of g_epoll_sync for more details */
    gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

    for (int i = 0; i < ep_rv; ++i) {
      void *data_ptr = ep_ev[i].data.ptr;
      if (data_ptr == &grpc_global_wakeup_fd) {
        append_error(error,
                     grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
                     err_desc);
      } else if (data_ptr == &polling_island_wakeup_fd) {
        GRPC_POLLING_TRACE(
            "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
            "%d) got merged",
            (void *)pollset, (void *)worker, epoll_fd);
        /* This means that our polling island is merged with a different
           island. We do not have to do anything here since the subsequent call
           to the function pollset_work_and_unlock() will pick up the correct
           epoll_fd */
      } else {
        grpc_fd *fd = data_ptr;
        int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
        int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
        int write_ev = ep_ev[i].events & EPOLLOUT;
        if (read_ev || cancel) {
          fd_become_readable(exec_ctx, fd, pollset);
        }
        if (write_ev || cancel) {
          fd_become_writable(exec_ctx, fd);
        }
      }
    }
  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);

  GPR_ASSERT(pi != NULL);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e. our old copy of pollset->polling_island
     that we got before releasing the polling island lock). This is because the
     pollset->polling_island pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_pwait() above */
  PI_UNREF(pi, "ps_work");

  GPR_TIMER_END("pollset_work_and_unlock", 0);
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                                grpc_pollset_worker **worker_hdl,
                                gpr_timespec now, gpr_timespec deadline) {
  GPR_TIMER_BEGIN("pollset_work", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);

  sigset_t new_mask;

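  /* The worker records the calling thread (pt_id) so that a kick can deliver
     'grpc_wakeup_signal' to this specific thread, and an is_kicked flag that
     pollset_worker_kick() presumably uses to avoid signalling the same worker
     twice. */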
  grpc_pollset_worker worker;
  worker.next = worker.prev = NULL;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e. 'kicking') a worker in the pollset. A 'kick' is a way to inform
       the worker that there is some pending work that needs immediate
       attention (like an event on the completion queue, or a polling island
       merge that results in a new epoll-fd to wait on) and that the worker
       should not spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before or while it calls epoll_pwait(), it
       should immediately exit from epoll_pwait(). If the worker is kicked
       after it returns from epoll_pwait(), then nothing really needs to be
       done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_pwait()*.
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal';
                         this is the mask to use *during epoll_pwait()*.

         The new_mask is set on the thread before the worker is added to the
         pollset (i.e. before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
                            &g_orig_sigmask, &error);
    grpc_exec_ctx_flush(exec_ctx);

    gpr_mu_lock(&pollset->mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);

    gpr_mu_unlock(&pollset->mu);
    grpc_exec_ctx_flush(exec_ctx);
    gpr_mu_lock(&pollset->mu);
  }

  *worker_hdl = NULL;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GPR_TIMER_END("pollset_work", 0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                           grpc_fd *fd) {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&pollset->mu);
  gpr_mu_lock(&fd->pi_mu);

  polling_island *pi_new = NULL;

  /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
   *    equal, do nothing.
   * 2) If fd->polling_island and pollset->polling_island are both NULL, create
   *    a new polling island (with a refcount of 2) and make the polling_island
   *    fields in both fd and pollset to point to the new island
   * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
   *    the NULL polling_island field to point to the non-NULL polling_island
   *    field (ensure that the refcount on the polling island is incremented by
   *    1 to account for the newly added reference)
   * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
   *    and different, merge both the polling islands and update the
   *    polling_island fields in both fd and pollset to point to the merged
   *    polling island.
   */
  if (fd->polling_island == pollset->polling_island) {
    pi_new = fd->polling_island;
    if (pi_new == NULL) {
      pi_new = polling_island_create(fd, &error);

      GRPC_POLLING_TRACE(
          "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, "
          "pollset: %p)",
          (void *)pi_new, fd->fd, (void *)pollset);
    }
  } else if (fd->polling_island == NULL) {
    pi_new = polling_island_lock(pollset->polling_island);
    polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, "
        "pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset,
        (void *)pollset->polling_island);
  } else if (pollset->polling_island == NULL) {
    pi_new = polling_island_lock(fd->polling_island);
    gpr_mu_unlock(&pi_new->mu);

    GRPC_POLLING_TRACE(
        "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island);
  } else {
    pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
                                  &error);
    GRPC_POLLING_TRACE(
        "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: "
        "%p, fd->pi: %p, pollset->pi: %p)",
        (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island,
        (void *)pollset->polling_island);
  }

  /* At this point, pi_new is the polling island that both fd->polling_island
     and pollset->polling_island must be pointing to */

  if (fd->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "fd");
    if (fd->polling_island != NULL) {
      PI_UNREF(fd->polling_island, "fd");
    }
    fd->polling_island = pi_new;
  }

  if (pollset->polling_island != pi_new) {
    PI_ADD_REF(pi_new, "ps");
    if (pollset->polling_island != NULL) {
      PI_UNREF(pollset->polling_island, "ps");
    }
    pollset->polling_island = pi_new;
  }

  gpr_mu_unlock(&fd->pi_mu);
  gpr_mu_unlock(&pollset->mu);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

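/* A pollset_set is a bag of fds, pollsets and other pollset_sets, guarded by
   its own mu. fds added to the set are forwarded to every pollset (and nested
   set) it contains, so they end up on the corresponding polling islands. */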
static grpc_pollset_set *pollset_set_create(void) {
  grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
  memset(pollset_set, 0, sizeof(*pollset_set));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

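/* Take a ref on 'fd', remember it in the set, and propagate it to every
   pollset and nested pollset_set currently in the set. */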
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
    pollset_set->fds = gpr_realloc(
        pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
                               grpc_pollset_set *pollset_set, grpc_fd *fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      GPR_SWAP(grpc_fd *, pollset_set->fds[i],
               pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

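/* Add 'pollset' to the set and link every live fd in the set to it; orphaned
   fds encountered along the way are unref'd and compacted out of the array. */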
static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        GPR_MAX(8, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets =
        gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
                                               sizeof(*pollset_set->pollsets));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
                                    grpc_pollset_set *pollset_set,
                                    grpc_pollset *pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
               pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
}

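/* Nest 'item' inside 'bag' and forward all of bag's live fds to it, pruning
   orphaned fds in the same pass. */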
static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
    bag->pollset_sets =
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
                                        grpc_pollset_set *bag,
                                        grpc_pollset_set *item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
               bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

/* Test helper functions */
void *grpc_fd_get_polling_island(grpc_fd *fd) {
  polling_island *pi;

  gpr_mu_lock(&fd->pi_mu);
  pi = fd->polling_island;
  gpr_mu_unlock(&fd->pi_mu);

  return pi;
}

void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
  polling_island *pi;

  gpr_mu_lock(&ps->mu);
  pi = ps->polling_island;
  gpr_mu_unlock(&ps->mu);

  return pi;
}

bool grpc_are_polling_islands_equal(void *p, void *q) {
  polling_island *p1 = p;
  polling_island *p2 = q;

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

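/* The vtable handed back by grpc_init_epoll_linux(); it plugs this epoll /
   polling-island implementation into the generic event-engine interface
   declared in ev_posix.h. */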
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        errno);
    return false;
  }
  close(fd);
  return true;
}

const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
  /* If use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    return NULL;
  }

  if (!is_epoll_available()) {
    return NULL;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    grpc_use_signal(SIGRTMIN + 2);
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return NULL;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return NULL;
  }

  return &vtable;
}

#else /* defined(GPR_LINUX_EPOLL) */
#if defined(GPR_POSIX_SOCKET)
#include "src/core/lib/iomgr/ev_posix.h"
/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
 * NULL */
const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
#endif /* defined(GPR_POSIX_SOCKET) */

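/* Without epoll there is no signal-based kick mechanism in this file, so
   grpc_use_signal() is a no-op on such builds. */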
void grpc_use_signal(int signum) {}
#endif /* !defined(GPR_LINUX_EPOLL) */