/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function defined in grpc_posix.h. This function may be called
 * even before grpc_init() to set a different signal to use. If signum == -1,
 * the use of signals is disabled altogether */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
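
/* Illustrative usage (the signal number below is just an example; any signal
   the application reserves for gRPC works, and the call is typically made
   before grpc_init()):

     grpc_use_signal(SIGRTMIN + 2);   // use a real-time signal for wakeups
     grpc_use_signal(-1);             // or disable signal based wakeups
*/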

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
     bit 0    : 1=Active / 0=Orphaned
     bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting the
     field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
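
/* A note on the sentinels above: fd->read_closure and fd->write_closure each
   hold one of three values: CLOSURE_NOT_READY (no pending event and no
   waiter), CLOSURE_READY (an event arrived before anyone asked to be
   notified), or a pointer to the grpc_closure that is waiting to be scheduled.
   See notify_on_locked() and set_ready_locked() for the transitions */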

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), 1, (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), 1, (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p), 1)
#define PI_UNREF(p, r) pi_unref((p), 1)

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in a polling_island's lifetime (and that
   * too only if the island is merged with another island). Because of this, we
   * can use gpr_atm type here so that we can do atomic access on this and
   * reduce lock contention on the 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
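
/* A polling island, once merged, is never unmerged: merges only ever add a
   'merged_to' link, so the islands reachable from any fd or pollset form a
   chain ending in the single island that currently owns the epoll set. Readers
   follow that chain (see polling_island_maybe_get_latest() and
   polling_island_lock()) to find the island to actually poll or modify */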

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down? */
  bool finish_shutdown_called; /* Has 'finish_shutdown_locked()' been called? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would then result in polling island merges). This
 * would remove the need to maintain fd_count here. It would also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(polling_island *pi); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);

void pi_add_ref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                    int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}

void pi_unref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                  int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif

long pi_add_ref(polling_island *pi, int ref_cnt) {
  return gpr_atm_full_fetch_add(&pi->ref_count, ref_cnt);
}

long pi_unref(polling_island *pi, int ref_cnt) {
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  /* If the ref count went to zero, delete the polling island. Note that this
     need not be done under a lock. Once the ref count goes to zero, we are
     guaranteed that no one else holds a reference to the polling island (and
     that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island */
  if (old_cnt == ref_cnt) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  } else {
    GPR_ASSERT(old_cnt > ref_cnt);
  }

  return old_cnt;
}
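
/* Reference ownership, as used in the rest of this file: an fd holds a ref on
   the island in fd->polling_island, a pollset holds a ref on the island in
   pollset->polling_island, and an island that was merged holds a ref on the
   island it merged into (added in polling_island_merge() and dropped in
   pi_unref() above when the merged-from island itself dies) */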

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wakeup_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If fd is already closed, then it would have been automatically removed
     from the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

static polling_island *polling_island_create(grpc_fd *initial_fd) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create a new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_atm_rel_store(&pi->ref_count, (gpr_atm)0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* Lock the polling island here just in case we got this structure from
       the freelist and the polling island lock was not released yet (by the
       code that adds the polling island to the freelist) */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are
 * no guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we
         check this while holding the pi->mu lock, we cannot be sure (i.e
         without the pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more while holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. We're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands pointed by *p and *q.
   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks
     // **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
     // polling_island_lock_pair() was called and if so, release the lock only
     // once. Note: Even if p1 != p2 before calling polling_island_lock_pair(),
     // they might be equal after the function returns:
     if (p1 == p2) {
       gpr_mu_unlock(&p1->mu);
     } else {
       gpr_mu_unlock(&p1->mu);
       gpr_mu_unlock(&p2->mu);
     }
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking the polling_island with the lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the locks)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (the one with fewer fds) to q
     (note that the refcounts on the fds being moved will not change here; this
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wakeup all the pollers (if any) on p so that they can pickup this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* Add the 'merged_to' link from p --> q */
  gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
  PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  /* Return the merged polling island */
  return q;
}
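
/* After polling_island_merge(p, q) (with p being the island that had fewer
   fds), the state is:

     p (empty, pollers being woken) --merged_to--> q (owns all the fds)

   p keeps a reference on q for as long as p itself is alive, so q cannot be
   deleted while anyone can still reach it through p */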

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
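
/* The handler above intentionally does (almost) nothing: installing it simply
   ensures that grpc_wakeup_signal is not fatal to the process. The signal's
   only purpose is to interrupt a worker thread blocked in the epoll wait so
   that it rechecks its pollset (see pollset_worker_kick(), which delivers the
   signal with pthread_kill()) */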

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}
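
/* Workers are kept in a doubly-linked circular list in which
   pollset->root_worker acts as a sentinel (it is never a real worker).
   push_front_worker/push_back_worker insert just after/just before the
   sentinel, and pop_front_worker removes and returns root_worker.next (or
   NULL when the list is empty, i.e. when the pollset has no workers) */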

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            kick_append_error(&error, pollset_worker_kick(worker));
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        kick_append_error(&error, pollset_worker_kick(worker));
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in the pollset_work() function), then there is no
       need to kick any other worker since the current thread can just absorb
       the kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      kick_append_error(&error, pollset_worker_kick(worker));
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
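
/* A few illustrative values for the conversion above (deadline - now shown on
   the left):
       5us     -> 0   (within max_spin_polling_us: non-blocking / spin poll)
     350us     -> 1   (rounded up to the next whole millisecond)
     2.3ms     -> 3
     infinite  -> -1  (block until an event or a kick arrives) */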

static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}

static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
  if (ps->polling_island != NULL) {
    PI_UNREF(ps->polling_island, reason);
  }
  ps->polling_island = NULL;
}
1190
1191static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1192 grpc_pollset *pollset) {
1193 /* The pollset cannot have any workers if we are at this stage */
1194 GPR_ASSERT(!pollset_has_workers(pollset));
1195
1196 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001197
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001198 /* Release the ref and set pollset->polling_island to NULL */
1199 pollset_release_polling_island(pollset, "ps_shutdown");
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001200 grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001201}
1202
1203/* pollset->mu lock must be held by the caller before calling this */
1204static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1205 grpc_closure *closure) {
1206 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1207 GPR_ASSERT(!pollset->shutting_down);
1208 pollset->shutting_down = true;
1209 pollset->shutdown_done = closure;
1210 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1211
1212 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1213 because it would release the underlying polling island. In such a case, we
1214 let the last worker call finish_shutdown_locked() from pollset_work() */
1215 if (!pollset_has_workers(pollset)) {
1216 GPR_ASSERT(!pollset->finish_shutdown_called);
1217 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1218 finish_shutdown_locked(exec_ctx, pollset);
1219 }
1220 GPR_TIMER_END("pollset_shutdown", 0);
1221}
1222
1223/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 1224 * than destroying the mutex, there is nothing special that needs to be done
1225 * here */
1226static void pollset_destroy(grpc_pollset *pollset) {
1227 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001228 gpr_mu_destroy(&pollset->mu);
1229}
1230
1231static void pollset_reset(grpc_pollset *pollset) {
1232 GPR_ASSERT(pollset->shutting_down);
1233 GPR_ASSERT(!pollset_has_workers(pollset));
1234 pollset->shutting_down = false;
1235 pollset->finish_shutdown_called = false;
1236 pollset->kicked_without_pollers = false;
1237 pollset->shutdown_done = NULL;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001238 pollset_release_polling_island(pollset, "ps_reset");
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001239}
1240
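/* Fold 'error' into '*composite'. The composite "pollset_work" error is only
   created lazily, so the common GRPC_ERROR_NONE path adds no allocations. */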
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001241static void work_combine_error(grpc_error **composite, grpc_error *error) {
1242 if (error == GRPC_ERROR_NONE) return;
1243 if (*composite == GRPC_ERROR_NONE) {
1244 *composite = GRPC_ERROR_CREATE("pollset_work");
1245 }
1246 *composite = grpc_error_add_child(*composite, error);
1247}
1248
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001249#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001250static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
1251 grpc_pollset *pollset,
1252 int timeout_ms, sigset_t *sig_mask) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001253 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001254 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001255 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001256 polling_island *pi = NULL;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001257 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001258 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1259
 1260 /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001261 latest polling island pointed to by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001262
 1263 Since epoll_fd is immutable, we can read it without obtaining the polling
 1264 island lock. There is, however, a possibility that the polling island (from
 1265 which we got the epoll_fd) got merged with another island while we are in
 1266 this function. This is still okay because, in such a case, we will wake up
 1267 right away from epoll_wait() and pick up the latest polling_island the next
 1268 time this function (i.e. pollset_work_and_unlock()) is called.
1269 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001270
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001271 if (pollset->polling_island == NULL) {
1272 pollset->polling_island = polling_island_create(NULL);
1273 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001274 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001275
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001276 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001277 epoll_fd = pi->epoll_fd;
1278
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001279 /* Update pollset->polling_island since the island being pointed to by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001280 pollset->polling_island may be older than the one pointed to by pi */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001281 if (pollset->polling_island != pi) {
1282 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1283 polling island to be deleted */
1284 PI_ADD_REF(pi, "ps");
1285 PI_UNREF(pollset->polling_island, "ps");
1286 pollset->polling_island = pi;
1287 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001288
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001289 /* Add an extra ref so that the island does not get destroyed (which means
 1290 the epoll_fd won't be closed) while we are doing an epoll_wait() on the
1291 epoll_fd */
1292 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001293 gpr_mu_unlock(&pollset->mu);
1294
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001295 do {
1296 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1297 sig_mask);
1298 if (ep_rv < 0) {
1299 if (errno != EINTR) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001300 gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001301 work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001302 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001303 /* We were interrupted. Save an iteration by doing a zero-timeout
1304 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001305 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001306 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001307 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001308
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001309#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001310 /* See the definition of g_epoll_sync for more details */
1311 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001312#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001313
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001314 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001315 void *data_ptr = ep_ev[i].data.ptr;
1316 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001317 work_combine_error(
1318 &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001319 } else if (data_ptr == &polling_island_wakeup_fd) {
1320 /* This means that our polling island is merged with a different
1321 island. We do not have to do anything here since the subsequent call
1322 to the function pollset_work_and_unlock() will pick up the correct
1323 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001324 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001325 grpc_fd *fd = data_ptr;
1326 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1327 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1328 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001329 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001330 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001331 }
1332 if (write_ev || cancel) {
1333 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001334 }
1335 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001336 }
1337 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001338
1339 GPR_ASSERT(pi != NULL);
1340
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001341 /* Before leaving, release the extra ref we added to the polling island. It
 1342 is important to use "pi" here (i.e. our old copy of pollset->polling_island
 1343 that we got before releasing the polling island lock). This is because the
 1344 pollset->polling_island pointer might get updated in other parts of the
1345 code when there is an island merge while we are doing epoll_wait() above */
1346 PI_UNREF(pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001347
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001348 GPR_TIMER_END("pollset_work_and_unlock", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001349 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001350}
1351
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001352/* pollset->mu lock must be held by the caller before calling this.
1353 The function pollset_work() may temporarily release the lock (pollset->mu)
1354 during the course of its execution but it will always re-acquire the lock and
1355 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001356static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1357 grpc_pollset_worker **worker_hdl,
1358 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001359 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001360 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001361 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1362
1363 sigset_t new_mask;
1364 sigset_t orig_mask;
1365
1366 grpc_pollset_worker worker;
1367 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001368 worker.pt_id = pthread_self();
1369
1370 *worker_hdl = &worker;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001371 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1372 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001373
1374 if (pollset->kicked_without_pollers) {
1375 /* If the pollset was kicked without pollers, pretend that the current
1376 worker got the kick and skip polling. A kick indicates that there is some
1377 work that needs attention like an event on the completion queue or an
1378 alarm */
1379 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1380 pollset->kicked_without_pollers = 0;
1381 } else if (!pollset->shutting_down) {
1382 sigemptyset(&new_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001383 sigaddset(&new_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001384 pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001385 sigdelset(&orig_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001386
1387 push_front_worker(pollset, &worker);
1388
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001389 error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001390 grpc_exec_ctx_flush(exec_ctx);
1391
1392 gpr_mu_lock(&pollset->mu);
1393 remove_worker(pollset, &worker);
1394 }
1395
 1396 /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
1397 false at this point) and the pollset is shutting down, we may have to
1398 finish the shutdown process by calling finish_shutdown_locked().
1399 See pollset_shutdown() for more details.
1400
1401 Note: Continuing to access pollset here is safe; it is the caller's
1402 responsibility to not destroy a pollset when it has outstanding calls to
1403 pollset_work() */
1404 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1405 !pollset->finish_shutdown_called) {
1406 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1407 finish_shutdown_locked(exec_ctx, pollset);
1408
1409 gpr_mu_unlock(&pollset->mu);
1410 grpc_exec_ctx_flush(exec_ctx);
1411 gpr_mu_lock(&pollset->mu);
1412 }
1413
1414 *worker_hdl = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001415 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1416 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001417 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001418 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1419 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001420}
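/* An illustrative caller sketch for the locking contract above (a sketch,
   not code from this file); 'mu' is the mutex handed out by pollset_init():

     gpr_mu_lock(mu);
     grpc_pollset_worker *worker = NULL;
     grpc_error *err = pollset_work(exec_ctx, pollset, &worker, now, deadline);
     // pollset_work() may drop 'mu' while polling but re-acquires it,
     // so the lock is held again at this point.
     gpr_mu_unlock(mu);
*/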
1421
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001422static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1423 grpc_fd *fd) {
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001424 gpr_mu_lock(&pollset->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001425 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001426
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001427 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001428
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001429 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1430 * equal, do nothing.
1431 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1432 * a new polling island (with a refcount of 2) and make the polling_island
 1433 * fields in both fd and pollset point to the new island
1434 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1435 * the NULL polling_island field to point to the non-NULL polling_island
1436 * field (ensure that the refcount on the polling island is incremented by
1437 * 1 to account for the newly added reference)
1438 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1439 * and different, merge both the polling islands and update the
1440 * polling_island fields in both fd and pollset to point to the merged
1441 * polling island.
1442 */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001443 if (fd->polling_island == pollset->polling_island) {
1444 pi_new = fd->polling_island;
1445 if (pi_new == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001446 pi_new = polling_island_create(fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001447 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001448 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001449 pi_new = polling_island_lock(pollset->polling_island);
1450 polling_island_add_fds_locked(pi_new, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001451 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001452 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001453 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001454 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001455 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001456 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001457 }
1458
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001459 if (fd->polling_island != pi_new) {
1460 PI_ADD_REF(pi_new, "fd");
1461 if (fd->polling_island != NULL) {
1462 PI_UNREF(fd->polling_island, "fd");
1463 }
1464 fd->polling_island = pi_new;
1465 }
1466
1467 if (pollset->polling_island != pi_new) {
1468 PI_ADD_REF(pi_new, "ps");
1469 if (pollset->polling_island != NULL) {
1470 PI_UNREF(pollset->polling_island, "ps");
1471 }
1472 pollset->polling_island = pi_new;
1473 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001474
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001475 gpr_mu_unlock(&fd->pi_mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001476 gpr_mu_unlock(&pollset->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001477}
1478
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001479/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001480 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001481 */
1482
1483static grpc_pollset_set *pollset_set_create(void) {
1484 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1485 memset(pollset_set, 0, sizeof(*pollset_set));
1486 gpr_mu_init(&pollset_set->mu);
1487 return pollset_set;
1488}
1489
1490static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1491 size_t i;
1492 gpr_mu_destroy(&pollset_set->mu);
1493 for (i = 0; i < pollset_set->fd_count; i++) {
1494 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1495 }
1496 gpr_free(pollset_set->pollsets);
1497 gpr_free(pollset_set->pollset_sets);
1498 gpr_free(pollset_set->fds);
1499 gpr_free(pollset_set);
1500}
1501
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001502static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1503 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1504 size_t i;
1505 gpr_mu_lock(&pollset_set->mu);
1506 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1507 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1508 pollset_set->fds = gpr_realloc(
1509 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1510 }
1511 GRPC_FD_REF(fd, "pollset_set");
1512 pollset_set->fds[pollset_set->fd_count++] = fd;
1513 for (i = 0; i < pollset_set->pollset_count; i++) {
1514 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1515 }
1516 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1517 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1518 }
1519 gpr_mu_unlock(&pollset_set->mu);
1520}
1521
1522static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1523 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1524 size_t i;
1525 gpr_mu_lock(&pollset_set->mu);
1526 for (i = 0; i < pollset_set->fd_count; i++) {
1527 if (pollset_set->fds[i] == fd) {
1528 pollset_set->fd_count--;
1529 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1530 pollset_set->fds[pollset_set->fd_count]);
1531 GRPC_FD_UNREF(fd, "pollset_set");
1532 break;
1533 }
1534 }
1535 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1536 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1537 }
1538 gpr_mu_unlock(&pollset_set->mu);
1539}
1540
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001541static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1542 grpc_pollset_set *pollset_set,
1543 grpc_pollset *pollset) {
1544 size_t i, j;
1545 gpr_mu_lock(&pollset_set->mu);
1546 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1547 pollset_set->pollset_capacity =
1548 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1549 pollset_set->pollsets =
1550 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1551 sizeof(*pollset_set->pollsets));
1552 }
1553 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1554 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1555 if (fd_is_orphaned(pollset_set->fds[i])) {
1556 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1557 } else {
1558 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1559 pollset_set->fds[j++] = pollset_set->fds[i];
1560 }
1561 }
1562 pollset_set->fd_count = j;
1563 gpr_mu_unlock(&pollset_set->mu);
1564}
1565
1566static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1567 grpc_pollset_set *pollset_set,
1568 grpc_pollset *pollset) {
1569 size_t i;
1570 gpr_mu_lock(&pollset_set->mu);
1571 for (i = 0; i < pollset_set->pollset_count; i++) {
1572 if (pollset_set->pollsets[i] == pollset) {
1573 pollset_set->pollset_count--;
1574 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1575 pollset_set->pollsets[pollset_set->pollset_count]);
1576 break;
1577 }
1578 }
1579 gpr_mu_unlock(&pollset_set->mu);
1580}
1581
1582static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1583 grpc_pollset_set *bag,
1584 grpc_pollset_set *item) {
1585 size_t i, j;
1586 gpr_mu_lock(&bag->mu);
1587 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1588 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1589 bag->pollset_sets =
1590 gpr_realloc(bag->pollset_sets,
1591 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1592 }
1593 bag->pollset_sets[bag->pollset_set_count++] = item;
1594 for (i = 0, j = 0; i < bag->fd_count; i++) {
1595 if (fd_is_orphaned(bag->fds[i])) {
1596 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1597 } else {
1598 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1599 bag->fds[j++] = bag->fds[i];
1600 }
1601 }
1602 bag->fd_count = j;
1603 gpr_mu_unlock(&bag->mu);
1604}
1605
1606static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1607 grpc_pollset_set *bag,
1608 grpc_pollset_set *item) {
1609 size_t i;
1610 gpr_mu_lock(&bag->mu);
1611 for (i = 0; i < bag->pollset_set_count; i++) {
1612 if (bag->pollset_sets[i] == item) {
1613 bag->pollset_set_count--;
1614 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1615 bag->pollset_sets[bag->pollset_set_count]);
1616 break;
1617 }
1618 }
1619 gpr_mu_unlock(&bag->mu);
1620}
1621
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001622/* Test helper functions */
1624void *grpc_fd_get_polling_island(grpc_fd *fd) {
1625 polling_island *pi;
1626
1627 gpr_mu_lock(&fd->pi_mu);
1628 pi = fd->polling_island;
1629 gpr_mu_unlock(&fd->pi_mu);
1630
1631 return pi;
1632}
1633
1634void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1635 polling_island *pi;
1636
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001637 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001638 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001639 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001640
1641 return pi;
1642}
1643
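/* Test helper: lock both islands via polling_island_lock_pair() and report
   whether they resolve to the same polling island (e.g. after a merge). */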
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001644bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001645 polling_island *p1 = p;
1646 polling_island *p2 = q;
1647
1648 polling_island_lock_pair(&p1, &p2);
1649 if (p1 == p2) {
1650 gpr_mu_unlock(&p1->mu);
1651 } else {
1652 gpr_mu_unlock(&p1->mu);
1653 gpr_mu_unlock(&p2->mu);
1654 }
1655
1656 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001657}
1658
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001659/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001660 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001661 */
1662
1663static void shutdown_engine(void) {
1664 fd_global_shutdown();
1665 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001666 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001667}
1668
1669static const grpc_event_engine_vtable vtable = {
1670 .pollset_size = sizeof(grpc_pollset),
1671
1672 .fd_create = fd_create,
1673 .fd_wrapped_fd = fd_wrapped_fd,
1674 .fd_orphan = fd_orphan,
1675 .fd_shutdown = fd_shutdown,
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001676 .fd_is_shutdown = fd_is_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001677 .fd_notify_on_read = fd_notify_on_read,
1678 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001679 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001680
1681 .pollset_init = pollset_init,
1682 .pollset_shutdown = pollset_shutdown,
1683 .pollset_reset = pollset_reset,
1684 .pollset_destroy = pollset_destroy,
1685 .pollset_work = pollset_work,
1686 .pollset_kick = pollset_kick,
1687 .pollset_add_fd = pollset_add_fd,
1688
1689 .pollset_set_create = pollset_set_create,
1690 .pollset_set_destroy = pollset_set_destroy,
1691 .pollset_set_add_pollset = pollset_set_add_pollset,
1692 .pollset_set_del_pollset = pollset_set_del_pollset,
1693 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1694 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1695 .pollset_set_add_fd = pollset_set_add_fd,
1696 .pollset_set_del_fd = pollset_set_del_fd,
1697
1698 .kick_poller = kick_poller,
1699
1700 .shutdown_engine = shutdown_engine,
1701};
1702
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001703/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1704 * Create a dummy epoll_fd to make sure epoll support is available */
1705static bool is_epoll_available() {
1706 int fd = epoll_create1(EPOLL_CLOEXEC);
1707 if (fd < 0) {
1708 gpr_log(
1709 GPR_ERROR,
1710 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1711 fd);
1712 return false;
1713 }
1714 close(fd);
1715 return true;
1716}
1717
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001718const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001719 /* If the use of signals is disabled, we cannot use the epoll engine */
1720 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1721 return NULL;
1722 }
1723
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001724 if (!is_epoll_available()) {
1725 return NULL;
1726 }
1727
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001728 if (!is_grpc_wakeup_signal_initialized) {
1729 grpc_use_signal(SIGRTMIN + 2);
1730 }
1731
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001732 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001733
1734 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1735 return NULL;
1736 }
1737
1738 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1739 polling_island_global_init())) {
1740 return NULL;
1741 }
1742
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001743 return &vtable;
1744}
1745
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001746#else /* defined(GPR_LINUX_EPOLL) */
1747#if defined(GPR_POSIX_SOCKET)
1748#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001749/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1750 * NULL */
1751const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001752#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001753
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001754void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001755#endif /* !defined(GPR_LINUX_EPOLL) */