Merge branch 'master' into fd_rw_atm_closure
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index ffacdac..2613512 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -137,8 +137,8 @@
 } grpc_error_times;
 
 /// The following "special" errors can be propagated without allocating memory.
-/// They are always even so that other code (particularly combiner locks) can
-/// safely use the lower bit for themselves.
+/// They are always even so that other code (particularly combiner locks,
+/// polling engines) can safely use the lower bit for themselves.
 
 #define GRPC_ERROR_NONE ((grpc_error *)NULL)
 #define GRPC_ERROR_OOM ((grpc_error *)2)
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index fac3705..31ef433 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -68,7 +68,7 @@
     gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
   }
 
-/* Uncomment the following enable extra checks on poll_object operations */
+/* Uncomment the following to enable extra checks on poll_object operations */
 /* #define PO_DEBUG */
 
 static int grpc_wakeup_signal = -1;
@@ -140,24 +140,61 @@
      Ref/Unref by two to avoid altering the orphaned bit */
   gpr_atm refst;
 
-  /* Indicates that the fd is shutdown and that any pending read/write closures
-     should fail */
-  bool shutdown;
-  grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
+  /* Internally stores data of type (grpc_error *). If the FD is shutdown, this
+     contains the reason for shutdown (i.e. a pointer to grpc_error) ORed with
+     FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit
+     of (grpc_error *) addresses is guaranteed to be zero. Even if the
+     (grpc_error *) is one of the special values like GRPC_ERROR_NONE or
+     GRPC_ERROR_OOM, the lower bit is guaranteed to be zero.
 
-  /* The fd is either closed or we relinquished control of it. In either cases,
-     this indicates that the 'fd' on this structure is no longer valid */
+     Once an fd is shutdown, any pending or future read/write closures on the
+     fd should fail */
+  gpr_atm shutdown_error;
+
+  /* The fd is either closed or we relinquished control of it. In either
+     case, this indicates that the 'fd' on this structure is no longer
+     valid */
   bool orphaned;
 
-  /* TODO: sreek - Move this to a lockfree implementation */
-  grpc_closure *read_closure;
-  grpc_closure *write_closure;
+  /* Closures to call when the fd is readable or writable respectively. These
+     fields contain one of the following values:
+       CLOSURE_READY     : The fd has an I/O event of interest but there is no
+                           closure yet to execute
+
+       CLOSURE_NOT_READY : The fd has no I/O event of interest
+
+       closure ptr       : The closure to be executed when the fd has an I/O
+                           event of interest
+
+       shutdown_error | FD_SHUTDOWN_BIT :
+                           'shutdown_error' field ORed with FD_SHUTDOWN_BIT.
+                           This indicates that the fd is shutdown. Allocations
+                           are word-aligned and special errors are even, so the
+                           lowest bit of the shutdown_error pointer is always
+                           0 and it is safe to OR it with FD_SHUTDOWN_BIT
+
+     Valid state transitions:
+
+       <closure ptr> <-----3------ CLOSURE_NOT_READY ----1---->  CLOSURE_READY
+         |  |                         ^   |    ^                         |  |
+         |  |                         |   |    |                         |  |
+         |  +--------------4----------+   6    +---------2---------------+  |
+         |                                |                                 |
+         |                                v                                 |
+         +-----5------->  [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+
+
+      For 1, 4 : See set_ready() function
+      For 2, 3 : See notify_on() function
+      For 5,6,7: See set_shutdown() function */
+  gpr_atm read_closure;
+  gpr_atm write_closure;
 
   struct grpc_fd *freelist_next;
   grpc_closure *on_done_closure;
 
-  /* The pollset that last noticed that the fd is readable */
-  grpc_pollset *read_notifier_pollset;
+  /* The pollset that last noticed that the fd is readable. The actual type
+   * stored in this is (grpc_pollset *) */
+  gpr_atm read_notifier_pollset;
 
   grpc_iomgr_object iomgr_object;
 };
@@ -180,8 +217,10 @@
 static void fd_global_init(void);
 static void fd_global_shutdown(void);
 
-#define CLOSURE_NOT_READY ((grpc_closure *)0)
-#define CLOSURE_READY ((grpc_closure *)1)
+#define CLOSURE_NOT_READY ((gpr_atm)0)
+#define CLOSURE_READY ((gpr_atm)2)
+
+#define FD_SHUTDOWN_BIT 1
 
 /*******************************************************************************
  * Polling island Declarations
@@ -908,7 +947,11 @@
     fd->freelist_next = fd_freelist;
     fd_freelist = fd;
     grpc_iomgr_unregister_object(&fd->iomgr_object);
-    if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
+
+    grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
+    /* Clear the least significant bit if it is set (in case fd was shutdown) */
+    err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT);
+    GRPC_ERROR_UNREF(err);
 
     gpr_mu_unlock(&fd_freelist_mu);
   } else {
@@ -972,13 +1015,14 @@
 
   gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
   new_fd->fd = fd;
-  new_fd->shutdown = false;
+  gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE);
   new_fd->orphaned = false;
-  new_fd->read_closure = CLOSURE_NOT_READY;
-  new_fd->write_closure = CLOSURE_NOT_READY;
+  gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY);
+  gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY);
+  gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
+
   new_fd->freelist_next = NULL;
   new_fd->on_done_closure = NULL;
-  new_fd->read_notifier_pollset = NULL;
 
   gpr_mu_unlock(&new_fd->po.mu);
 
@@ -1060,101 +1104,182 @@
   GRPC_ERROR_UNREF(error);
 }
 
-static grpc_error *fd_shutdown_error(grpc_fd *fd) {
-  if (!fd->shutdown) {
-    return GRPC_ERROR_NONE;
-  } else {
-    return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
+static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+                      grpc_closure *closure) {
+  while (true) {
+    /* Fast-path: CLOSURE_NOT_READY -> <closure> */
+    /* Also do a release-cas here so that any acquire-loads in set_ready or
+       set_shutdown see this */
+    if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
+      return; /* Fast-path successful. Return */
+    }
+
+    /* Slowpath */
+    gpr_atm curr = gpr_atm_acq_load(state);
+    switch (curr) {
+      case CLOSURE_NOT_READY: {
+        break; /* retry */
+      }
+
+      case CLOSURE_READY: {
+        /* Change the state to CLOSURE_NOT_READY. If successful: Schedule the
+           closure. If not, most likely the state transitioned to shutdown. We
+           should retry */
+        if (gpr_atm_rel_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
+          grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+          return; /* Slow-path successful. Return */
+        }
+
+        break; /* retry */
+      }
+
+      default: {
+        /* 'curr' is either a closure or the fd is shutdown (in which case
+           'curr' contains a pointer to the shutdown-error). If the fd is
+           shutdown, schedule the closure with the shutdown error */
+        if ((curr & FD_SHUTDOWN_BIT) > 0) {
+          grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
+          grpc_closure_sched(
+              exec_ctx, closure,
+              GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
+          return;
+        }
+
+        /* There is already a closure! This indicates a bug in the code */
+        gpr_log(GPR_ERROR,
+                "notify_on called with a previous callback still pending");
+        abort();
+      }
+    }
   }
+
+  GPR_UNREACHABLE_CODE(return );
 }
 
-static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
-                             grpc_closure **st, grpc_closure *closure) {
-  if (fd->shutdown) {
-    grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
-  } else if (*st == CLOSURE_NOT_READY) {
-    /* not ready ==> switch to a waiting state by setting the closure */
-    *st = closure;
-  } else if (*st == CLOSURE_READY) {
-    /* already ready ==> queue the closure to run immediately */
-    *st = CLOSURE_NOT_READY;
-    grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
-  } else {
-    /* upcallptr was set to a different closure.  This is an error! */
-    gpr_log(GPR_ERROR,
-            "User called a notify_on function with a previous callback still "
-            "pending");
-    abort();
+static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
+                         grpc_error *shutdown_err) {
+  /* Try the fast-path first (i.e. expect the current value to be
+     CLOSURE_NOT_READY) */
+  gpr_atm curr = CLOSURE_NOT_READY;
+  gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
+
+  while (true) {
+    if (gpr_atm_rel_cas(state, curr, new_state)) {
+      return; /* Fast-path successful. Return */
+    }
+
+    /* Fallback to slowpath */
+    curr = gpr_atm_acq_load(state);
+    switch (curr) {
+      case CLOSURE_READY: {
+        break; /* retry */
+      }
+
+      case CLOSURE_NOT_READY: {
+        break; /* retry */
+      }
+
+      default: {
+        /* 'curr' is either a closure or the fd is already shutdown */
+
+        /* If fd is already shutdown, we are done */
+        if ((curr & FD_SHUTDOWN_BIT) > 0) {
+          return;
+        }
+
+        /* Fd is not shutdown. Schedule the closure and move the state to
+           shutdown state */
+        if (gpr_atm_rel_cas(state, curr, new_state)) {
+          grpc_closure_sched(
+              exec_ctx, (grpc_closure *)curr,
+              GRPC_ERROR_CREATE_REFERENCING("FD Shutdown", &shutdown_err, 1));
+          return;
+        }
+
+        /* 'curr' was a closure but now changed to a different state. We will
+          have to retry */
+        break;
+      }
+    }
   }
+
+  GPR_UNREACHABLE_CODE(return );
 }
 
-/* returns 1 if state becomes not ready */
-static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
-                            grpc_closure **st) {
-  if (*st == CLOSURE_READY) {
-    /* duplicate ready ==> ignore */
-    return 0;
-  } else if (*st == CLOSURE_NOT_READY) {
-    /* not ready, and not waiting ==> flag ready */
-    *st = CLOSURE_READY;
-    return 0;
-  } else {
-    /* waiting ==> queue closure */
-    grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
-    *st = CLOSURE_NOT_READY;
-    return 1;
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
+  /* Try an optimistic case first (i.e. assume current state is
+     CLOSURE_NOT_READY) */
+  if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
+    return; /* early out */
+  }
+
+  gpr_atm curr = gpr_atm_acq_load(state);
+  switch (curr) {
+    case CLOSURE_READY: {
+      /* Already ready. We are done here */
+      break;
+    }
+
+    case CLOSURE_NOT_READY: {
+      /* The state was not CLOSURE_NOT_READY when we checked initially at the
+         beginning of this function but now it is CLOSURE_NOT_READY again.
+         This is only possible if the state transitioned out of
+         CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
+         back to CLOSURE_NOT_READY again (i.e after we entered this function,
+         the fd became "ready" and the necessary actions were already done).
+         So there is no need to make the state CLOSURE_READY now */
+      break;
+    }
+
+    default: {
+      /* 'curr' is either a closure or the fd is shutdown */
+      if ((curr & FD_SHUTDOWN_BIT) > 0) {
+        /* The fd is shutdown. Do nothing */
+      } else if (gpr_atm_rel_cas(state, curr, CLOSURE_NOT_READY)) {
+        grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+      }
+      /* else the state changed again (only possible due to a racing set_ready
+         or set_shutdown call). In both these cases, the closure would have
+         been scheduled for execution. So we are done here */
+      break;
+    }
   }
 }
 
 static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
                                                   grpc_fd *fd) {
-  grpc_pollset *notifier = NULL;
-
-  gpr_mu_lock(&fd->po.mu);
-  notifier = fd->read_notifier_pollset;
-  gpr_mu_unlock(&fd->po.mu);
-
-  return notifier;
+  gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
+  return (grpc_pollset *)notifier;
 }
 
 static bool fd_is_shutdown(grpc_fd *fd) {
-  gpr_mu_lock(&fd->po.mu);
-  const bool r = fd->shutdown;
-  gpr_mu_unlock(&fd->po.mu);
-  return r;
+  grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error);
+  return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0);
 }
 
 /* Might be called multiple times */
 static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
-  gpr_mu_lock(&fd->po.mu);
-  /* Do the actual shutdown only once */
-  if (!fd->shutdown) {
-    fd->shutdown = true;
-    fd->shutdown_error = why;
-
+  /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */
+  if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE,
+                      (gpr_atm)why | FD_SHUTDOWN_BIT)) {
     shutdown(fd->fd, SHUT_RDWR);
-    /* Flush any pending read and write closures. Since fd->shutdown is 'true'
-       at this point, the closures would be called with 'success = false' */
-    set_ready_locked(exec_ctx, fd, &fd->read_closure);
-    set_ready_locked(exec_ctx, fd, &fd->write_closure);
+
+    set_shutdown(exec_ctx, fd, &fd->read_closure, why);
+    set_shutdown(exec_ctx, fd, &fd->write_closure, why);
   } else {
+    /* Shutdown already called */
     GRPC_ERROR_UNREF(why);
   }
-  gpr_mu_unlock(&fd->po.mu);
 }
 
 static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                               grpc_closure *closure) {
-  gpr_mu_lock(&fd->po.mu);
-  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
-  gpr_mu_unlock(&fd->po.mu);
+  notify_on(exec_ctx, fd, &fd->read_closure, closure);
 }
 
 static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                grpc_closure *closure) {
-  gpr_mu_lock(&fd->po.mu);
-  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
-  gpr_mu_unlock(&fd->po.mu);
+  notify_on(exec_ctx, fd, &fd->write_closure, closure);
 }
 
 static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
@@ -1343,18 +1468,19 @@
 
 static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                                grpc_pollset *notifier) {
-  /* Need the fd->po.mu since we might be racing with fd_notify_on_read */
-  gpr_mu_lock(&fd->po.mu);
-  set_ready_locked(exec_ctx, fd, &fd->read_closure);
-  fd->read_notifier_pollset = notifier;
-  gpr_mu_unlock(&fd->po.mu);
+  set_ready(exec_ctx, fd, &fd->read_closure);
+
+  /* Note, it is possible that fd_become_readable might be called twice with
+     different 'notifier's when an fd becomes readable and it is in two epoll
+     sets (This can happen briefly during polling island merges). In such cases
+     it does not really matter which notifier is set as read_notifier_pollset
+     (They would both point to the same polling island anyway) */
+  /* Release store pairs with acquire load in fd_get_read_notifier_pollset */
+  gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
 }
 
 static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  /* Need the fd->po.mu since we might be racing with fd_notify_on_write */
-  gpr_mu_lock(&fd->po.mu);
-  set_ready_locked(exec_ctx, fd, &fd->write_closure);
-  gpr_mu_unlock(&fd->po.mu);
+  set_ready(exec_ctx, fd, &fd->write_closure);
 }
 
 static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,