closures -> atomics
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 51842fc..c60ff85 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -68,7 +68,7 @@
gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
}
-/* Uncomment the following enable extra checks on poll_object operations */
+/* Uncomment the following to enable extra checks on poll_object operations */
/* #define PO_DEBUG */
static int grpc_wakeup_signal = -1;
@@ -141,23 +141,26 @@
gpr_atm refst;
/* Indicates that the fd is shutdown and that any pending read/write closures
- should fail */
- bool shutdown;
- grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
+ should fail. */
+ // TODO: sreek storing bool and grpc_error*
+ gpr_atm shutdown1;
+ gpr_atm shutdown_error1; /* reason for shutdown: set iff shutdown==true */
/* The fd is either closed or we relinquished control of it. In either cases,
this indicates that the 'fd' on this structure is no longer valid */
bool orphaned;
- /* TODO: sreek - Move this to a lockfree implementation */
- grpc_closure *read_closure;
- grpc_closure *write_closure;
+ /* Closures to call when the fd is readable or writable. The actual type
+ stored in these is (grpc_closure *) */
+ gpr_atm read_closure;
+ gpr_atm write_closure;
struct grpc_fd *freelist_next;
grpc_closure *on_done_closure;
- /* The pollset that last noticed that the fd is readable */
- grpc_pollset *read_notifier_pollset;
+ /* The pollset that last noticed that the fd is readable. The actual type
+ * stored in this is (grpc_pollset *) */
+ gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
};
@@ -180,8 +183,8 @@
static void fd_global_init(void);
static void fd_global_shutdown(void);
-#define CLOSURE_NOT_READY ((grpc_closure *)0)
-#define CLOSURE_READY ((grpc_closure *)1)
+#define CLOSURE_NOT_READY ((gpr_atm)0)
+#define CLOSURE_READY ((gpr_atm)1)
/*******************************************************************************
* Polling island Declarations
@@ -908,7 +911,12 @@
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
- if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
+
+ if ((bool)gpr_atm_acq_load(&fd->shutdown1)) {
+ grpc_error *err =
+ (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1);
+ GRPC_ERROR_UNREF(err);
+ }
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -972,13 +980,15 @@
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
- new_fd->shutdown = false;
+ gpr_atm_rel_store(&new_fd->shutdown1, (gpr_atm) false);
+ gpr_atm_rel_store(&new_fd->shutdown_error1, (gpr_atm)GRPC_ERROR_NONE);
new_fd->orphaned = false;
- new_fd->read_closure = CLOSURE_NOT_READY;
- new_fd->write_closure = CLOSURE_NOT_READY;
+ gpr_atm_rel_store(&new_fd->read_closure, CLOSURE_NOT_READY);
+ gpr_atm_rel_store(&new_fd->write_closure, CLOSURE_NOT_READY);
+ gpr_atm_rel_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
+
new_fd->freelist_next = NULL;
new_fd->on_done_closure = NULL;
- new_fd->read_notifier_pollset = NULL;
gpr_mu_unlock(&new_fd->po.mu);
@@ -1061,100 +1071,159 @@
}
static grpc_error *fd_shutdown_error(grpc_fd *fd) {
+ grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1);
+ if (err != GRPC_ERROR_NONE) {
+ err = GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &err, 1);
+ }
+ /* err is still GRPC_ERROR_NONE here if no shutdown error has been stored */
+ return err;
+
+ /* TODO(sreek): delete the superseded pre-atomics implementation below */
+ /*
if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
+ */
}
-static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st, grpc_closure *closure) {
- if (fd->shutdown) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready ==> switch to a waiting state by setting the closure */
- *st = closure;
- } else if (*st == CLOSURE_READY) {
- /* already ready ==> queue the closure to run immediately */
- *st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
- } else {
- /* upcallptr was set to a different closure. This is an error! */
- gpr_log(GPR_ERROR,
- "User called a notify_on function with a previous callback still "
- "pending");
- abort();
+/* Parks 'closure' in '*state' until it is signalled ready; if it already is
+   ready, schedules 'closure' now. Aborts if a closure is already parked. */
+static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+                      grpc_closure *closure) {
+  bool is_done = false;
+  while (!is_done) {
+    is_done = true;
+    if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
+      // CAS failed: the value of 'state' was not 'CLOSURE_NOT_READY'
+      gpr_atm curr = gpr_atm_acq_load(state);
+
+      switch (curr) {
+        case CLOSURE_NOT_READY: {
+          // The state went back to 'CLOSURE_NOT_READY' after the failed CAS.
+          // Retry the CAS
+          is_done = false;
+          break;
+        }
+
+        case CLOSURE_READY: {
+          // Change the state to CLOSURE_NOT_READY and if successful, schedule
+          // the closure
+          if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
+            grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
+          } else {
+            // The state is not CLOSURE_READY anymore. Retry from the beginning
+            is_done = false;
+          }
+          break;  // must not fall through into the fatal 'default' case
+        }
+
+        default: {
+          // The current state already contains a closure. This is a fatal error
+          gpr_log(
+              GPR_ERROR,
+              "User called notify_on function with a previous callback still "
+              "pending");
+          abort();
+          break;
+        }
+      }
+    }
}
}
-/* returns 1 if state becomes not ready */
-static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure **st) {
- if (*st == CLOSURE_READY) {
- /* duplicate ready ==> ignore */
- return 0;
- } else if (*st == CLOSURE_NOT_READY) {
- /* not ready, and not waiting ==> flag ready */
- *st = CLOSURE_READY;
- return 0;
- } else {
- /* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
- *st = CLOSURE_NOT_READY;
- return 1;
- }
+/* Signals '*state' as ready. CLOSURE_NOT_READY becomes CLOSURE_READY; a
+   parked closure is scheduled with fd_shutdown_error(fd) and '*state' goes
+   back to CLOSURE_NOT_READY; CLOSURE_READY is left untouched. */
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
+  /* Fast path: expect CLOSURE_NOT_READY and flip it to CLOSURE_READY */
+  if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
+    /* CAS failed since the state was not CLOSURE_NOT_READY. Find out what
+       the current state is */
+    gpr_atm curr = gpr_atm_acq_load(state);
+    switch (curr) {
+      case CLOSURE_READY: {
+        /* Already ready. We are done here */
+        break;
+      }
+
+      case CLOSURE_NOT_READY: {
+        /* The state left CLOSURE_NOT_READY (for CLOSURE_READY or <some
+           closure>) and then returned to it between the CAS and the load,
+           so there is no need to make the state CLOSURE_READY now */
+        break;
+      }
+
+      default: {
+        /* 'curr' is a closure: move the state to CLOSURE_NOT_READY and, if
+           that succeeds, enqueue 'curr'. '*state' must not be re-read for
+           this, since after the CAS it no longer holds the closure */
+        if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) {
+          grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
+                             fd_shutdown_error(fd));
+        } /* else the state changed again. This can only happen due to
+             another racing set_ready function (which means we do not have
+             to do anything else here) */
+        break;
+      }
+    }
+  } /* else fast-path succeeded. We are done */
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
grpc_fd *fd) {
- grpc_pollset *notifier = NULL;
-
- gpr_mu_lock(&fd->po.mu);
- notifier = fd->read_notifier_pollset;
- gpr_mu_unlock(&fd->po.mu);
-
- return notifier;
+  /* Read the pollset stored by fd_become_readable(), not the read closure */
+  return (grpc_pollset *)gpr_atm_no_barrier_load(&fd->read_notifier_pollset);
}
static bool fd_is_shutdown(grpc_fd *fd) {
- gpr_mu_lock(&fd->po.mu);
- const bool r = fd->shutdown;
- gpr_mu_unlock(&fd->po.mu);
- return r;
+  return gpr_atm_acq_load(&fd->shutdown1) != 0; /* set once by fd_shutdown() */
}
/* Might be called multiple times */
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
- gpr_mu_lock(&fd->po.mu);
- /* Do the actual shutdown only once */
- if (!fd->shutdown) {
- fd->shutdown = true;
- fd->shutdown_error = why;
+ if (gpr_atm_acq_cas(&fd->shutdown1, (gpr_atm) false, (gpr_atm) true)) {
+ gpr_atm_rel_store(&fd->shutdown_error1, (gpr_atm)why);
shutdown(fd->fd, SHUT_RDWR);
- /* Flush any pending read and write closures. Since fd->shutdown is 'true'
- at this point, the closures would be called with 'success = false' */
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
+
+ /* Flush any pending read and write closures at this point; set_ready
+ schedules them with fd_shutdown_error(fd). NOTE(review): shutdown1 is set
+ before shutdown_error1 is stored - a racing reader may still see no error */
+ set_ready(exec_ctx, fd, &fd->read_closure);
+ set_ready(exec_ctx, fd, &fd->write_closure);
+
} else {
+ // Shutdown already called: only drop this call's reference on 'why'
GRPC_ERROR_UNREF(why);
}
- gpr_mu_unlock(&fd->po.mu);
+ // TODO(review): delete the superseded mutex-based implementation below
+ // gpr_mu_lock(&fd->po.mu);
+ /* Do the actual shutdown only once */
+ // if (!fd->shutdown) {
+ // fd->shutdown = true;
+ // fd->shutdown_error = why;
+
+ // shutdown(fd->fd, SHUT_RDWR);
+ /* Flush any pending read and write closures. Since fd->shutdown is 'true'
+ at this point, the closures would be called with 'success = false' */
+ // set_ready(exec_ctx, fd, &fd->read_closure);
+ // set_ready(exec_ctx, fd, &fd->write_closure);
+ // } else {
+ // GRPC_ERROR_UNREF(why);
+ // }
+ // gpr_mu_unlock(&fd->po.mu);
}
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->po.mu);
- notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
- gpr_mu_unlock(&fd->po.mu);
+ notify_on(exec_ctx, fd, &fd->read_closure, closure); /* CAS-based; takes no fd->po.mu */
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- gpr_mu_lock(&fd->po.mu);
- notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
- gpr_mu_unlock(&fd->po.mu);
+ notify_on(exec_ctx, fd, &fd->write_closure, closure); /* CAS-based; takes no fd->po.mu */
}
static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
@@ -1343,18 +1412,18 @@
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
- gpr_mu_lock(&fd->po.mu);
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- fd->read_notifier_pollset = notifier;
- gpr_mu_unlock(&fd->po.mu);
+ set_ready(exec_ctx, fd, &fd->read_closure);
+
+ // Note, it is possible that fd_become_readable might be called twice with
+ // different 'notifier's when an fd becomes readable and it is in two epoll
+ // sets (This can happen briefly during polling island merges). In such cases
+ // it does not really matter which notifier is set as the read_notifier_pollset
+ // (They would both point to the same polling island anyway)
+ gpr_atm_no_barrier_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
- gpr_mu_lock(&fd->po.mu);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
- gpr_mu_unlock(&fd->po.mu);
+ set_ready(exec_ctx, fd, &fd->write_closure); /* CAS-based; takes no fd->po.mu */
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,