Do not close the epoll_fd while any pollers are still using it, and add the
ability to wake up all pollers when a polling island is merged
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 69ab665..3a3c136 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -190,9 +190,18 @@
};
/*******************************************************************************
- * Polling-island Definitions
+ * Polling island Definitions
*/
+/* The wakeup fd that is used to wake up all threads in a polling island. This
+ is useful in the polling island merge operation, where we need to wake up all
+ the threads currently polling the smaller polling island (so that they can
+ start polling the new/merged polling island).
+
+ NOTE: This fd is initialized to be readable and MUST NOT be consumed, i.e. the
+ threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
+static grpc_wakeup_fd polling_island_wakeup_fd;
+
/* Polling island freelist */
static gpr_mu g_pi_freelist_mu;
static polling_island *g_pi_freelist = NULL;
@@ -232,6 +241,25 @@
}
}
+/* Adds the read end of wakeup_fd to pi's epoll set. Caller must hold pi->mu */
+static void polling_island_add_wakeup_fd_locked(polling_island *pi,
+ grpc_wakeup_fd *wakeup_fd) {
+ struct epoll_event ev;
+ int err;
+
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = wakeup_fd;
+ err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
+ GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
+ if (err < 0) {
+ gpr_log(GPR_ERROR,
+ "Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
+ ". Error: %s",
+ GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), pi->epoll_fd,
+ strerror(errno));
+ }
+}
+
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
bool remove_fd_refs) {
@@ -283,8 +311,6 @@
static polling_island *polling_island_create(grpc_fd *initial_fd,
int initial_ref_cnt) {
polling_island *pi = NULL;
- struct epoll_event ev;
- int err;
/* Try to get one from the polling island freelist */
gpr_mu_lock(&g_pi_freelist_mu);
@@ -311,17 +337,7 @@
}
GPR_ASSERT(pi->epoll_fd >= 0);
- ev.events = (uint32_t)(EPOLLIN | EPOLLET);
- ev.data.ptr = NULL;
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
- if (err < 0) {
- gpr_log(GPR_ERROR,
- "Failed to add grpc_global_wake_up_fd (%d) to the epoll set "
- "(epoll_fd: %d) with error: %s",
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
- strerror(errno));
- }
+ polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);
pi->ref_cnt = initial_ref_cnt;
pi->merged_to = NULL;
@@ -496,13 +512,15 @@
GPR_SWAP(polling_island *, p, q);
}
- /* "Merge" p with q i.e move all the fds from p (the polling_island with fewer
- fds) to q.
- Note: Not altering the ref counts on the affected fds here because they
- would effectively remain unchanged */
+ /* "Merge" p with q, i.e. move all the fds from p (the one with fewer fds) to q
+ (Note that the refcounts on the fds being moved will not change here. This
+ is why the last parameter in the following two functions is 'false') */
polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
polling_island_remove_all_fds_locked(p, false);
+ /* Wake up all the pollers (if any) on p so that they can pick up this change */
+ polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
+
/* The merged polling island inherits all the ref counts of the island merging
with it */
q->ref_cnt += p->ref_cnt;
@@ -516,6 +534,8 @@
static void polling_island_global_init() {
gpr_mu_init(&g_pi_freelist_mu);
g_pi_freelist = NULL;
+ grpc_wakeup_fd_init(&polling_island_wakeup_fd);
+ grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
}
static void polling_island_global_shutdown() {
@@ -529,8 +549,9 @@
gpr_free(g_pi_freelist);
g_pi_freelist = next;
}
-
gpr_mu_destroy(&g_pi_freelist_mu);
+
+ grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}
/*******************************************************************************
@@ -973,6 +994,7 @@
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = -1;
int ep_rv;
+ polling_island *pi = NULL;
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@@ -983,13 +1005,19 @@
- pollset->polling_island->mu */
gpr_mu_lock(&pollset->pi_mu);
- if (pollset->polling_island == NULL) {
- pollset->polling_island = polling_island_create(NULL, 1);
+ pi = pollset->polling_island;
+ if (pi == NULL) {
+ pi = polling_island_create(NULL, 1);
}
- pollset->polling_island =
- polling_island_update_and_lock(pollset->polling_island, 1, 0);
- epoll_fd = pollset->polling_island->epoll_fd;
+ /* In addition to locking the polling island, add a ref so that the island
+ does not get destroyed (which means the epoll_fd won't be closed) while
+ we are doing an epoll_wait() on the epoll_fd */
+ pi = polling_island_update_and_lock(pi, 1, 1);
+ epoll_fd = pi->epoll_fd;
+
+ /* Store the (possibly updated) island back in pollset->polling_island */
+ pollset->polling_island = pi;
#ifdef GRPC_EPOLL_DEBUG
if (pollset->polling_island->fd_cnt == 0) {
@@ -1013,25 +1041,29 @@
sig_mask);
if (ep_rv < 0) {
if (errno != EINTR) {
- /* TODO (sreek) - Do not log an error in case of bad file descriptor
- * (A bad file descriptor here would just mean that the epoll set was
- * merged with another epoll set and that the current epoll_fd is
- * closed) */
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
} else {
+ /* We were interrupted. Save an iteration by doing a zero timeout
+ epoll_wait to see if there are any other events of interest */
ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
}
}
int i;
for (i = 0; i < ep_rv; ++i) {
- grpc_fd *fd = ep_ev[i].data.ptr;
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (fd == NULL) {
+ void *data_ptr = ep_ev[i].data.ptr;
+ if (data_ptr == &grpc_global_wakeup_fd) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ } else if (data_ptr == &polling_island_wakeup_fd) {
+ /* This means that our polling island was merged with a different
+ island. We do not have to do anything here since the subsequent call
+ to the function pollset_work_and_unlock() will pick up the correct
+ epoll_fd */
} else {
+ grpc_fd *fd = data_ptr;
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
@@ -1041,6 +1073,21 @@
}
}
} while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+
+ GPR_ASSERT(pi != NULL);
+
+ /* Before leaving, release the extra ref we added to the polling island */
+ /* It is important to note that at this point 'pi' may not be the same as
+ * pollset->polling_island. This is because the pollset->polling_island pointer
+ * gets updated whenever the underlying polling island is merged with another
+ * island, and while we were doing epoll_wait() above, the island may
+ * have been merged */
+
+ /* TODO (sreek) - Change the ref count on polling island to gpr_atm so that
+ * we do not have to take pi->mu here */
+ gpr_mu_lock(&pi->mu);
+ polling_island_unref_and_unlock(pi, 1);
+
GPR_TIMER_END("pollset_work_and_unlock", 0);
}