closures -> atomics
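
Convert the fd's shutdown flag/error, read/write closures and
read_notifier_pollset from mutex-protected fields to gpr_atm, and replace
notify_on_locked()/set_ready_locked() with lock-free notify_on()/set_ready(),
so that readability/writability events no longer need to take fd->po.mu.

Each of the read_closure/write_closure atomics is a small state machine with
three possible values: CLOSURE_NOT_READY (0), CLOSURE_READY (1), or a
(grpc_closure *) that is waiting for the event.

Roughly, the protocol looks like the standalone sketch below. It is an
illustration only: it uses C11 atomics and a toy 'closure' type instead of
gpr_atm/grpc_closure, and none of its names (run, on_readable, main, etc.)
are part of this change; the actual implementation is in the diff.

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    /* Stand-in for grpc_closure: a callback plus its argument */
    typedef struct closure {
      void (*cb)(void *arg);
      void *arg;
    } closure;

    #define NOT_READY ((uintptr_t)0)
    #define READY ((uintptr_t)1)

    static void run(closure *c) { c->cb(c->arg); }

    /* notify_on: park 'c' until the event fires, or run it now if the event
       has already fired */
    static void notify_on(atomic_uintptr_t *state, closure *c) {
      for (;;) {
        uintptr_t curr = NOT_READY;
        if (atomic_compare_exchange_strong(state, &curr, (uintptr_t)c)) {
          return; /* parked; a later set_ready will run it */
        }
        if (curr == READY) {
          /* consume the ready flag and run the closure immediately */
          if (atomic_compare_exchange_strong(state, &curr, NOT_READY)) {
            run(c);
            return;
          }
        } else if (curr != NOT_READY) {
          abort(); /* two closures waiting on the same event: caller bug */
        }
        /* the state changed underneath us; retry */
      }
    }

    /* set_ready: record that the event fired, or run the parked closure */
    static void set_ready(atomic_uintptr_t *state) {
      uintptr_t curr = NOT_READY;
      if (atomic_compare_exchange_strong(state, &curr, READY)) return;
      if (curr == READY || curr == NOT_READY) return; /* nothing more to do */
      if (atomic_compare_exchange_strong(state, &curr, NOT_READY)) {
        run((closure *)curr); /* a closure was parked: run it */
      }
    }

    static void on_readable(void *arg) { printf("readable: %s\n", (char *)arg); }

    int main(void) {
      atomic_uintptr_t state;
      atomic_init(&state, NOT_READY);
      closure c = {on_readable, "fd"};
      notify_on(&state, &c); /* parks the closure */
      set_ready(&state);     /* runs it */
      set_ready(&state);     /* no waiter: just records readiness */
      notify_on(&state, &c); /* already ready: runs immediately */
      return 0;
    }
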
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 51842fc..c60ff85 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -68,7 +68,7 @@
     gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
   }
 
-/* Uncomment the following enable extra checks on poll_object operations */
+/* Uncomment the following to enable extra checks on poll_object operations */
 /* #define PO_DEBUG */
 
 static int grpc_wakeup_signal = -1;
@@ -141,23 +141,26 @@
   gpr_atm refst;
 
   /* Indicates that the fd is shutdown and that any pending read/write closures
-     should fail */
-  bool shutdown;
-  grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
+     should fail. The actual type stored in 'shutdown1' is (bool); the type
+     stored in 'shutdown_error1' is (grpc_error *) */
+  gpr_atm shutdown1;
+  gpr_atm shutdown_error1; /* reason for shutdown: set iff shutdown1 is true */
 
   /* The fd is either closed or we relinquished control of it. In either cases,
      this indicates that the 'fd' on this structure is no longer valid */
   bool orphaned;
 
-  /* TODO: sreek - Move this to a lockfree implementation */
-  grpc_closure *read_closure;
-  grpc_closure *write_closure;
+  /* Closures to call when the fd is readable or writable respectively. Each
+     of these stores CLOSURE_NOT_READY, CLOSURE_READY or a (grpc_closure *)
+     waiting for the event */
+  gpr_atm read_closure;
+  gpr_atm write_closure;
 
   struct grpc_fd *freelist_next;
   grpc_closure *on_done_closure;
 
-  /* The pollset that last noticed that the fd is readable */
-  grpc_pollset *read_notifier_pollset;
+  /* The pollset that last noticed that the fd is readable. The actual type
+   * stored in this is (grpc_pollset *) */
+  gpr_atm read_notifier_pollset;
 
   grpc_iomgr_object iomgr_object;
 };
@@ -180,8 +183,8 @@
 static void fd_global_init(void);
 static void fd_global_shutdown(void);
 
-#define CLOSURE_NOT_READY ((grpc_closure *)0)
-#define CLOSURE_READY ((grpc_closure *)1)
+#define CLOSURE_NOT_READY ((gpr_atm)0)
+#define CLOSURE_READY ((gpr_atm)1)
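+
+/* These sentinel values share the read_closure/write_closure gpr_atm slots
+   with (grpc_closure *) pointers. They can never collide with a real closure
+   address: a valid (grpc_closure *) is never 0 (NULL) and, being at least
+   pointer-aligned, never 1 */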
 
 /*******************************************************************************
  * Polling island Declarations
@@ -908,7 +911,12 @@
     fd->freelist_next = fd_freelist;
     fd_freelist = fd;
     grpc_iomgr_unregister_object(&fd->iomgr_object);
-    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
+
+    if ((bool)gpr_atm_acq_load(&fd->shutdown1)) {
+      grpc_error *err =
+          (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1);
+      GRPC_ERROR_UNREF(err);
+    }
 
     gpr_mu_unlock(&fd_freelist_mu);
   } else {
@@ -972,13 +980,15 @@
 
   gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
   new_fd->fd = fd;
-  new_fd->shutdown = false;
+  gpr_atm_rel_store(&new_fd->shutdown1, (gpr_atm) false);
+  gpr_atm_rel_store(&new_fd->shutdown_error1, (gpr_atm)GRPC_ERROR_NONE);
   new_fd->orphaned = false;
-  new_fd->read_closure = CLOSURE_NOT_READY;
-  new_fd->write_closure = CLOSURE_NOT_READY;
+  gpr_atm_rel_store(&new_fd->read_closure, CLOSURE_NOT_READY);
+  gpr_atm_rel_store(&new_fd->write_closure, CLOSURE_NOT_READY);
+  gpr_atm_rel_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
+
   new_fd->freelist_next = NULL;
   new_fd->on_done_closure = NULL;
-  new_fd->read_notifier_pollset = NULL;
 
   gpr_mu_unlock(&new_fd->po.mu);
 
@@ -1061,100 +1071,159 @@
 }
 
 static grpc_error *fd_shutdown_error(grpc_fd *fd) {
-  if (!fd->shutdown) {
-    return GRPC_ERROR_NONE;
-  } else {
-    return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
-  }
+  grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error1);
+  if (err != GRPC_ERROR_NONE) {
+    err = GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &err, 1);
+  }
+  return err;
 }
 
-static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
-                             grpc_closure **st, grpc_closure *closure) {
-  if (fd->shutdown) {
-    grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
-  } else if (*st == CLOSURE_NOT_READY) {
-    /* not ready ==> switch to a waiting state by setting the closure */
-    *st = closure;
-  } else if (*st == CLOSURE_READY) {
-    /* already ready ==> queue the closure to run immediately */
-    *st = CLOSURE_NOT_READY;
-    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
-  } else {
-    /* upcallptr was set to a different closure.  This is an error! */
-    gpr_log(GPR_ERROR,
-            "User called a notify_on function with a previous callback still "
-            "pending");
-    abort();
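+/* notify_on: Registers 'closure' to be scheduled when the event tracked by
+   'state' (either fd->read_closure or fd->write_closure) becomes ready.
+   'state' holds one of:
+     CLOSURE_NOT_READY : event not ready yet; park the closure in 'state'
+     CLOSURE_READY     : event already happened; schedule the closure now
+     <closure pointer> : another closure is already waiting (fatal error) */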
+static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+                      grpc_closure *closure) {
+  bool is_done = false;
+  while (!is_done) {
+    is_done = true;
+    if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
+      // CAS failed because the current value of 'state' is not
+      // 'CLOSURE_NOT_READY'
+      gpr_atm curr = gpr_atm_acq_load(state);
+
+      switch (curr) {
+        case CLOSURE_NOT_READY: {
+          // The CAS above failed because the state was not CLOSURE_NOT_READY
+          // at the time, but it is back to CLOSURE_NOT_READY now. Retry the
+          // CAS
+          is_done = false;
+          break;
+        }
+
+        case CLOSURE_READY: {
+          // Change the state to CLOSURE_NOT_READY and if successful, schedule
+          // the closure
+          if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
+            grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
+          } else {
+            // The state is no longer CLOSURE_READY. Retry from the beginning
+            is_done = false;
+          }
+          break;
+        }
+
+        default: {
+          // The current state already contains a closure. This is a fatal error
+          gpr_log(
+              GPR_ERROR,
+              "User called notify_on function with a previous callback still "
+              "pending");
+          abort();
+          break;
+        }
+      }
+    }
   }
 }
 
-/* returns 1 if state becomes not ready */
-static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
-                            grpc_closure **st) {
-  if (*st == CLOSURE_READY) {
-    /* duplicate ready ==> ignore */
-    return 0;
-  } else if (*st == CLOSURE_NOT_READY) {
-    /* not ready, and not waiting ==> flag ready */
-    *st = CLOSURE_READY;
-    return 0;
-  } else {
-    /* waiting ==> queue closure */
-    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
-    *st = CLOSURE_NOT_READY;
-    return 1;
-  }
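+/* set_ready: Called when the event tracked by 'state' becomes ready (or when
+   the fd is shutdown). If a closure is already waiting in 'state', schedule
+   it; otherwise record the readiness in 'state' so that a future notify_on
+   call schedules its closure immediately */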
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
+  /* Try the fast-path first (i.e expect current value to be CLOSURE_NOT_READY
+   * and then try to change it to CLOSURE_READY) */
+  if (!gpr_atm_acq_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
+    /* CAS failed since the current state is not CLOSURE_NOT_READY. Find out
+       what the current state actually is */
+    gpr_atm curr = gpr_atm_acq_load(state);
+    switch (curr) {
+      case CLOSURE_READY: {
+        /* Already ready. We are done here */
+        break;
+      }
+
+      case CLOSURE_NOT_READY: {
+        /* The state was not CLOSURE_NOT_READY when we checked initially at the
+           beginning of this function but now it is CLOSURE_NOT_READY. This is
+           only possible if the state transitioned out of CLOSURE_NOT_READY to
+           either CLOSURE_READY or <some closure> and then back to
+           CLOSURE_NOT_READY again. So there is no need to make the state
+           CLOSURE_READY now */
+        break;
+      }
+
+      default: {
+        /* 'curr' is a closure. This closure should be enqueued and the current
+           state should be changed to CLOSURE_NOT_READY */
+        if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) {
+          grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
+                             fd_shutdown_error(fd));
+        } /* else the state changed again. This can only happen due to another
+             racing set_ready, in which case there is nothing else to do
+             here */
+        break;
+      }
+    }
+  } /* else fast-path succeeded. We are done */
 }
 
 static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                   grpc_fd *fd) {
-  grpc_pollset *notifier = NULL;
-
-  gpr_mu_lock(&fd->po.mu);
-  notifier = fd->read_notifier_pollset;
-  gpr_mu_unlock(&fd->po.mu);
-
-  return notifier;
+  gpr_atm notifier = gpr_atm_no_barrier_load(&fd->read_notifier_pollset);
+  return (grpc_pollset *)notifier;
 }
 
 static bool fd_is_shutdown(grpc_fd *fd) {
-  gpr_mu_lock(&fd->po.mu);
-  const bool r = fd->shutdown;
-  gpr_mu_unlock(&fd->po.mu);
-  return r;
+  return (bool)gpr_atm_acq_load(&fd->shutdown1);
 }
 
 /* Might be called multiple times */
 static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
-  gpr_mu_lock(&fd->po.mu);
-  /* Do the actual shutdown only once */
-  if (!fd->shutdown) {
-    fd->shutdown = true;
-    fd->shutdown_error = why;
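+  /* Do the actual shutdown only once: the thread that wins this CAS does the
+     shutdown; later callers just release 'why' in the else branch below */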
+  if (gpr_atm_acq_cas(&fd->shutdown1, (gpr_atm) false, (gpr_atm) true)) {
+    gpr_atm_rel_store(&fd->shutdown_error1, (gpr_atm)why);
 
     shutdown(fd->fd, SHUT_RDWR);
-    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
-       at this point, the closures would be called with 'success = false' */
-    set_ready_locked(exec_ctx, fd, &fd->read_closure);
-    set_ready_locked(exec_ctx, fd, &fd->write_closure);
+
+    /* Flush any pending read and write closures at this point. Since the fd
+       is now shutdown, they will be scheduled with the shutdown error (i.e.
+       with 'success = false') */
+    set_ready(exec_ctx, fd, &fd->read_closure);
+    set_ready(exec_ctx, fd, &fd->write_closure);
+
   } else {
+    // Shutdown already called
     GRPC_ERROR_UNREF(why);
   }
-  gpr_mu_unlock(&fd->po.mu);
 }
 
 static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
-  gpr_mu_lock(&fd->po.mu);
-  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
-  gpr_mu_unlock(&fd->po.mu);
+  notify_on(exec_ctx, fd, &fd->read_closure, closure);
 }
 
 static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                grpc_closure *closure) {
-  gpr_mu_lock(&fd->po.mu);
-  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
-  gpr_mu_unlock(&fd->po.mu);
+  notify_on(exec_ctx, fd, &fd->write_closure, closure);
 }
 
 static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
@@ -1343,18 +1412,18 @@
 
 static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                grpc_pollset *notifier) {
-  /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
-  gpr_mu_lock(&fd->po.mu);
-  set_ready_locked(exec_ctx, fd, &fd->read_closure);
-  fd->read_notifier_pollset = notifier;
-  gpr_mu_unlock(&fd->po.mu);
+  set_ready(exec_ctx, fd, &fd->read_closure);
+
+  // Note: fd_become_readable can be called twice with different 'notifier's
+  // if the fd is in two epoll sets when it becomes readable (this can happen
+  // briefly during polling island merges). In that case it does not matter
+  // which notifier ends up in read_notifier_pollset; both pollsets point to
+  // the same polling island anyway
+  gpr_atm_no_barrier_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
 }
 
 static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
-  gpr_mu_lock(&fd->po.mu);
-  set_ready_locked(exec_ctx, fd, &fd->write_closure);
-  gpr_mu_unlock(&fd->po.mu);
+  set_ready(exec_ctx, fd, &fd->write_closure);
 }
 
 static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,