blob: 099f8e85b5b3aef6128ca2e0b9aaebb105ff9330 [file] [log] [blame]
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070034#include <grpc/grpc_posix.h>
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070035#include <grpc/support/port_platform.h>
36
Sree Kuchibhotla76a07952016-06-22 15:09:06 -070037/* This polling engine is only relevant on linux kernels supporting epoll() */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -070038#ifdef GPR_LINUX_EPOLL
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070039
Sree Kuchibhotla4c11a202016-06-06 09:23:25 -070040#include "src/core/lib/iomgr/ev_epoll_linux.h"
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070041
42#include <assert.h>
43#include <errno.h>
44#include <poll.h>
45#include <signal.h>
46#include <string.h>
47#include <sys/epoll.h>
48#include <sys/socket.h>
49#include <unistd.h>
50
51#include <grpc/support/alloc.h>
52#include <grpc/support/log.h>
53#include <grpc/support/string_util.h>
54#include <grpc/support/tls.h>
55#include <grpc/support/useful.h>
56
57#include "src/core/lib/iomgr/ev_posix.h"
58#include "src/core/lib/iomgr/iomgr_internal.h"
59#include "src/core/lib/iomgr/wakeup_fd_posix.h"
60#include "src/core/lib/profiling/timers.h"
61#include "src/core/lib/support/block_annotate.h"
62
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070063static int grpc_wakeup_signal = -1;
64static bool is_grpc_wakeup_signal_initialized = false;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -070065
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -070066/* Implements the function defined in grpc_posix.h. This function might be
67 * called before even calling grpc_init() to set either a different signal to
68 * use. If signum == -1, then the use of signals is disabled */
69void grpc_use_signal(int signum) {
70 grpc_wakeup_signal = signum;
71 is_grpc_wakeup_signal_initialized = true;
72
73 if (grpc_wakeup_signal < 0) {
74 gpr_log(GPR_INFO,
75 "Use of signals is disabled. Epoll engine will not be used");
76 } else {
77 gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
78 grpc_wakeup_signal);
79 }
80}
81
struct polling_island;

/*******************************************************************************
 * Fd Declarations
 */
/* Wrapper that this engine keeps around each file descriptor it manages. */
struct grpc_fd {
  /* The underlying OS file descriptor. */
  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* Protects the mutable state below (shutdown/closures/...). */
  gpr_mu mu;

  /* Indicates that the fd is shutdown and that any pending read/write closures
     should fail */
  bool shutdown;

  /* The fd is either closed or we relinquished control of it. In either cases,
     this indicates that the 'fd' on this structure is no longer valid */
  bool orphaned;

  /* Pending read/write notification closures. The sentinel values
     CLOSURE_NOT_READY / CLOSURE_READY (defined below) are stored here as
     states rather than real closures.
     TODO: sreek - Move this to a lockfree implementation */
  grpc_closure *read_closure;
  grpc_closure *write_closure;

  /* The polling island to which this fd belongs to and the mutex protecting
     the field */
  gpr_mu pi_mu;
  struct polling_island *polling_island;

  /* Intrusive link used when this struct sits on the fd freelist. */
  struct grpc_fd *freelist_next;
  /* Closure scheduled once the fd has been fully orphaned/closed. */
  grpc_closure *on_done_closure;

  /* The pollset that last noticed that the fd is readable */
  grpc_pollset *read_notifier_pollset;

  /* Registration with the iomgr object tracker (for leak accounting). */
  grpc_iomgr_object iomgr_object;
};
122
/* Reference counting for fds */
// #define GRPC_FD_REF_COUNT_DEBUG
#ifdef GRPC_FD_REF_COUNT_DEBUG
/* Debug variants additionally record the call site (reason/file/line) of
   every ref/unref; the macros below inject __FILE__/__LINE__ automatically. */
static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
/* Release variants ignore 'reason' entirely. */
static void fd_ref(grpc_fd *fd);
static void fd_unref(grpc_fd *fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/* Sentinel values stored in grpc_fd::read_closure/write_closure to encode
   the "no closure pending" / "event already arrived" states; any other
   value is presumably a real pending closure — confirm against the fd
   notification code later in this file. */
#define CLOSURE_NOT_READY ((grpc_closure *)0)
#define CLOSURE_READY ((grpc_closure *)1)
143
/*******************************************************************************
 * Polling island Declarations
 */

// #define GRPC_PI_REF_COUNT_DEBUG
#ifdef GRPC_PI_REF_COUNT_DEBUG

/* Debug variants log every ref/unref with the call site. */
#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)

#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(p, r) pi_unref((p))

#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */

/* A polling island: a set of fds sharing one epoll set, pollable by many
   threads. Islands can be merged; after a merge the losing island only
   forwards to the winner via 'merged_to'. */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_refcount ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd **fds;

  /* Polling islands that are no longer needed are kept in a freelist so that
     they can be reused. This field points to the next polling island in the
     free list */
  struct polling_island *next_free;
} polling_island;
193
/*******************************************************************************
 * Pollset Declarations
 */
/* One polling thread parked on a pollset; workers form a doubly-linked list
   rooted at grpc_pollset::root_worker. */
struct grpc_pollset_worker {
  pthread_t pt_id; /* Thread id of this worker */
  struct grpc_pollset_worker *next;
  struct grpc_pollset_worker *prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker; /* Sentinel head of the worker list */
  bool kicked_without_pollers;     /* A kick arrived while no worker polled */

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
  grpc_closure *shutdown_done; /* Called after shutdown is complete */

  /* The polling island to which this pollset belongs to */
  struct polling_island *polling_island;
};

/*******************************************************************************
 * Pollset-set Declarations
 */
/* TODO: sreek - Change the pollset_set implementation such that a pollset_set
 * directly points to a polling_island (and adding an fd/pollset/pollset_set to
 * the current pollset_set would result in polling island merges. This would
 * remove the need to maintain fd_count here. This will also significantly
 * simplify the grpc_fd structure since we would no longer need to explicitly
 * maintain the orphaned state */
struct grpc_pollset_set {
  gpr_mu mu; /* Protects all three dynamic arrays below */

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset **pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set **pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd **fds;
};
240
241/*******************************************************************************
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700242 * Common helpers
243 */
244
245static void append_error(grpc_error **composite, grpc_error *error,
246 const char *desc) {
247 if (error == GRPC_ERROR_NONE) return;
248 if (*composite == GRPC_ERROR_NONE) {
249 *composite = GRPC_ERROR_CREATE(desc);
250 }
251 *composite = grpc_error_add_child(*composite, error);
252}
253
/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup()
   (consuming it would make it non-readable and later wakeups would be lost) */
static grpc_wakeup_fd polling_island_wakeup_fd;
266
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700267/* Polling island freelist */
268static gpr_mu g_pi_freelist_mu;
269static polling_island *g_pi_freelist = NULL;
270
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700271static void polling_island_delete(); /* Forward declaration */
272
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic (a release-store before epoll_ctl paired with an
   acquire-load after epoll_wait elsewhere in this file). */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -0700283
#ifdef GRPC_PI_REF_COUNT_DEBUG
void pi_add_ref(polling_island *pi);
void pi_unref(polling_island *pi);

/* Debug wrapper around pi_add_ref(): logs the old/new refcount plus the
   call site supplied via the PI_ADD_REF macro. Note the count is read
   *before* the ref is taken, so under concurrent ref traffic the logged
   old/new pair may be stale. */
void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_add_ref(pi);
  gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, old_cnt + 1, reason, file, line);
}

/* Debug wrapper around pi_unref(); same staleness caveat as above. Note that
   pi may already be freed by the time the log statement runs if this was the
   last reference. */
void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) {
  long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count));
  pi_unref(pi);
  gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
          (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
}
#endif
302
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700303void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700304
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700305void pi_unref(polling_island *pi) {
306 /* If ref count went to zero, delete the polling island.
307 Note that this deletion not be done under a lock. Once the ref count goes
308 to zero, we are guaranteed that no one else holds a reference to the
309 polling island (and that there is no racing pi_add_ref() call either).
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700310
311 Also, if we are deleting the polling island and the merged_to field is
312 non-empty, we should remove a ref to the merged_to polling island
313 */
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700314 if (gpr_unref(&pi->ref_count)) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700315 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
316 polling_island_delete(pi);
317 if (next != NULL) {
318 PI_UNREF(next, "pi_delete"); /* Recursive call */
319 }
320 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -0700321}
322
/* Adds 'fd_count' fds from the 'fds' array to the island's epoll set and to
   its fds[] bookkeeping array. If 'add_fd_refs' is true, one "polling_island"
   ref is taken on each successfully-added fd. epoll failures (other than
   EEXIST, which is treated as already-present) are appended to *error and the
   fd is skipped.
   The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error **error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char *err_msg;
  const char *err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    /* Edge-triggered, interested in both readability and writability */
    ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
    ev.data.ptr = fds[i];
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      /* Not tracked in fds[] on failure (or if already present) */
      continue;
    }

    /* Grow fds[] geometrically (at least +8) when full */
    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}
367
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700368/* The caller is expected to hold pi->mu before calling this */
369static void polling_island_add_wakeup_fd_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700370 grpc_wakeup_fd *wakeup_fd,
371 grpc_error **error) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700372 struct epoll_event ev;
373 int err;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700374 char *err_msg;
375 const char *err_desc = "polling_island_add_wakeup_fd";
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700376
377 ev.events = (uint32_t)(EPOLLIN | EPOLLET);
378 ev.data.ptr = wakeup_fd;
379 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
380 GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700381 if (err < 0 && errno != EEXIST) {
382 gpr_asprintf(&err_msg,
383 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
384 "error: %d (%s)",
385 pi->epoll_fd,
386 GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
387 strerror(errno));
388 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
389 gpr_free(err_msg);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700390 }
391}
392
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700393/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700394static void polling_island_remove_all_fds_locked(polling_island *pi,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700395 bool remove_fd_refs,
396 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700397 int err;
398 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700399 char *err_msg;
400 const char *err_desc = "polling_island_remove_fds";
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700401
402 for (i = 0; i < pi->fd_cnt; i++) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700403 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700404 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700405 gpr_asprintf(&err_msg,
406 "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
407 "error: %d (%s)",
408 pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
409 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
410 gpr_free(err_msg);
Sree Kuchibhotlaad162ba2016-06-06 16:23:37 -0700411 }
412
413 if (remove_fd_refs) {
414 GRPC_FD_UNREF(pi->fds[i], "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700415 }
416 }
417
418 pi->fd_cnt = 0;
419}
420
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700421/* The caller is expected to hold pi->mu lock before calling this function */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700422static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700423 bool is_fd_closed,
424 grpc_error **error) {
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700425 int err;
426 size_t i;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700427 char *err_msg;
428 const char *err_desc = "polling_island_remove_fd";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700429
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700430 /* If fd is already closed, then it would have been automatically been removed
431 from the epoll set */
432 if (!is_fd_closed) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700433 err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
434 if (err < 0 && errno != ENOENT) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700435 gpr_asprintf(
436 &err_msg,
437 "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
438 pi->epoll_fd, fd->fd, errno, strerror(errno));
439 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
440 gpr_free(err_msg);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700441 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700442 }
443
444 for (i = 0; i < pi->fd_cnt; i++) {
445 if (pi->fds[i] == fd) {
446 pi->fds[i] = pi->fds[--pi->fd_cnt];
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700447 GRPC_FD_UNREF(fd, "polling_island");
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -0700448 break;
449 }
450 }
451}
452
/* Might return NULL in case of an error.
   NOTE(review): as written, this never actually returns NULL — on
   epoll_create1() failure it returns a pi with epoll_fd < 0 and *error set.
   Confirm that all callers check *error rather than relying on a NULL
   return. */
static polling_island *polling_island_create(grpc_fd *initial_fd,
                                             grpc_error **error) {
  polling_island *pi = NULL;
  char *err_msg;
  const char *err_desc = "polling_island_create";

  /* Try to get one from the polling island freelist */
  gpr_mu_lock(&g_pi_freelist_mu);
  if (g_pi_freelist != NULL) {
    pi = g_pi_freelist;
    g_pi_freelist = g_pi_freelist->next_free;
    pi->next_free = NULL;
  }
  gpr_mu_unlock(&g_pi_freelist_mu);

  /* Create new polling island if we could not get one from the free list */
  if (pi == NULL) {
    pi = gpr_malloc(sizeof(*pi));
    gpr_mu_init(&pi->mu);
    pi->fd_cnt = 0;
    pi->fd_capacity = 0;
    pi->fds = NULL;
  }

  /* Fresh or recycled, the island starts with zero refs and no merge target */
  gpr_ref_init(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  } else {
    /* Every island watches the global wakeup fd so pollers can be kicked */
    polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
    pi->next_free = NULL;

    if (initial_fd != NULL) {
      /* Lock the polling island here just in case we got this structure from
         the freelist and the polling island lock was not released yet (by the
         code that adds the polling island to the freelist) */
      gpr_mu_lock(&pi->mu);
      polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
      gpr_mu_unlock(&pi->mu);
    }
  }

  return pi;
}
504
/* Tears down an island and returns its memory to the freelist for reuse.
   The island must already be empty of fds (asserted) and unreachable via any
   reference (called only when the refcount hits zero — see pi_unref). */
static void polling_island_delete(polling_island *pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  /* Clear any stale merge link before the struct can be recycled */
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);

  close(pi->epoll_fd);
  pi->epoll_fd = -1;

  gpr_mu_lock(&g_pi_freelist_mu);
  pi->next_free = g_pi_freelist;
  g_pi_freelist = pi;
  gpr_mu_unlock(&g_pi_freelist_mu);
}
518
Sree Kuchibhotla229533b12016-06-21 20:42:52 -0700519/* Attempts to gets the last polling island in the linked list (liked by the
520 * 'merged_to' field). Since this does not lock the polling island, there are no
521 * guarantees that the island returned is the last island */
522static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
523 polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
524 while (next != NULL) {
525 pi = next;
526 next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
527 }
528
529 return pi;
530}
531
/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
   polling_island *pi_latest = polling_island_lock(pi);
   ...
   ... critical section ..
   ...
   gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island *polling_island_lock(polling_island *pi) {
  polling_island *next = NULL;

  while (true) {
    next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
    if (next == NULL) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
      if (next == NULL) {
        /* pi is infact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    /* Follow the merge link and retry from the new node */
    pi = next;
  }

  return pi;
}
568
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
  polling_island *pi_1 = *p;
  polling_island *pi_2 = *q;
  polling_island *next_1 = NULL;
  polling_island *next_2 = NULL;

  /* The algorithm is simple:
      - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
        keep updating pi_1 and pi_2)
      - Then obtain locks on the islands by following a lock order rule of
        locking polling_island with lower address first
           Special case: Before obtaining the locks, check if pi_1 and pi_2 are
           pointing to the same island. If that is the case, we can just call
           polling_island_lock()
      - After obtaining both the locks, double check that the polling islands
        are still the last polling islands in their respective linked lists
        (this is because there might have been polling island merges before
        we got the lock)
      - If the polling islands are the last islands, we are done. If not,
        release the locks and continue the process from the first step */
  while (true) {
    /* Step 1: chase both merge chains to their (currently) last nodes */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != NULL) {
      pi_1 = next_1;
      next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != NULL) {
      pi_2 = next_2;
      next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    }

    /* Both chains converged on the same island: single-lock path */
    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    /* Step 2: lock in ascending address order to avoid lock-order inversion */
    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    /* Step 3: re-validate that neither island was merged while we were
       acquiring the locks; if both are still chain tails, we are done */
    next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == NULL && next_2 == NULL) {
      break;
    }

    /* A merge raced us; drop both locks and start over */
    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}
649
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -0700650static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
651 if (p == q) {
652 gpr_mu_unlock(&p->mu);
653 } else {
654 gpr_mu_unlock(&p->mu);
655 gpr_mu_unlock(&q->mu);
656 }
657}
658
/* Merges polling islands p and q and returns the island that survives the
   merge (always the one that had more fds). After this call, the absorbed
   island's 'merged_to' points at the survivor so readers can chase the link
   to the latest island. The statement order below matters: fds are moved
   before the wakeup fd is added and before the merged_to link is published
   with release semantics. */
static polling_island *polling_island_merge(polling_island *p,
                                            polling_island *q,
                                            grpc_error **error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island *, p, q);
    }

    /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
       Note that the refcounts on the fds being moved will not change here.
       This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q. Release store so that readers
       doing an acquire load of merged_to see the fully merged state. */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}
692
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700693static grpc_error *polling_island_global_init() {
694 grpc_error *error = GRPC_ERROR_NONE;
695
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700696 gpr_mu_init(&g_pi_freelist_mu);
697 g_pi_freelist = NULL;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700698
699 error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
700 if (error == GRPC_ERROR_NONE) {
701 error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
702 }
703
704 return error;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700705}
706
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700707static void polling_island_global_shutdown() {
708 polling_island *next;
709 gpr_mu_lock(&g_pi_freelist_mu);
710 gpr_mu_unlock(&g_pi_freelist_mu);
711 while (g_pi_freelist != NULL) {
712 next = g_pi_freelist->next_free;
713 gpr_mu_destroy(&g_pi_freelist->mu);
714 gpr_free(g_pi_freelist->fds);
715 gpr_free(g_pi_freelist);
716 g_pi_freelist = next;
717 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700718 gpr_mu_destroy(&g_pi_freelist_mu);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -0700719
720 grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
Sree Kuchibhotlad627c102016-06-06 15:49:32 -0700721}
722
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700723/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700724 * Fd Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700725 */
726
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700727/* We need to keep a freelist not because of any concerns of malloc performance
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700728 * but instead so that implementations with multiple threads in (for example)
729 * epoll_wait deal with the race between pollset removal and incoming poll
730 * notifications.
731 *
732 * The problem is that the poller ultimately holds a reference to this
733 * object, so it is very difficult to know when it is safe to free it, at least
734 * without some expensive synchronization.
735 *
736 * If we keep the object freelisted, in the worst case losing this race just
737 * becomes a spurious read notification on a reused fd.
738 */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700739
740/* The alarm system needs to be able to wakeup 'some poller' sometimes
741 * (specifically when a new alarm needs to be triggered earlier than the next
742 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
743 * case occurs. */
Sree Kuchibhotla9bc3d2d2016-06-06 10:27:56 -0700744
745/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
746 * sure to wake up one polling thread (which can wake up other threads if
747 * needed) */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700748grpc_wakeup_fd grpc_global_wakeup_fd;
749
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700750static grpc_fd *fd_freelist = NULL;
751static gpr_mu fd_freelist_mu;
752
/* Adds 'n' to fd->refst. In debug builds (GRPC_FD_REF_COUNT_DEBUG) the REF_BY
   macro also threads through a reason string and call site for tracing; in
   release builds those arguments are compiled away. The assert guarantees the
   refcount was strictly positive before the increment (i.e. we never revive a
   fully released fd). */
#ifdef GRPC_FD_REF_COUNT_DEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                   int line) {
  gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd *fd, int n) {
#endif
  /* Shared body for both debug and release variants */
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}
768
/* Subtracts 'n' from fd->refst. When the count drops to zero, the grpc_fd is
   returned to the freelist (see the freelist rationale comment above) rather
   than freed, and is unregistered from iomgr. Debug builds log the transition
   with the caller-supplied reason. */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
                     int line) {
  gpr_atm old;
  gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd,
          (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst),
          gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
#else
static void unref_by(grpc_fd *fd, int n) {
  gpr_atm old;
#endif
  old = gpr_atm_full_fetch_add(&fd->refst, -n);
  /* old == n means this unref brought the count to exactly zero */
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    /* Going negative would indicate a ref/unref imbalance */
    GPR_ASSERT(old > n);
  }
}
793
/* Increment refcount by two to avoid changing the orphan bit.
   (Bit 0 of refst encodes 'active/not-orphaned' — see fd_is_orphaned — so
   external refs are counted in steps of 2 to leave that bit untouched.) */
#ifdef GRPC_FD_REF_COUNT_DEBUG
static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
809
/* One-time setup of the fd freelist lock (torn down in fd_global_shutdown) */
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
811
/* Frees every grpc_fd parked on the freelist and destroys the freelist lock.
   The initial lock/unlock pair appears to serve as a barrier so freelist
   updates made under the lock are visible before the unlocked walk.
   NOTE(review): assumes shutdown runs with no concurrent fd activity —
   confirm. */
static void fd_global_shutdown(void) {
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != NULL) {
    grpc_fd *fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
823
824static grpc_fd *fd_create(int fd, const char *name) {
825 grpc_fd *new_fd = NULL;
826
827 gpr_mu_lock(&fd_freelist_mu);
828 if (fd_freelist != NULL) {
829 new_fd = fd_freelist;
830 fd_freelist = fd_freelist->freelist_next;
831 }
832 gpr_mu_unlock(&fd_freelist_mu);
833
834 if (new_fd == NULL) {
835 new_fd = gpr_malloc(sizeof(grpc_fd));
836 gpr_mu_init(&new_fd->mu);
837 gpr_mu_init(&new_fd->pi_mu);
838 }
839
840 /* Note: It is not really needed to get the new_fd->mu lock here. If this is a
841 newly created fd (or an fd we got from the freelist), no one else would be
842 holding a lock to it anyway. */
843 gpr_mu_lock(&new_fd->mu);
844
Sree Kuchibhotla0224dcc2016-06-22 18:04:00 -0700845 gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700846 new_fd->fd = fd;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700847 new_fd->shutdown = false;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700848 new_fd->orphaned = false;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700849 new_fd->read_closure = CLOSURE_NOT_READY;
850 new_fd->write_closure = CLOSURE_NOT_READY;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700851 new_fd->polling_island = NULL;
852 new_fd->freelist_next = NULL;
853 new_fd->on_done_closure = NULL;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700854 new_fd->read_notifier_pollset = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700855
856 gpr_mu_unlock(&new_fd->mu);
857
858 char *fd_name;
859 gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
860 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700861#ifdef GRPC_FD_REF_COUNT_DEBUG
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700862 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700863#endif
Sree Kuchibhotla6a295452016-06-23 15:53:10 -0700864 gpr_free(fd_name);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700865 return new_fd;
866}
867
868static bool fd_is_orphaned(grpc_fd *fd) {
869 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
870}
871
872static int fd_wrapped_fd(grpc_fd *fd) {
873 int ret_fd = -1;
874 gpr_mu_lock(&fd->mu);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -0700875 if (!fd->orphaned) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -0700876 ret_fd = fd->fd;
877 }
878 gpr_mu_unlock(&fd->mu);
879
880 return ret_fd;
881}
882
/* Releases the grpc_fd: closes (or hands back via *release_fd) the underlying
   descriptor, detaches the fd from its polling island, and schedules
   'on_done'. Note the ref arithmetic: REF_BY(fd, 1) followed by
   UNREF_BY(fd, 2) nets to -1, which clears the 'active' bit (bit 0) set at
   creation while keeping the struct alive for the duration of this call. */
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                      grpc_closure *on_done, int *release_fd,
                      const char *reason) {
  bool is_fd_closed = false;
  grpc_error *error = GRPC_ERROR_NONE;

  gpr_mu_lock(&fd->mu);
  fd->on_done_closure = on_done;

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != NULL) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
    is_fd_closed = true;
  }

  fd->orphaned = true;

  /* Remove the active status but keep referenced. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->polling_island). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->polling_island to NULL (but remove the ref on the polling island
       before doing this.) */
  gpr_mu_lock(&fd->pi_mu);
  if (fd->polling_island != NULL) {
    polling_island *pi_latest = polling_island_lock(fd->polling_island);
    polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
    gpr_mu_unlock(&pi_latest->mu);

    PI_UNREF(fd->polling_island, "fd_orphan");
    fd->polling_island = NULL;
  }
  gpr_mu_unlock(&fd->pi_mu);

  /* Notify the caller; 'error' carries any polling-island removal failure */
  grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);

  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
932
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700933static grpc_error *fd_shutdown_error(bool shutdown) {
934 if (!shutdown) {
935 return GRPC_ERROR_NONE;
936 } else {
937 return GRPC_ERROR_CREATE("FD shutdown");
938 }
939}
940
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700941static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
942 grpc_closure **st, grpc_closure *closure) {
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700943 if (fd->shutdown) {
944 grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
945 NULL);
946 } else if (*st == CLOSURE_NOT_READY) {
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700947 /* not ready ==> switch to a waiting state by setting the closure */
948 *st = closure;
949 } else if (*st == CLOSURE_READY) {
950 /* already ready ==> queue the closure to run immediately */
951 *st = CLOSURE_NOT_READY;
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700952 grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
953 NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700954 } else {
955 /* upcallptr was set to a different closure. This is an error! */
956 gpr_log(GPR_ERROR,
957 "User called a notify_on function with a previous callback still "
958 "pending");
959 abort();
960 }
961}
962
963/* returns 1 if state becomes not ready */
964static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
965 grpc_closure **st) {
966 if (*st == CLOSURE_READY) {
967 /* duplicate ready ==> ignore */
968 return 0;
969 } else if (*st == CLOSURE_NOT_READY) {
970 /* not ready, and not waiting ==> flag ready */
971 *st = CLOSURE_READY;
972 return 0;
973 } else {
974 /* waiting ==> queue closure */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -0700975 grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -0700976 *st = CLOSURE_NOT_READY;
977 return 1;
978 }
979}
980
Sree Kuchibhotla5855c472016-06-08 12:56:56 -0700981static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
982 grpc_fd *fd) {
983 grpc_pollset *notifier = NULL;
984
985 gpr_mu_lock(&fd->mu);
986 notifier = fd->read_notifier_pollset;
987 gpr_mu_unlock(&fd->mu);
988
989 return notifier;
990}
991
Sree Kuchibhotla24b6eae2016-06-21 18:01:14 -0700992static bool fd_is_shutdown(grpc_fd *fd) {
993 gpr_mu_lock(&fd->mu);
994 const bool r = fd->shutdown;
995 gpr_mu_unlock(&fd->mu);
996 return r;
997}
998
/* Might be called multiple times */
/* Shuts down both directions of the underlying descriptor and fails any
   closures currently parked on the read/write slots. Idempotent: only the
   first call performs the shutdown. */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  gpr_mu_lock(&fd->mu);
  /* Do the actual shutdown only once */
  if (!fd->shutdown) {
    fd->shutdown = true;

    shutdown(fd->fd, SHUT_RDWR);
    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
       at this point, the closures would be called with 'success = false' */
    set_ready_locked(exec_ctx, fd, &fd->read_closure);
    set_ready_locked(exec_ctx, fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}
1014
/* Registers 'closure' to run when the fd becomes readable. Thread-safe
   wrapper over notify_on_locked (which documents the state machine). */
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1021
/* Registers 'closure' to run when the fd becomes writable. Thread-safe
   wrapper over notify_on_locked (which documents the state machine). */
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}
1028
1029/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001030 * Pollset Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001031 */
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001032GPR_TLS_DECL(g_current_thread_pollset);
1033GPR_TLS_DECL(g_current_thread_worker);
Craig Tiller19196992016-06-27 18:45:56 -07001034static __thread bool g_initialized_sigmask;
1035static __thread sigset_t g_orig_sigmask;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001036
/* Handler installed for grpc_wakeup_signal (see poller_kick_init). It is
   deliberately (almost) a no-op: the signal's only job is to interrupt a
   blocking epoll wait in a kicked worker thread. */
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001042
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001043static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001044
/* Global state management */
/* One-time pollset subsystem init: thread-local slots used to detect
   self-kicks (see pollset_kick), the wakeup-signal handler, and the global
   wakeup fd. Returns the result of the wakeup fd initialization. */
static grpc_error *pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
1052
/* Tears down the state created by pollset_global_init (reverse order) */
static void pollset_global_shutdown(void) {
  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}
1058
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001059static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
1060 grpc_error *err = GRPC_ERROR_NONE;
1061 int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
1062 if (err_num != 0) {
1063 err = GRPC_OS_ERROR(err_num, "pthread_kill");
1064 }
1065 return err;
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001066}
1067
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001068/* Return 1 if the pollset has active threads in pollset_work (pollset must
1069 * be locked) */
1070static int pollset_has_workers(grpc_pollset *p) {
1071 return p->root_worker.next != &p->root_worker;
1072}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001073
1074static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1075 worker->prev->next = worker->next;
1076 worker->next->prev = worker->prev;
1077}
1078
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001079static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
1080 if (pollset_has_workers(p)) {
1081 grpc_pollset_worker *w = p->root_worker.next;
1082 remove_worker(p, w);
1083 return w;
1084 } else {
1085 return NULL;
1086 }
1087}
1088
1089static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1090 worker->next = &p->root_worker;
1091 worker->prev = worker->next->prev;
1092 worker->prev->next = worker->next->prev = worker;
1093}
1094
1095static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
1096 worker->prev = &p->root_worker;
1097 worker->next = worker->prev->next;
1098 worker->prev->next = worker->next->prev = worker;
1099}
1100
/* p->mu must be held before calling this function */
/* Wakes worker(s) on pollset 'p'. Three modes, chosen by 'specific_worker':
   - GRPC_POLLSET_KICK_BROADCAST: signal every worker except the calling
     thread's own worker
   - a concrete worker: signal just that worker (unless it is ourselves)
   - NULL: wake "any" worker — rotate one from the front of the list to the
     back and signal it; if none are present, record the kick in
     kicked_without_pollers so the next pollset_work can absorb it.
   Errors from individual pthread_kill calls are accumulated into a single
   grpc_error, logged, and returned. */
static grpc_error *pollset_kick(grpc_pollset *p,
                                grpc_pollset_worker *specific_worker) {
  GPR_TIMER_BEGIN("pollset_kick", 0);
  grpc_error *error = GRPC_ERROR_NONE;
  const char *err_desc = "Kick Failure";

  grpc_pollset_worker *worker = specific_worker;
  if (worker != NULL) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          /* Never signal the calling thread's own worker */
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
        GPR_TIMER_END("pollset_kick.broadcast", 0);
      } else {
        /* No workers present: remember the kick for the next pollset_work */
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      /* Skip the signal if the target is the calling thread itself */
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != NULL) {
      GPR_TIMER_MARK("finally_kick", 0);
      /* Re-queue at the back so successive anonymous kicks round-robin */
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GPR_TIMER_END("pollset_kick", 0);
  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}
1153
/* Signals the global wakeup fd. NOTE(review): presumably
   grpc_global_wakeup_fd is monitored by active pollers so this wakes "some"
   poller — confirm against pollset_work's epoll setup. */
static grpc_error *kick_poller(void) {
  return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001157
/* Initializes a pollset: empty (self-linked) worker list, cleared shutdown
   state, no polling island yet (one is created/attached lazily). Exposes the
   pollset's mutex to the caller via *mu. */
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;

  /* Circular worker list: root pointing at itself means "no workers" */
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = NULL;

  pollset->polling_island = NULL;
}
1171
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001172/* Convert a timespec to milliseconds:
1173 - Very small or negative poll times are clamped to zero to do a non-blocking
1174 poll (which becomes spin polling)
1175 - Other small values are rounded up to one millisecond
1176 - Longer than a millisecond polls are rounded up to the next nearest
1177 millisecond to avoid spinning
1178 - Infinite timeouts are converted to -1 */
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001179static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
1180 gpr_timespec now) {
1181 gpr_timespec timeout;
1182 static const int64_t max_spin_polling_us = 10;
1183 if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
1184 return -1;
1185 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001186
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001187 if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
1188 max_spin_polling_us,
1189 GPR_TIMESPAN))) <= 0) {
1190 return 0;
1191 }
1192 timeout = gpr_time_sub(deadline, now);
1193 return gpr_time_to_millis(gpr_time_add(
1194 timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
1195}
1196
/* Marks the fd readable (scheduling any parked read closure) and records
   which pollset observed the readiness. */
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_pollset *notifier) {
  /* Need the fd->mu since we might be racing with fd_notify_on_read */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->read_closure);
  fd->read_notifier_pollset = notifier;
  gpr_mu_unlock(&fd->mu);
}
1205
/* Marks the fd writable, scheduling any parked write closure */
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
  /* Need the fd->mu since we might be racing with fd_notify_on_write */
  gpr_mu_lock(&fd->mu);
  set_ready_locked(exec_ctx, fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}
1212
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001213static void pollset_release_polling_island(grpc_pollset *ps, char *reason) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001214 if (ps->polling_island != NULL) {
1215 PI_UNREF(ps->polling_island, reason);
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001216 }
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001217 ps->polling_island = NULL;
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001218}
1219
/* Completes pollset shutdown: releases the polling island and schedules the
   shutdown_done closure. Must only run once, after the last worker has left
   (asserted below); called from pollset_shutdown or from the final worker in
   pollset_work. */
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
                                   grpc_pollset *pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->polling_island to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
1231
/* pollset->mu lock must be held by the caller before calling this */
/* Begins pollset shutdown: flags the pollset, broadcasts a kick so every
   worker exits, and finishes immediately if no workers are present;
   otherwise the last departing worker finishes (see finish_shutdown_locked).
   'closure' is scheduled once shutdown completes. */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
                             grpc_closure *closure) {
  GPR_TIMER_BEGIN("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(exec_ctx, pollset);
  }
  GPR_TIMER_END("pollset_shutdown", 0);
}
1251
/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset *pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->mu);
}
1259
1260static void pollset_reset(grpc_pollset *pollset) {
1261 GPR_ASSERT(pollset->shutting_down);
1262 GPR_ASSERT(!pollset_has_workers(pollset));
1263 pollset->shutting_down = false;
1264 pollset->finish_shutdown_called = false;
1265 pollset->kicked_without_pollers = false;
1266 pollset->shutdown_done = NULL;
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001267 pollset_release_polling_island(pollset, "ps_reset");
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001268}
1269
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001270#define GRPC_EPOLL_MAX_EVENTS 1000
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001271/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
1272static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
1273 grpc_pollset *pollset, int timeout_ms,
1274 sigset_t *sig_mask, grpc_error **error) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001275 struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001276 int epoll_fd = -1;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001277 int ep_rv;
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001278 polling_island *pi = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001279 char *err_msg;
1280 const char *err_desc = "pollset_work_and_unlock";
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001281 GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
1282
1283 /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001284 latest polling island pointed by pollset->polling_island.
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001285
1286 Since epoll_fd is immutable, we can read it without obtaining the polling
1287 island lock. There is however a possibility that the polling island (from
1288 which we got the epoll_fd) got merged with another island while we are
1289 in this function. This is still okay because in such a case, we will wakeup
1290 right-away from epoll_wait() and pick up the latest polling_island the next
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001291 this function (i.e pollset_work_and_unlock()) is called */
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001292
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001293 if (pollset->polling_island == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001294 pollset->polling_island = polling_island_create(NULL, error);
1295 if (pollset->polling_island == NULL) {
1296 GPR_TIMER_END("pollset_work_and_unlock", 0);
1297 return; /* Fatal error. We cannot continue */
1298 }
1299
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001300 PI_ADD_REF(pollset->polling_island, "ps");
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001301 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001302
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001303 pi = polling_island_maybe_get_latest(pollset->polling_island);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001304 epoll_fd = pi->epoll_fd;
1305
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001306 /* Update the pollset->polling_island since the island being pointed by
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001307 pollset->polling_island maybe older than the one pointed by pi) */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001308 if (pollset->polling_island != pi) {
1309 /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
1310 polling island to be deleted */
1311 PI_ADD_REF(pi, "ps");
1312 PI_UNREF(pollset->polling_island, "ps");
1313 pollset->polling_island = pi;
1314 }
Sree Kuchibhotlad627c102016-06-06 15:49:32 -07001315
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001316 /* Add an extra ref so that the island does not get destroyed (which means
1317 the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
1318 epoll_fd */
1319 PI_ADD_REF(pi, "ps_work");
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001320 gpr_mu_unlock(&pollset->mu);
1321
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001322 do {
1323 ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
1324 sig_mask);
1325 if (ep_rv < 0) {
1326 if (errno != EINTR) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001327 gpr_asprintf(&err_msg,
1328 "epoll_wait() epoll fd: %d failed with error: %d (%s)",
1329 epoll_fd, errno, strerror(errno));
1330 append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001331 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001332 /* We were interrupted. Save an interation by doing a zero timeout
1333 epoll_wait to see if there are any other events of interest */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001334 ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001335 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001336 }
Sree Kuchibhotla79a62332016-06-04 14:01:03 -07001337
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001338#ifdef GRPC_TSAN
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001339 /* See the definition of g_poll_sync for more details */
1340 gpr_atm_acq_load(&g_epoll_sync);
Sree Kuchibhotlaad2c4772016-06-13 19:06:54 -07001341#endif /* defined(GRPC_TSAN) */
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001342
Sree Kuchibhotla58e58962016-06-13 00:52:56 -07001343 for (int i = 0; i < ep_rv; ++i) {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001344 void *data_ptr = ep_ev[i].data.ptr;
1345 if (data_ptr == &grpc_global_wakeup_fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001346 append_error(error,
1347 grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
1348 err_desc);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001349 } else if (data_ptr == &polling_island_wakeup_fd) {
1350 /* This means that our polling island is merged with a different
1351 island. We do not have to do anything here since the subsequent call
1352 to the function pollset_work_and_unlock() will pick up the correct
1353 epoll_fd */
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001354 } else {
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001355 grpc_fd *fd = data_ptr;
1356 int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
1357 int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
1358 int write_ev = ep_ev[i].events & EPOLLOUT;
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001359 if (read_ev || cancel) {
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001360 fd_become_readable(exec_ctx, fd, pollset);
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001361 }
1362 if (write_ev || cancel) {
1363 fd_become_writable(exec_ctx, fd);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001364 }
1365 }
Sree Kuchibhotlae5012ba2016-06-06 16:01:45 -07001366 }
1367 } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001368
1369 GPR_ASSERT(pi != NULL);
1370
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001371 /* Before leaving, release the extra ref we added to the polling island. It
1372 is important to use "pi" here (i.e our old copy of pollset->polling_island
1373 that we got before releasing the polling island lock). This is because
1374 pollset->polling_island pointer might get udpated in other parts of the
1375 code when there is an island merge while we are doing epoll_wait() above */
1376 PI_UNREF(pi, "ps_work");
Sree Kuchibhotla24b10622016-06-08 15:20:17 -07001377
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001378 GPR_TIMER_END("pollset_work_and_unlock", 0);
1379}
1380
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001381/* pollset->mu lock must be held by the caller before calling this.
1382 The function pollset_work() may temporarily release the lock (pollset->mu)
1383 during the course of its execution but it will always re-acquire the lock and
1384 ensure that it is held by the time the function returns */
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001385static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1386 grpc_pollset_worker **worker_hdl,
1387 gpr_timespec now, gpr_timespec deadline) {
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001388 GPR_TIMER_BEGIN("pollset_work", 0);
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001389 grpc_error *error = GRPC_ERROR_NONE;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001390 int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
1391
1392 sigset_t new_mask;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001393
1394 grpc_pollset_worker worker;
1395 worker.next = worker.prev = NULL;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001396 worker.pt_id = pthread_self();
1397
1398 *worker_hdl = &worker;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001399
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001400 gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
1401 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001402
1403 if (pollset->kicked_without_pollers) {
1404 /* If the pollset was kicked without pollers, pretend that the current
1405 worker got the kick and skip polling. A kick indicates that there is some
1406 work that needs attention like an event on the completion queue or an
1407 alarm */
1408 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1409 pollset->kicked_without_pollers = 0;
1410 } else if (!pollset->shutting_down) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001411 /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
1412 (i.e 'kicking') a worker in the pollset.
1413 A 'kick' is a way to inform that worker that there is some pending work
1414 that needs immediate attention (like an event on the completion queue,
1415 or a polling island merge that results in a new epoll-fd to wait on) and
1416 that the worker should not spend time waiting in epoll_pwait().
1417
1418 A kick can come at anytime (i.e before/during or after the worker calls
1419 epoll_pwait()) but in all cases we have to make sure that when a worker
1420 gets a kick, it does not spend time in epoll_pwait(). In other words, one
1421 kick should result in skipping/exiting of one epoll_pwait();
1422
1423 To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all
1424 times *except* when it is in epoll_pwait(). This way, the worker never
1425 misses acting on a kick */
1426
Craig Tiller19196992016-06-27 18:45:56 -07001427 if (!g_initialized_sigmask) {
1428 sigemptyset(&new_mask);
1429 sigaddset(&new_mask, grpc_wakeup_signal);
1430 pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
1431 sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
1432 g_initialized_sigmask = true;
1433 /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
1434 This is the mask used at all times *except during
1435 epoll_wait()*"
1436 g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
1437 this is
1438 the mask to use *during epoll_wait()*
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001439
Craig Tiller19196992016-06-27 18:45:56 -07001440 The new_mask is set on the worker before it is added to the pollset
1441 (i.e
1442 before it can be kicked) */
1443 }
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001444
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001445 push_front_worker(pollset, &worker); /* Add worker to pollset */
1446
Craig Tiller19196992016-06-27 18:45:56 -07001447 pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &g_orig_sigmask,
1448 &error);
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001449 grpc_exec_ctx_flush(exec_ctx);
1450
1451 gpr_mu_lock(&pollset->mu);
1452 remove_worker(pollset, &worker);
1453 }
1454
1455 /* If we are the last worker on the pollset (i.e pollset_has_workers() is
1456 false at this point) and the pollset is shutting down, we may have to
1457 finish the shutdown process by calling finish_shutdown_locked().
1458 See pollset_shutdown() for more details.
1459
1460 Note: Continuing to access pollset here is safe; it is the caller's
1461 responsibility to not destroy a pollset when it has outstanding calls to
1462 pollset_work() */
1463 if (pollset->shutting_down && !pollset_has_workers(pollset) &&
1464 !pollset->finish_shutdown_called) {
1465 GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
1466 finish_shutdown_locked(exec_ctx, pollset);
1467
1468 gpr_mu_unlock(&pollset->mu);
1469 grpc_exec_ctx_flush(exec_ctx);
1470 gpr_mu_lock(&pollset->mu);
1471 }
1472
1473 *worker_hdl = NULL;
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001474
Sree Kuchibhotla8e4926c2016-06-08 20:33:19 -07001475 gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
1476 gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001477
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001478 GPR_TIMER_END("pollset_work", 0);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001479
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001480 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1481 return error;
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001482}
1483
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001484static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
1485 grpc_fd *fd) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001486 grpc_error *error = GRPC_ERROR_NONE;
1487
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001488 gpr_mu_lock(&pollset->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001489 gpr_mu_lock(&fd->pi_mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001490
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001491 polling_island *pi_new = NULL;
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001492
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001493 /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and
1494 * equal, do nothing.
1495 * 2) If fd->polling_island and pollset->polling_island are both NULL, create
1496 * a new polling island (with a refcount of 2) and make the polling_island
1497 * fields in both fd and pollset to point to the new island
1498 * 3) If one of fd->polling_island or pollset->polling_island is NULL, update
1499 * the NULL polling_island field to point to the non-NULL polling_island
1500 * field (ensure that the refcount on the polling island is incremented by
1501 * 1 to account for the newly added reference)
1502 * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL
1503 * and different, merge both the polling islands and update the
1504 * polling_island fields in both fd and pollset to point to the merged
1505 * polling island.
1506 */
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001507 if (fd->polling_island == pollset->polling_island) {
1508 pi_new = fd->polling_island;
1509 if (pi_new == NULL) {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001510 pi_new = polling_island_create(fd, &error);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001511 }
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001512 } else if (fd->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001513 pi_new = polling_island_lock(pollset->polling_island);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001514 polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001515 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001516 } else if (pollset->polling_island == NULL) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001517 pi_new = polling_island_lock(fd->polling_island);
Sree Kuchibhotla88ee12f2016-06-03 19:26:48 -07001518 gpr_mu_unlock(&pi_new->mu);
Sree Kuchibhotla5098f912016-05-31 10:58:17 -07001519 } else {
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001520 pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
1521 &error);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001522 }
1523
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001524 /* At this point, pi_new is the polling island that both fd->polling_island
1525 and pollset->polling_island must be pointing to */
1526
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001527 if (fd->polling_island != pi_new) {
1528 PI_ADD_REF(pi_new, "fd");
1529 if (fd->polling_island != NULL) {
1530 PI_UNREF(fd->polling_island, "fd");
1531 }
1532 fd->polling_island = pi_new;
1533 }
1534
1535 if (pollset->polling_island != pi_new) {
1536 PI_ADD_REF(pi_new, "ps");
1537 if (pollset->polling_island != NULL) {
1538 PI_UNREF(pollset->polling_island, "ps");
1539 }
1540 pollset->polling_island = pi_new;
1541 }
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001542
Sree Kuchibhotla9442bab2016-05-20 17:54:06 -07001543 gpr_mu_unlock(&fd->pi_mu);
Sree Kuchibhotlacddf6972016-06-21 08:27:07 -07001544 gpr_mu_unlock(&pollset->mu);
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001545}
1546
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001547/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001548 * Pollset-set Definitions
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001549 */
1550
1551static grpc_pollset_set *pollset_set_create(void) {
1552 grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
1553 memset(pollset_set, 0, sizeof(*pollset_set));
1554 gpr_mu_init(&pollset_set->mu);
1555 return pollset_set;
1556}
1557
1558static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
1559 size_t i;
1560 gpr_mu_destroy(&pollset_set->mu);
1561 for (i = 0; i < pollset_set->fd_count; i++) {
1562 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1563 }
1564 gpr_free(pollset_set->pollsets);
1565 gpr_free(pollset_set->pollset_sets);
1566 gpr_free(pollset_set->fds);
1567 gpr_free(pollset_set);
1568}
1569
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001570static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
1571 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1572 size_t i;
1573 gpr_mu_lock(&pollset_set->mu);
1574 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1575 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1576 pollset_set->fds = gpr_realloc(
1577 pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
1578 }
1579 GRPC_FD_REF(fd, "pollset_set");
1580 pollset_set->fds[pollset_set->fd_count++] = fd;
1581 for (i = 0; i < pollset_set->pollset_count; i++) {
1582 pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
1583 }
1584 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1585 pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1586 }
1587 gpr_mu_unlock(&pollset_set->mu);
1588}
1589
1590static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
1591 grpc_pollset_set *pollset_set, grpc_fd *fd) {
1592 size_t i;
1593 gpr_mu_lock(&pollset_set->mu);
1594 for (i = 0; i < pollset_set->fd_count; i++) {
1595 if (pollset_set->fds[i] == fd) {
1596 pollset_set->fd_count--;
1597 GPR_SWAP(grpc_fd *, pollset_set->fds[i],
1598 pollset_set->fds[pollset_set->fd_count]);
1599 GRPC_FD_UNREF(fd, "pollset_set");
1600 break;
1601 }
1602 }
1603 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1604 pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
1605 }
1606 gpr_mu_unlock(&pollset_set->mu);
1607}
1608
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001609static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
1610 grpc_pollset_set *pollset_set,
1611 grpc_pollset *pollset) {
1612 size_t i, j;
1613 gpr_mu_lock(&pollset_set->mu);
1614 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1615 pollset_set->pollset_capacity =
1616 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1617 pollset_set->pollsets =
1618 gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
1619 sizeof(*pollset_set->pollsets));
1620 }
1621 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1622 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1623 if (fd_is_orphaned(pollset_set->fds[i])) {
1624 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1625 } else {
1626 pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
1627 pollset_set->fds[j++] = pollset_set->fds[i];
1628 }
1629 }
1630 pollset_set->fd_count = j;
1631 gpr_mu_unlock(&pollset_set->mu);
1632}
1633
1634static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
1635 grpc_pollset_set *pollset_set,
1636 grpc_pollset *pollset) {
1637 size_t i;
1638 gpr_mu_lock(&pollset_set->mu);
1639 for (i = 0; i < pollset_set->pollset_count; i++) {
1640 if (pollset_set->pollsets[i] == pollset) {
1641 pollset_set->pollset_count--;
1642 GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
1643 pollset_set->pollsets[pollset_set->pollset_count]);
1644 break;
1645 }
1646 }
1647 gpr_mu_unlock(&pollset_set->mu);
1648}
1649
1650static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
1651 grpc_pollset_set *bag,
1652 grpc_pollset_set *item) {
1653 size_t i, j;
1654 gpr_mu_lock(&bag->mu);
1655 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1656 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1657 bag->pollset_sets =
1658 gpr_realloc(bag->pollset_sets,
1659 bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
1660 }
1661 bag->pollset_sets[bag->pollset_set_count++] = item;
1662 for (i = 0, j = 0; i < bag->fd_count; i++) {
1663 if (fd_is_orphaned(bag->fds[i])) {
1664 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1665 } else {
1666 pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
1667 bag->fds[j++] = bag->fds[i];
1668 }
1669 }
1670 bag->fd_count = j;
1671 gpr_mu_unlock(&bag->mu);
1672}
1673
1674static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
1675 grpc_pollset_set *bag,
1676 grpc_pollset_set *item) {
1677 size_t i;
1678 gpr_mu_lock(&bag->mu);
1679 for (i = 0; i < bag->pollset_set_count; i++) {
1680 if (bag->pollset_sets[i] == item) {
1681 bag->pollset_set_count--;
1682 GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
1683 bag->pollset_sets[bag->pollset_set_count]);
1684 break;
1685 }
1686 }
1687 gpr_mu_unlock(&bag->mu);
1688}
1689
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001690/* Test helper functions
1691 * */
1692void *grpc_fd_get_polling_island(grpc_fd *fd) {
1693 polling_island *pi;
1694
1695 gpr_mu_lock(&fd->pi_mu);
1696 pi = fd->polling_island;
1697 gpr_mu_unlock(&fd->pi_mu);
1698
1699 return pi;
1700}
1701
1702void *grpc_pollset_get_polling_island(grpc_pollset *ps) {
1703 polling_island *pi;
1704
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001705 gpr_mu_lock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001706 pi = ps->polling_island;
Sree Kuchibhotla229533b12016-06-21 20:42:52 -07001707 gpr_mu_unlock(&ps->mu);
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001708
1709 return pi;
1710}
1711
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001712bool grpc_are_polling_islands_equal(void *p, void *q) {
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001713 polling_island *p1 = p;
1714 polling_island *p2 = q;
1715
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001716 /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
1717 latest polling islands in their respective linked lists */
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001718 polling_island_lock_pair(&p1, &p2);
Sree Kuchibhotla20d0a162016-06-23 15:14:03 -07001719 polling_island_unlock_pair(p1, p2);
Sree Kuchibhotla2f8ade02016-06-17 13:28:38 -07001720
1721 return p1 == p2;
Sree Kuchibhotla2e12db92016-06-16 16:53:59 -07001722}
1723
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001724/*******************************************************************************
Sree Kuchibhotla0bcbd792016-06-01 15:43:03 -07001725 * Event engine binding
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001726 */
1727
/* Tears down the engine's global state; order mirrors the reverse of the
   initialization sequence in grpc_init_epoll_linux() */
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}
1733
/* The event-engine vtable exposed by this polling engine; returned to the
   iomgr layer by grpc_init_epoll_linux() below */
static const grpc_event_engine_vtable vtable = {
    .pollset_size = sizeof(grpc_pollset),

    .fd_create = fd_create,
    .fd_wrapped_fd = fd_wrapped_fd,
    .fd_orphan = fd_orphan,
    .fd_shutdown = fd_shutdown,
    .fd_is_shutdown = fd_is_shutdown,
    .fd_notify_on_read = fd_notify_on_read,
    .fd_notify_on_write = fd_notify_on_write,
    .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,

    .pollset_init = pollset_init,
    .pollset_shutdown = pollset_shutdown,
    .pollset_reset = pollset_reset,
    .pollset_destroy = pollset_destroy,
    .pollset_work = pollset_work,
    .pollset_kick = pollset_kick,
    .pollset_add_fd = pollset_add_fd,

    .pollset_set_create = pollset_set_create,
    .pollset_set_destroy = pollset_set_destroy,
    .pollset_set_add_pollset = pollset_set_add_pollset,
    .pollset_set_del_pollset = pollset_set_del_pollset,
    .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
    .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
    .pollset_set_add_fd = pollset_set_add_fd,
    .pollset_set_del_fd = pollset_set_del_fd,

    .kick_poller = kick_poller,

    .shutdown_engine = shutdown_engine,
};
1767
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001768/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1769 * Create a dummy epoll_fd to make sure epoll support is available */
1770static bool is_epoll_available() {
1771 int fd = epoll_create1(EPOLL_CLOEXEC);
1772 if (fd < 0) {
1773 gpr_log(
1774 GPR_ERROR,
1775 "epoll_create1 failed with error: %d. Not using epoll polling engine",
1776 fd);
1777 return false;
1778 }
1779 close(fd);
1780 return true;
1781}
1782
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001783const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001784 /* If use of signals is disabled, we cannot use epoll engine*/
1785 if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
1786 return NULL;
1787 }
1788
Sree Kuchibhotla72744022016-06-09 09:42:06 -07001789 if (!is_epoll_available()) {
1790 return NULL;
1791 }
1792
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001793 if (!is_grpc_wakeup_signal_initialized) {
1794 grpc_use_signal(SIGRTMIN + 2);
1795 }
1796
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001797 fd_global_init();
Sree Kuchibhotla3131c262016-06-21 17:28:28 -07001798
1799 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1800 return NULL;
1801 }
1802
1803 if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
1804 polling_island_global_init())) {
1805 return NULL;
1806 }
1807
Sree Kuchibhotlaf448c342016-05-19 10:51:24 -07001808 return &vtable;
1809}
1810
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001811#else /* defined(GPR_LINUX_EPOLL) */
1812#if defined(GPR_POSIX_SOCKET)
1813#include "src/core/lib/iomgr/ev_posix.h"
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001814/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return
1815 * NULL */
1816const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; }
Sree Kuchibhotla41622a82016-06-13 16:43:14 -07001817#endif /* defined(GPR_POSIX_SOCKET) */
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001818
Sree Kuchibhotlac7be7c62016-06-09 17:08:50 -07001819void grpc_use_signal(int signum) {}
Sree Kuchibhotla5855c472016-06-08 12:56:56 -07001820#endif /* !defined(GPR_LINUX_EPOLL) */