More scalable unary polling
- Admit only one poller for read and one poller for write at a time on each fd
  (poll() is level-triggered, so this avoids a thundering herd on each event).
- When another poller is needed, wake only one waiting poller instead of all of them, again avoiding a thundering herd.
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 9c8133d..afd6ddc 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -98,6 +98,7 @@
r->fd = fd;
r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
r->freelist_next = NULL;
+ r->read_watcher = r->write_watcher = NULL;
return r;
}
@@ -147,14 +148,24 @@
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void wake_watchers(grpc_fd *fd) {
- grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->watcher_root.next != &fd->watcher_root) {
+ grpc_pollset_force_kick(fd->watcher_root.next->pollset);
+ }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
gpr_mu_lock(&fd->watcher_mu);
+ maybe_wake_one_watcher_locked(fd);
+ gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
watcher = watcher->next) {
grpc_pollset_force_kick(watcher->pollset);
}
- gpr_mu_unlock(&fd->watcher_mu);
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
@@ -162,7 +173,7 @@
fd->on_done_user_data = user_data;
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
- wake_watchers(fd);
+ wake_all_watchers(fd);
unref_by(fd, 2); /* drop the reference */
}
@@ -204,7 +215,7 @@
set_ready call. NOTE: we don't have an ABA problem here,
since we should never have concurrent calls to the same
notify_on function. */
- wake_watchers(fd);
+ maybe_wake_one_watcher(fd);
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +301,61 @@
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *watcher) {
+ gpr_uint32 mask = 0;
/* keep track of pollers that have requested our events, in case they change
*/
grpc_fd_ref(fd);
gpr_mu_lock(&fd->watcher_mu);
- watcher->next = &fd->watcher_root;
- watcher->prev = watcher->next->prev;
- watcher->next->prev = watcher->prev->next = watcher;
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so */
+ if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we later need someone to poll */
+ if (mask == 0) {
+ watcher->next = &fd->watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
watcher->pollset = pollset;
watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
- return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
- (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+ return mask;
}
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->fd->watcher_mu);
- watcher->next->prev = watcher->prev;
- watcher->prev->next = watcher->next;
- gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
- grpc_fd_unref(watcher->fd);
+ gpr_mu_lock(&fd->watcher_mu);
+ if (watcher == fd->read_watcher) {
+ was_polling = 1;
+ kick |= !got_read;
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ was_polling = 1;
+ kick |= !got_write;
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling) {
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ gpr_mu_unlock(&fd->watcher_mu);
+
+ grpc_fd_unref(fd);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {