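Reduce contention on fd->watcher_mu

Replace the per-fd, heap-allocated array of watching pollsets with an intrusive doubly-linked list of caller-supplied grpc_fd_watcher nodes. Registering a poller (grpc_fd_begin_poll) and unregistering it (grpc_fd_end_poll) are now O(1) instead of O(active_pollers), and begin_poll no longer reallocates memory while holding the lock. This reduces the time spent under fd->watcher_mu and ultimately lets us scale much better. Note the API change: callers now pass a grpc_fd_watcher to grpc_fd_begin_poll and hand the same watcher to grpc_fd_end_poll.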
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b67c6cd..737ee01 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -68,7 +68,6 @@
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
- gpr_free(fd->watchers);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -93,9 +92,7 @@
gpr_atm_rel_store(&r->writest.state, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
- r->watchers = NULL;
- r->watcher_count = 0;
- r->watcher_capacity = 0;
+ r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
r->freelist_next = NULL;
return r;
}
@@ -118,9 +115,7 @@
}
}
-void grpc_fd_global_init(void) {
- gpr_mu_init(&fd_freelist_mu);
-}
+void grpc_fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
void grpc_fd_global_shutdown(void) {
while (fd_freelist != NULL) {
@@ -145,11 +140,11 @@
}
static void wake_watchers(grpc_fd *fd) {
- size_t i, n;
+ grpc_fd_watcher *watcher;
gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (i = 0; i < n; i++) {
- grpc_pollset_force_kick(fd->watchers[i]);
+ for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
+ watcher = watcher->next) {
+ grpc_pollset_force_kick(watcher->pollset);
}
gpr_mu_unlock(&fd->watcher_mu);
}
@@ -293,36 +288,27 @@
}
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
- gpr_uint32 read_mask, gpr_uint32 write_mask) {
+ gpr_uint32 read_mask, gpr_uint32 write_mask,
+ grpc_fd_watcher *watcher) {
/* keep track of pollers that have requested our events, in case they change
*/
gpr_mu_lock(&fd->watcher_mu);
- if (fd->watcher_capacity == fd->watcher_count) {
- fd->watcher_capacity =
- GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
- fd->watchers = gpr_realloc(fd->watchers,
- fd->watcher_capacity * sizeof(grpc_pollset *));
- }
- fd->watchers[fd->watcher_count++] = pollset;
+ watcher->next = &fd->watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ watcher->pollset = pollset;
+ watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
(gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
}
-void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
- size_t r, w, n;
-
- gpr_mu_lock(&fd->watcher_mu);
- n = fd->watcher_count;
- for (r = 0, w = 0; r < n; r++) {
- if (fd->watchers[r] == pollset) {
- fd->watcher_count--;
- continue;
- }
- fd->watchers[w++] = fd->watchers[r];
- }
- gpr_mu_unlock(&fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->fd->watcher_mu);
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ gpr_mu_unlock(&watcher->fd->watcher_mu);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {