/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <grpc/grpc_posix.h>
#include <grpc/support/port_platform.h>

#ifdef GPR_LINUX_EPOLL

#include "src/core/lib/iomgr/ev_epoll_linux.h"

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;

/* Implements the function declared in grpc_posix.h. This function may be
 * called even before grpc_init() to select a different signal to use. If
 * signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}
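
/* Illustrative usage sketch (application code, not part of this engine): the
   particular signal below is an arbitrary example; any signal the application
   does not otherwise use works, and grpc_use_signal() must run before
   grpc_init().

     #include <signal.h>
     #include <grpc/grpc.h>
     #include <grpc/grpc_posix.h>

     int main(void) {
       grpc_use_signal(SIGRTMIN + 2);  // example choice of wakeup signal
       grpc_init();
       // ... create channels/servers and run the application ...
       grpc_shutdown();
       return 0;
     }
*/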

struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
struct grpc_fd {
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either case,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs and the mutex protecting the
     field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  struct grpc_fd *freelist_next;
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  grpc_iomgr_object iomgr_object;
};

/* Reference counting for fds */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
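
/* read_closure / write_closure above form a small three-state machine: the
   value is either CLOSURE_NOT_READY (no pending event, no waiter),
   CLOSURE_READY (an event arrived but nobody was waiting), or a pointer to
   the closure that is waiting for the next event. notify_on_locked() and
   set_ready_locked() further down implement the transitions between these
   states. */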

/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), 1, (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), 1, (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p), 1)
#define PI_UNREF(p, r) pi_unref((p), 1)

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
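
/* Example (illustrative): callers pair these macros around any period during
   which they hold a pointer to a polling island, e.g.
     PI_ADD_REF(pi, "fd_to_pollset");
     ... use pi ...
     PI_UNREF(pi, "fd_to_pollset");
   The reason string is only consulted by the _dbg variants when
   GRPC_PI_REF_COUNT_DEBUG is defined. */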

typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this island merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
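
/* The merged_to links make the set of islands a forest: an island either is
   the latest ("root") island or points, directly or transitively, at the
   island it was merged into. Readers follow the chain to reach the latest
   island; see polling_island_maybe_get_latest() and polling_island_lock()
   below. */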

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges). This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;

static void polling_island_delete(); /* Forward declaration */

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work around this race, we establish a happens-before relation between
   the code just before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

#ifdef GRPC_PI_REF_COUNT_DEBUG
long pi_add_ref(polling_island *pi, int ref_cnt);
long pi_unref(polling_island *pi, int ref_cnt);

void pi_add_ref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                    int line) {
  long old_cnt = pi_add_ref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt + ref_cnt), reason, file, line);
}

void pi_unref_dbg(polling_island *pi, int ref_cnt, char *reason, char *file,
                  int line) {
  long old_cnt = pi_unref(pi, ref_cnt);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - ref_cnt), reason, file, line);
}
#endif

long pi_add_ref(polling_island *pi, int ref_cnt) {
  return gpr_atm_full_fetch_add(&pi->ref_count, ref_cnt);
}

long pi_unref(polling_island *pi, int ref_cnt) {
  long old_cnt = gpr_atm_full_fetch_add(&pi->ref_count, -ref_cnt);

  /* If the ref count went to zero, delete the polling island. Note that this
     need not be done under a lock. Once the ref count goes to zero, we are
     guaranteed that no one else holds a reference to the polling island (and
     that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
  if (old_cnt == ref_cnt) {
    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != NULL) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  } else {
    GPR_ASSERT(old_cnt > ref_cnt);
  }

  return old_cnt;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs) {
  int err;
  size_t i;
  struct epoll_event ev;

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, 0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        /* TODO: sreek - We need a better way to bubble up this error instead
           of just logging a message */
        gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
                fds[i]->fd, strerror(errno));
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
                                                grpc_wakeup_fd *wakeup_fd) {
  struct epoll_event ev;
  int err;

  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0) {
    gpr_log(GPR_ERROR,
            "Failed to add grpc_wakeup_fd (%d) to the epoll set (epoll_fd: %d)"
            ". Error: %s",
            GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
            strerror(errno));
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
                                                 bool remove_fd_refs) {
  int err;
  size_t i;

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      /* TODO: sreek - We need a better way to bubble up this error instead of
       * just logging a message */
      gpr_log(GPR_ERROR,
              "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
              pi->fds[i]->fd, strerror(errno));
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
                                            bool is_fd_closed) {
  int err;
  size_t i;

  /* If the fd is already closed, it would have been automatically removed from
     the epoll set */
  if (!is_fd_closed) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
    if (err < 0 && errno != ENOENT) {
      gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error: %s",
              fd->fd, strerror(errno));
    }
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}
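
/* Note on the removal strategy above: the last element of pi->fds is swapped
   into the slot being vacated, so the array stays dense (but unordered) and
   no shifting of the remaining elements is needed. */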

static polling_island *polling_island_create(grpc_fd *initial_fd) {
  polling_island *pi = NULL;

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create a new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
            strerror(errno));
  }
  GPR_ASSERT(pi->epoll_fd >= 0);

  polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);

  pi->next_free = NULL;

  if (initial_fd != NULL) {
    /* Lock the polling island here just in case we got this structure from the
       freelist and the polling island lock was not released yet (by the code
       that adds the polling island to the freelist) */
    gpr_mu_lock(&pi->mu);
    polling_island_add_fds_locked(pi, &initial_fd, 1, true);
    gpr_mu_unlock(&pi->mu);
  }

  return pi;
}

static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are no
 * guarantees that the island returned is the last island */
static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
  polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  while (next != NULL) {
    pi = next;
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
      polling_island *pi_latest = polling_island_lock(pi);
      ...
      ... critical section ..
      ...
      gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is in fact the last node and we have the pi->mu lock. We're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands pointed to by *p and *q.
   This function is needed because calling the following block of code to
   obtain locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p);
       polling_island_lock(*q);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks
     // **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
     // polling_island_lock_pair() was called and if so, release the lock only
     // once. Note: Even if p1 != p2 before calling polling_island_lock_pair(),
     // they might be equal after the function returns:
     if (p1 == p2) {
       gpr_mu_unlock(&p1->mu);
     } else {
       gpr_mu_unlock(&p1->mu);
       gpr_mu_unlock(&p2->mu);
     }
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking the polling_island with the lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p == q) {
    /* Nothing needs to be done here */
    gpr_mu_unlock(&p->mu);
    return p;
  }

  /* Make sure that p points to the polling island with fewer fds than q */
  if (p->fd_cnt > q->fd_cnt) {
    GPR_SWAP(polling_island *, p, q);
  }

  /* "Merge" p with q i.e move all the fds from p (the one with fewer fds) to q
     (Note that the refcounts on the fds being moved will not change here. This
     is why the last parameter in the following two functions is 'false') */
  polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
  polling_island_remove_all_fds_locked(p, false);

  /* Wake up all the pollers (if any) on p so that they can pick up this change */
  polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);

  /* Add the 'merged_to' link from p --> q */
  gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
  PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */

  gpr_mu_unlock(&p->mu);
  gpr_mu_unlock(&q->mu);

  /* Return the merged polling island */
  return q;
}
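
/* After polling_island_merge() returns, the fds that used to live on the
   smaller island are registered with the larger island's epoll_fd, the smaller
   island's merged_to points at the larger one, and any threads still blocked
   polling the smaller island are woken via polling_island_wakeup_fd so that
   they can re-resolve the latest island before polling again. */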

static grpc_error *polling_island_global_init() {
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_init(&g_pi_freelist_mu);
  g_pi_freelist = NULL;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  polling_island *next;
  gpr_mu_lock(&g_pi_freelist_mu);
  gpr_mu_unlock(&g_pi_freelist_mu);
  while (g_pi_freelist != NULL) {
    next = g_pi_freelist->next_free;
    gpr_mu_destroy(&g_pi_freelist->mu);
    gpr_free(g_pi_freelist->fds);
    gpr_free(g_pi_freelist);
    g_pi_freelist = next;
  }
  gpr_mu_destroy(&g_pi_freelist_mu);

  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns about malloc
 * performance but instead so that implementations with multiple threads in
 * (for example) epoll_wait deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wake up 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
 * sure to wake up one polling thread (which can wake up other threads if
 * needed) */
grpc_wakeup_fd grpc_global_wakeup_fd;

static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;

#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
          gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd *fd_create(int fd, const char *name) {
  grpc_fd *new_fd = NULL;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != NULL) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == NULL) {
    new_fd = gpr_malloc(sizeof(grpc_fd));
    gpr_mu_init(&new_fd->mu);
    gpr_mu_init(&new_fd->pi_mu);
  }

  /* Note: It is not really needed to get the new_fd->mu lock here. If this is
     a newly created fd (or an fd we got from the freelist), no one else would
     be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->mu);

  gpr_atm_rel_store(&new_fd->refst, 1);
  new_fd->fd = fd;
  new_fd->shutdown = false;
  new_fd->orphaned = false;
  new_fd->read_closure = CLOSURE_NOT_READY;
  new_fd->write_closure = CLOSURE_NOT_READY;
  new_fd->polling_island = NULL;
  new_fd->freelist_next = NULL;
  new_fd->on_done_closure = NULL;
  new_fd->read_notifier_pollset = NULL;

  gpr_mu_unlock(&new_fd->mu);

  char *fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
#ifdef GRPC_FD_REF_COUNT_DEBUG
  gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
#endif
  gpr_free(fd_name);
  return new_fd;
}

static bool fd_is_orphaned(grpc_fd *fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static int fd_wrapped_fd(grpc_fd *fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->mu);

  return ret_fd;
}

static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
}

static grpc_error *fd_shutdown_error(bool shutdown) {
  if (!shutdown) {
    return GRPC_ERROR_NONE;
  } else {
    return GRPC_ERROR_CREATE("FD shutdown");
  }
}

static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure **st, grpc_closure *closure) {
  if (fd->shutdown) {
    grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
                        NULL);
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready ==> switch to a waiting state by setting the closure */
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    /* already ready ==> queue the closure to run immediately */
    *st = CLOSURE_NOT_READY;
    grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
                        NULL);
  } else {
    /* upcallptr was set to a different closure. This is an error! */
    gpr_log(GPR_ERROR,
            "User called a notify_on function with a previous callback still "
            "pending");
    abort();
  }
}

/* returns 1 if state becomes not ready */
static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                            grpc_closure **st) {
  if (*st == CLOSURE_READY) {
    /* duplicate ready ==> ignore */
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    /* not ready, and not waiting ==> flag ready */
    *st = CLOSURE_READY;
    return 0;
  } else {
    /* waiting ==> queue closure */
    grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                  grpc_fd *fd) {
  grpc_pollset *notifier = NULL;

  gpr_mu_lock(&fd->mu);
  notifier = fd->read_notifier_pollset;
  gpr_mu_unlock(&fd->mu);

  return notifier;
}

static bool fd_is_shutdown(grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  const bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}

static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
  grpc_error *err = GRPC_ERROR_NONE;
  int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
  if (err_num != 0) {
    err = GRPC_OS_ERROR(err_num, "pthread_kill");
  }
  return err;
}
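
/* A "kick" is therefore just grpc_wakeup_signal delivered to the target
   worker's thread via pthread_kill(); sig_handler() above intentionally does
   nothing beyond optional logging, the only desired effect being to interrupt
   that thread's blocking poll call (note the sigset_t mask threaded into
   pollset_work_and_unlock() below). */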

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset *p) {
  return p->root_worker.next != &p->root_worker;
}

static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker *w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return NULL;
  }
}

static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error **composite, grpc_error *error) {
  if (error == GRPC_ERROR_NONE) return;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            kick_append_error(&error, pollset_worker_kick(worker));
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
      GPR_TIMER_END("pollset_kick.broadcast", 0);
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        kick_append_error(&error, pollset_worker_kick(worker));
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      kick_append_error(&error, pollset_worker_kick(worker));
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}

static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}

/* Convert a timespec to milliseconds:
   - Very small or negative poll times are clamped to zero to do a non-blocking
     poll (which becomes spin polling)
   - Other small values are rounded up to one millisecond
   - Longer than a millisecond polls are rounded up to the next nearest
     millisecond to avoid spinning
   - Infinite timeouts are converted to -1 */
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                           gpr_timespec now) {
  gpr_timespec timeout;
  static const int64_t max_spin_polling_us = 10;
  if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
    return -1;
  }

  if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
                                                   max_spin_polling_us,
                                                   GPR_TIMESPAN))) <= 0) {
    return 0;
  }
  timeout = gpr_time_sub(deadline, now);
  return gpr_time_to_millis(gpr_time_add(
      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
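
/* Worked examples of the conversion above: a deadline at most 10us in the
   future (or already past) returns 0 (non-blocking / spin poll); a deadline
   2.3ms away rounds up to 3ms; an infinite deadline returns -1 (block
   indefinitely). */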
1166
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001167static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
1168 grpc_pollset *notifier) {
1169 /* Need the fd->mu since we might be racing with fd_notify_on_read */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001170 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001171 set_ready_locked(exec_ctx, fd, &fd->read_closure);
1172 fd->read_notifier_pollset = notifier;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001173 gpr_mu_unlock(&fd->mu);
1174}
1175
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001176static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001177 /* Need the fd->mu since we might be racing with fd_notify_on_write */
1178 gpr_mu_lock(&fd->mu);
1179 set_ready_locked(exec_ctx, fd, &fd->write_closure);
1180 gpr_mu_unlock(&fd->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001181}
1182
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001183static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001184 if (ps->polling_island != NULL) {
1185 PI_UNREF(ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001186 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001187 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001188}
1189
1190static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
1191 grpc_pollset *pollset) {
1192 /* The pollset cannot have any workers if we are at this stage */
1193 GPR_ASSERT(!pollset_has_workers(pollset));
1194
1195 pollset->finish_shutdown_called = true;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001196
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001197 /* Release the ref and set pollset->polling_island to NULL */
1198 pollset_release_polling_island(pollset, "ps_shutdown");
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001199 grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001200}
1201
1202/* pollset->mu lock must be held by the caller before calling this */
1203static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1204 grpc_closure *closure) {
1205 GPR_TIMER_BEGIN("pollset_shutdown", 0);
1206 GPR_ASSERT(!pollset->shutting_down);
1207 pollset->shutting_down = true;
1208 pollset->shutdown_done = closure;
1209 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1210
1211 /* If the pollset has any workers, we cannot call finish_shutdown_locked()
1212 because it would release the underlying polling island. In such a case, we
1213 let the last worker call finish_shutdown_locked() from pollset_work() */
1214 if (!pollset_has_workers(pollset)) {
1215 GPR_ASSERT(!pollset->finish_shutdown_called);
1216 GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
1217 finish_shutdown_locked(exec_ctx, pollset);
1218 }
1219 GPR_TIMER_END("pollset_shutdown", 0);
1220}
1221
1222/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 1223 * than destroying the mutex, there is nothing special that needs to be done
1224 * here */
1225static void pollset_destroy(grpc_pollset *pollset) {
1226 GPR_ASSERT(!pollset_has_workers(pollset));
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001227 gpr_mu_destroy(&pollset->mu);
1228}
1229
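/* Reset a pollset that has been shut down (and has no workers) so that it can
   be reused: clear the shutdown state and drop the reference to the old
   polling island. */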
1230static void pollset_reset(grpc_pollset *pollset) {
1231 GPR_ASSERT(pollset->shutting_down);
1232 GPR_ASSERT(!pollset_has_workers(pollset));
1233 pollset->shutting_down = false;
1234 pollset->finish_shutdown_called = false;
1235 pollset->kicked_without_pollers = false;
1236 pollset->shutdown_done = NULL;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001237 pollset_release_polling_island(pollset, "ps_reset");
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001238}
1239
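/* Fold 'error' into '*composite'. The composite "pollset_work" error is
   created lazily on the first failure; each subsequent error is attached to it
   as a child. */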
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001240static void work_combine_error(grpc_error **composite, grpc_error *error) {
1241 if (error == GRPC_ERROR_NONE) return;
1242 if (*composite == GRPC_ERROR_NONE) {
1243 *composite = GRPC_ERROR_CREATE("pollset_work");
1244 }
1245 *composite = grpc_error_add_child(*composite, error);
1246}
1247
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001248#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001249static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
1250 grpc_pollset *pollset,
1251 int timeout_ms, sigset_t *sig_mask) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001252 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001253 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001254 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001255 polling_island *pi = NULL;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001256 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001257 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1258
 1259 /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001260 latest polling island pointed to by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001261
 1262 Since epoll_fd is immutable, we can read it without obtaining the polling
 1263 island lock. There is however a possibility that the polling island (from
 1264 which we got the epoll_fd) got merged with another island while we are
 1265 in this function. This is still okay because in such a case, we will wake
 1266 up right away from epoll_wait() and pick up the latest polling_island the
 1267 next time this function (i.e. pollset_work_and_unlock()) is called.
1268 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001269
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001270 if (pollset->polling_island == NULL) {
1271 pollset->polling_island = polling_island_create(NULL);
1272 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001273 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001274
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001275 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001276 epoll_fd = pi->epoll_fd;
1277
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001278 /* Update pollset->polling_island since the island it currently points to
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001279 may be older than the one pointed to by pi */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001280 if (pollset->polling_island != pi) {
1281 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1282 polling island to be deleted */
1283 PI_ADD_REF(pi, "ps");
1284 PI_UNREF(pollset->polling_island, "ps");
1285 pollset->polling_island = pi;
1286 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001287
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001288 /* Add an extra ref so that the island does not get destroyed (which means
 1289 the epoll_fd won't be closed) while we are doing an epoll_wait() on the
1290 epoll_fd */
1291 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001292 gpr_mu_unlock(&pollset->mu);
1293
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001294 do {
1295 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1296 sig_mask);
1297 if (ep_rv < 0) {
1298 if (errno != EINTR) {
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001299 gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001300 work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001301 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001302 /* We were interrupted. Save an iteration by doing a zero-timeout
1303 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001304 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001305 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001306 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001307
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001308#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001309 /* See the definition of g_poll_sync for more details */
1310 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001311#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001312
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001313 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001314 void *data_ptr = ep_ev[i].data.ptr;
1315 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001316 work_combine_error(
1317 &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001318 } else if (data_ptr == &polling_island_wakeup_fd) {
1319 /* This means that our polling island is merged with a different
1320 island. We do not have to do anything here since the subsequent call
1321 to the function pollset_work_and_unlock() will pick up the correct
1322 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001323 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001324 grpc_fd *fd = data_ptr;
1325 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1326 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1327 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001328 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001329 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001330 }
1331 if (write_ev || cancel) {
1332 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001333 }
1334 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001335 }
1336 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001337
1338 GPR_ASSERT(pi != NULL);
1339
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001340 /* Before leaving, release the extra ref we added to the polling island. It
 1341 is important to use "pi" here (i.e. our old copy of pollset->polling_island
 1342 that we got before releasing the pollset lock). This is because the
 1343 pollset->polling_island pointer might get updated in other parts of the
 1344 code when there is an island merge while we are doing epoll_wait() above */
1345 PI_UNREF(pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001346
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001347 GPR_TIMER_END("pollset_work_and_unlock", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001348 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001349}
1350
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001351/* pollset->mu lock must be held by the caller before calling this.
1352 The function pollset_work() may temporarily release the lock (pollset->mu)
1353 during the course of its execution but it will always re-acquire the lock and
1354 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001355static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1356 grpc_pollset_worker **worker_hdl,
1357 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001358 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001359 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001360 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1361
1362 sigset_t new_mask;
1363 sigset_t orig_mask;
1364
1365 grpc_pollset_worker worker;
1366 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001367 worker.pt_id = pthread_self();
1368
1369 *worker_hdl = &worker;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001370 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1371 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001372
1373 if (pollset->kicked_without_pollers) {
1374 /* If the pollset was kicked without pollers, pretend that the current
1375 worker got the kick and skip polling. A kick indicates that there is some
1376 work that needs attention like an event on the completion queue or an
1377 alarm */
1378 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
 1379 pollset->kicked_without_pollers = false;
1380 } else if (!pollset->shutting_down) {
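 /* Block grpc_wakeup_signal for this thread. orig_mask (the previous mask
 with the wakeup signal explicitly removed) is handed to epoll_pwait()
 below, so the signal can interrupt the wait (e.g. on a pollset kick) while
 remaining blocked everywhere else. */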
1381 sigemptyset(&new_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001382 sigaddset(&new_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001383 pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001384 sigdelset(&orig_mask, grpc_wakeup_signal);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001385
1386 push_front_worker(pollset, &worker);
1387
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001388 error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001389 grpc_exec_ctx_flush(exec_ctx);
1390
1391 gpr_mu_lock(&pollset->mu);
1392 remove_worker(pollset, &worker);
1393 }
1394
 1395 /* If we are the last worker on the pollset (i.e. pollset_has_workers() is
1396 false at this point) and the pollset is shutting down, we may have to
1397 finish the shutdown process by calling finish_shutdown_locked().
1398 See pollset_shutdown() for more details.
1399
1400 Note: Continuing to access pollset here is safe; it is the caller's
1401 responsibility to not destroy a pollset when it has outstanding calls to
1402 pollset_work() */
1403 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1404 !pollset->finish_shutdown_called) {
1405 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1406 finish_shutdown_locked(exec_ctx, pollset);
1407
1408 gpr_mu_unlock(&pollset->mu);
1409 grpc_exec_ctx_flush(exec_ctx);
1410 gpr_mu_lock(&pollset->mu);
1411 }
1412
1413 *worker_hdl = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001414 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1415 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001416 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001417 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1418 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001419}
1420
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001421static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1422 grpc_fd *fd) {
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001423 gpr_mu_lock(&pollset->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001424 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001425
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001426 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001427
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001428 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1429 * equal, do nothing.
1430 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1431 * a new polling island (with a refcount of 2) and make the polling_island
 1432 * fields in both fd and pollset point to the new island
1433 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1434 * the NULL polling_island field to point to the non-NULL polling_island
1435 * field (ensure that the refcount on the polling island is incremented by
1436 * 1 to account for the newly added reference)
1437 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1438 * and different, merge both the polling islands and update the
1439 * polling_island fields in both fd and pollset to point to the merged
1440 * polling island.
1441 */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001442 if (fd->polling_island == pollset->polling_island) {
1443 pi_new = fd->polling_island;
1444 if (pi_new == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001445 pi_new = polling_island_create(fd);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001446 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001447 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001448 pi_new = polling_island_lock(pollset->polling_island);
1449 polling_island_add_fds_locked(pi_new, &fd, 1, true);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001450 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001451 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001452 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001453 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001454 } else {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001455 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001456 }
1457
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001458 if (fd->polling_island != pi_new) {
1459 PI_ADD_REF(pi_new, "fd");
1460 if (fd->polling_island != NULL) {
1461 PI_UNREF(fd->polling_island, "fd");
1462 }
1463 fd->polling_island = pi_new;
1464 }
1465
1466 if (pollset->polling_island != pi_new) {
1467 PI_ADD_REF(pi_new, "ps");
1468 if (pollset->polling_island != NULL) {
1469 PI_UNREF(pollset->polling_island, "ps");
1470 }
1471 pollset->polling_island = pi_new;
1472 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001473
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001474 gpr_mu_unlock(&fd->pi_mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001475 gpr_mu_unlock(&pollset->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001476}
1477
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001478/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001479 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001480 */
1481
1482static grpc_pollset_set *pollset_set_create(void) {
1483 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1484 memset(pollset_set, 0, sizeof(*pollset_set));
1485 gpr_mu_init(&pollset_set->mu);
1486 return pollset_set;
1487}
1488
1489static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1490 size_t i;
1491 gpr_mu_destroy(&pollset_set->mu);
1492 for (i = 0; i < pollset_set->fd_count; i++) {
1493 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1494 }
1495 gpr_free(pollset_set->pollsets);
1496 gpr_free(pollset_set->pollset_sets);
1497 gpr_free(pollset_set->fds);
1498 gpr_free(pollset_set);
1499}
1500
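/* Add 'fd' to this pollset_set (taking a ref on the fd) and propagate it to
   every pollset and every nested pollset_set currently contained in the set. */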
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001501static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1502 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1503 size_t i;
1504 gpr_mu_lock(&pollset_set->mu);
1505 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1506 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1507 pollset_set->fds = gpr_realloc(
1508 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1509 }
1510 GRPC_FD_REF(fd, "pollset_set");
1511 pollset_set->fds[pollset_set->fd_count++] = fd;
1512 for (i = 0; i < pollset_set->pollset_count; i++) {
1513 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1514 }
1515 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1516 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1517 }
1518 gpr_mu_unlock(&pollset_set->mu);
1519}
1520
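/* Remove 'fd' from this pollset_set (releasing the ref taken in
   pollset_set_add_fd) and recursively from all nested pollset_sets. */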
1521static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1522 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1523 size_t i;
1524 gpr_mu_lock(&pollset_set->mu);
1525 for (i = 0; i < pollset_set->fd_count; i++) {
1526 if (pollset_set->fds[i] == fd) {
1527 pollset_set->fd_count--;
1528 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1529 pollset_set->fds[pollset_set->fd_count]);
1530 GRPC_FD_UNREF(fd, "pollset_set");
1531 break;
1532 }
1533 }
1534 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1535 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1536 }
1537 gpr_mu_unlock(&pollset_set->mu);
1538}
1539
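/* Add 'pollset' to the set and hand it every fd the set is currently tracking;
   orphaned fds are unref'd and compacted out of the array along the way. */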
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001540static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1541 grpc_pollset_set *pollset_set,
1542 grpc_pollset *pollset) {
1543 size_t i, j;
1544 gpr_mu_lock(&pollset_set->mu);
1545 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1546 pollset_set->pollset_capacity =
1547 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1548 pollset_set->pollsets =
1549 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1550 sizeof(*pollset_set->pollsets));
1551 }
1552 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1553 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1554 if (fd_is_orphaned(pollset_set->fds[i])) {
1555 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1556 } else {
1557 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1558 pollset_set->fds[j++] = pollset_set->fds[i];
1559 }
1560 }
1561 pollset_set->fd_count = j;
1562 gpr_mu_unlock(&pollset_set->mu);
1563}
1564
1565static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1566 grpc_pollset_set *pollset_set,
1567 grpc_pollset *pollset) {
1568 size_t i;
1569 gpr_mu_lock(&pollset_set->mu);
1570 for (i = 0; i < pollset_set->pollset_count; i++) {
1571 if (pollset_set->pollsets[i] == pollset) {
1572 pollset_set->pollset_count--;
1573 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1574 pollset_set->pollsets[pollset_set->pollset_count]);
1575 break;
1576 }
1577 }
1578 gpr_mu_unlock(&pollset_set->mu);
1579}
1580
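/* Nest pollset_set 'item' inside 'bag' and propagate the bag's fds into
   'item'; orphaned fds are unref'd and compacted out of the bag. */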
1581static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1582 grpc_pollset_set *bag,
1583 grpc_pollset_set *item) {
1584 size_t i, j;
1585 gpr_mu_lock(&bag->mu);
1586 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1587 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1588 bag->pollset_sets =
1589 gpr_realloc(bag->pollset_sets,
1590 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1591 }
1592 bag->pollset_sets[bag->pollset_set_count++] = item;
1593 for (i = 0, j = 0; i < bag->fd_count; i++) {
1594 if (fd_is_orphaned(bag->fds[i])) {
1595 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1596 } else {
1597 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1598 bag->fds[j++] = bag->fds[i];
1599 }
1600 }
1601 bag->fd_count = j;
1602 gpr_mu_unlock(&bag->mu);
1603}
1604
1605static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1606 grpc_pollset_set *bag,
1607 grpc_pollset_set *item) {
1608 size_t i;
1609 gpr_mu_lock(&bag->mu);
1610 for (i = 0; i < bag->pollset_set_count; i++) {
1611 if (bag->pollset_sets[i] == item) {
1612 bag->pollset_set_count--;
1613 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1614 bag->pollset_sets[bag->pollset_set_count]);
1615 break;
1616 }
1617 }
1618 gpr_mu_unlock(&bag->mu);
1619}
1620
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001621/* Test helper functions */
1623void *grpc_fd_get_polling_island(grpc_fd *fd) {
1624 polling_island *pi;
1625
1626 gpr_mu_lock(&fd->pi_mu);
1627 pi = fd->polling_island;
1628 gpr_mu_unlock(&fd->pi_mu);
1629
1630 return pi;
1631}
1632
1633void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1634 polling_island *pi;
1635
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001636 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001637 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001638 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001639
1640 return pi;
1641}
1642
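/* Test-only helper: locks both islands via polling_island_lock_pair (which may
   update p1/p2 to the latest islands if merges have occurred), compares the
   resulting pointers and releases the lock(s). */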
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001643bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001644 polling_island *p1 = p;
1645 polling_island *p2 = q;
1646
1647 polling_island_lock_pair(&p1, &p2);
1648 if (p1 == p2) {
1649 gpr_mu_unlock(&p1->mu);
1650 } else {
1651 gpr_mu_unlock(&p1->mu);
1652 gpr_mu_unlock(&p2->mu);
1653 }
1654
1655 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001656}
1657
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001658/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001659 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001660 */
1661
1662static void shutdown_engine(void) {
1663 fd_global_shutdown();
1664 pollset_global_shutdown();
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001665 polling_island_global_shutdown();
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001666}
1667
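/* Event engine vtable that wires this epoll + signal based implementation
   into the common ev_posix event engine interface. */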
1668static const grpc_event_engine_vtable vtable = {
1669 .pollset_size = sizeof(grpc_pollset),
1670
1671 .fd_create = fd_create,
1672 .fd_wrapped_fd = fd_wrapped_fd,
1673 .fd_orphan = fd_orphan,
1674 .fd_shutdown = fd_shutdown,
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -07001675 .fd_is_shutdown = fd_is_shutdown,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001676 .fd_notify_on_read = fd_notify_on_read,
1677 .fd_notify_on_write = fd_notify_on_write,
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001678 .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001679
1680 .pollset_init = pollset_init,
1681 .pollset_shutdown = pollset_shutdown,
1682 .pollset_reset = pollset_reset,
1683 .pollset_destroy = pollset_destroy,
1684 .pollset_work = pollset_work,
1685 .pollset_kick = pollset_kick,
1686 .pollset_add_fd = pollset_add_fd,
1687
1688 .pollset_set_create = pollset_set_create,
1689 .pollset_set_destroy = pollset_set_destroy,
1690 .pollset_set_add_pollset = pollset_set_add_pollset,
1691 .pollset_set_del_pollset = pollset_set_del_pollset,
1692 .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
1693 .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
1694 .pollset_set_add_fd = pollset_set_add_fd,
1695 .pollset_set_del_fd = pollset_set_del_fd,
1696
1697 .kick_poller = kick_poller,
1698
1699 .shutdown_engine = shutdown_engine,
1700};
1701
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001702/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1703 * Create a dummy epoll_fd to make sure epoll support is available */
1704static bool is_epoll_available() {
1705 int fd = epoll_create1(EPOLL_CLOEXEC);
1706 if (fd < 0) {
1707 gpr_log(
1708 GPR_ERROR,
1709 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1710 fd);
1711 return false;
1712 }
1713 close(fd);
1714 return true;
1715}
1716
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001717const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001718 /* If the use of signals is disabled, we cannot use the epoll engine */
1719 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1720 return NULL;
1721 }
1722
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001723 if (!is_epoll_available()) {
1724 return NULL;
1725 }
1726
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001727 if (!is_grpc_wakeup_signal_initialized) {
1728 grpc_use_signal(SIGRTMIN + 2);
1729 }
1730
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001731 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001732
1733 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1734 return NULL;
1735 }
1736
1737 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1738 polling_island_global_init())) {
1739 return NULL;
1740 }
1741
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001742 return &vtable;
1743}
1744
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001745#else /* defined(GPR_LINUX_EPOLL) */
1746#if defined(GPR_POSIX_SOCKET)
1747#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001748/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1749 * NULL */
1750const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001751#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001752
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001753void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001754#endif /* !defined(GPR_LINUX_EPOLL) */