Tie workqueue implementation to event engine
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 273505f..2b68240 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -92,6 +92,7 @@
   lock->next_combiner_on_this_exec_ctx = NULL;
   lock->time_to_execute_final_list = false;
   lock->optional_workqueue = optional_workqueue;
+  lock->final_list_covered_by_poller = false;
   gpr_atm_no_barrier_store(&lock->state, 1);
   gpr_atm_no_barrier_store(&lock->covered_by_poller, 0);
   gpr_mpscq_init(&lock->queue);
@@ -188,8 +189,8 @@
     return false;
   }
 
-  if (lock->optional_workqueue != NULL &&
-      is_covered_by_poller(lock) && grpc_exec_ctx_ready_to_finish(exec_ctx)) {
+  if (lock->optional_workqueue != NULL && is_covered_by_poller(lock) &&
+      grpc_exec_ctx_ready_to_finish(exec_ctx)) {
     GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0);
     // this execution context wants to move on, and we have a workqueue (and
     // so can help the execution context out): schedule remaining work to be
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index f473e47..864fe62 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -152,14 +152,13 @@
  * Polling island Declarations
  */
 
-//#define GRPC_PI_REF_COUNT_DEBUG
-#ifdef GRPC_PI_REF_COUNT_DEBUG
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
 
 #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
 #define PI_UNREF(exec_ctx, p, r) \
   pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
 
-#else /* defined(GRPC_PI_REF_COUNT_DEBUG) */
+#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
 
 #define PI_ADD_REF(p, r) pi_add_ref((p))
 #define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
@@ -185,8 +184,11 @@
    * (except mu and ref_count) are invalid and must be ignored. */
   gpr_atm merged_to;
 
-  /* The workqueue associated with this polling island */
-  grpc_workqueue *workqueue;
+  gpr_atm poller_count;
+  gpr_mu workqueue_read_mu;
+  gpr_mpscq workqueue_items;
+  gpr_atm workqueue_item_count;
+  grpc_wakeup_fd workqueue_wakeup_fd;
 
   /* The fd of the underlying epoll set */
   int epoll_fd;
@@ -275,6 +277,8 @@
    threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
 static grpc_wakeup_fd polling_island_wakeup_fd;
 
+static __thread polling_island *g_current_thread_polling_island;
+
 /* Forward declaration */
 static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
 
@@ -289,10 +293,10 @@
 gpr_atm g_epoll_sync;
 #endif /* defined(GRPC_TSAN) */
 
-#ifdef GRPC_PI_REF_COUNT_DEBUG
 static void pi_add_ref(polling_island *pi);
 static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
 
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
 static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file,
                            int line) {
   long old_cnt = gpr_atm_acq_load(&pi->ref_count);
@@ -308,6 +312,36 @@
   gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
           (void *)pi, old_cnt, (old_cnt - 1), reason, file, line);
 }
+
+/* Debug ref; NULL is a no-op. Casts: pi_add_ref_dbg takes non-const char *. */
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+                                     const char *file, int line,
+                                     const char *reason) {
+  polling_island *pi = (polling_island *)workqueue;
+  if (pi != NULL) pi_add_ref_dbg(pi, (char *)reason, (char *)file, line);
+  return workqueue;
+}
+
+/* Debug unref; NULL is a no-op. Casts assume pi_unref_dbg takes char *. */
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                            const char *file, int line, const char *reason) {
+  polling_island *p = (polling_island *)workqueue;
+  if (p != NULL) pi_unref_dbg(exec_ctx, p, (char *)reason, (char *)file, line);
+}
+#else
+/* Takes a ref on the polling island behind workqueue; NULL passes through. */
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+  polling_island *pi = (polling_island *)workqueue;
+  if (pi != NULL) pi_add_ref(pi);
+  return workqueue;
+}
+
+/* Drops a ref on the polling island behind workqueue; NULL is a no-op. */
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+                            grpc_workqueue *workqueue) {
+  polling_island *pi = (polling_island *)workqueue;
+  if (pi != NULL) pi_unref(exec_ctx, pi);
+}
 #endif
 
 static void pi_add_ref(polling_island *pi) {
@@ -315,10 +349,7 @@
 }
 
 static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
-  /* If ref count went to one, we're back to just the workqueue owning a ref.
-     Unref the workqueue to break the loop.
-
-     If ref count went to zero, delete the polling island.
+  /* If ref count went to zero, delete the polling island.
      Note that this deletion not be done under a lock. Once the ref count goes
      to zero, we are guaranteed that no one else holds a reference to the
      polling island (and that there is no racing pi_add_ref() call either).
@@ -326,20 +357,12 @@
      Also, if we are deleting the polling island and the merged_to field is
      non-empty, we should remove a ref to the merged_to polling island
    */
-  switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
-    case 2: /* last external ref: the only one now owned is by the workqueue */
-      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
-      break;
-    case 1: {
-      polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
-      polling_island_delete(exec_ctx, pi);
-      if (next != NULL) {
-        PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
-      }
-      break;
+  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
+    polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
+    polling_island_delete(exec_ctx, pi);
+    if (next != NULL) {
+      PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
     }
-    case 0:
-      GPR_UNREACHABLE_CODE(return );
   }
 }
 
@@ -488,11 +511,20 @@
   pi->fd_capacity = 0;
   pi->fds = NULL;
   pi->epoll_fd = -1;
-  pi->workqueue = NULL;
+
+  gpr_mu_init(&pi->workqueue_read_mu);
+  gpr_mpscq_init(&pi->workqueue_items);
+  gpr_atm_rel_store(&pi->workqueue_item_count, 0);
 
   gpr_atm_rel_store(&pi->ref_count, 0);
+  gpr_atm_rel_store(&pi->poller_count, 0);
   gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
 
+  if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
+                    err_desc)) {
+    goto done;
+  }
+
   pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
 
   if (pi->epoll_fd < 0) {
@@ -501,26 +533,14 @@
   }
 
   polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+  polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
 
   if (initial_fd != NULL) {
     polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
   }
 
-  if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue),
-                   err_desc) &&
-      *error == GRPC_ERROR_NONE) {
-    polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true,
-                                  error);
-    GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL);
-    pi->workqueue->wakeup_read_fd->polling_island = pi;
-    PI_ADD_REF(pi, "fd");
-  }
-
 done:
   if (*error != GRPC_ERROR_NONE) {
-    if (pi->workqueue != NULL) {
-      GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
-    }
     polling_island_delete(exec_ctx, pi);
     pi = NULL;
   }
@@ -533,7 +553,11 @@
   if (pi->epoll_fd >= 0) {
     close(pi->epoll_fd);
   }
+  GPR_ASSERT(gpr_atm_no_barrier_load(&pi->workqueue_item_count) == 0);
+  gpr_mu_destroy(&pi->workqueue_read_mu);
+  gpr_mpscq_destroy(&pi->workqueue_items);
   gpr_mu_destroy(&pi->mu);
+  grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
   gpr_free(pi->fds);
   gpr_free(pi);
 }
@@ -678,6 +702,40 @@
   }
 }
 
+/* Signals pi's workqueue wakeup fd only when a poller other than the calling
+   thread could take the work: if this thread is itself polling pi, more than
+   one poller must be present. A wakeup error goes to GRPC_LOG_IF_ERROR. */
+static void workqueue_maybe_wakeup(polling_island *pi) {
+  gpr_atm self_pollers = (g_current_thread_polling_island == pi) ? 1 : 0;
+  if (gpr_atm_no_barrier_load(&pi->poller_count) > self_pollers) {
+    GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
+                      grpc_wakeup_fd_wakeup(&pi->workqueue_wakeup_fd));
+  }
+}
+
+/* Moves every queued item from q to the island q was merged into (if any),
+   may wake that island's pollers, then repeats from that island upward. */
+static void workqueue_move_items_to_parent(polling_island *q) {
+  for (;;) {
+    polling_island *dst =
+        (polling_island *)gpr_atm_no_barrier_load(&q->merged_to);
+    if (dst == NULL) return;
+    bool moved_any = false;
+    gpr_mu_lock(&q->workqueue_read_mu);
+    while (gpr_atm_no_barrier_load(&q->workqueue_item_count) > 0) {
+      gpr_mpscq_node *node = gpr_mpscq_pop(&q->workqueue_items);
+      if (node == NULL) continue; /* counted but not poppable yet: retry */
+      gpr_atm_no_barrier_fetch_add(&q->workqueue_item_count, -1);
+      gpr_atm_no_barrier_fetch_add(&dst->workqueue_item_count, 1);
+      gpr_mpscq_push(&dst->workqueue_items, node);
+      moved_any = true;
+    }
+    gpr_mu_unlock(&q->workqueue_read_mu);
+    if (moved_any) workqueue_maybe_wakeup(dst);
+    q = dst;
+  }
+}
+
 static polling_island *polling_island_merge(polling_island *p,
                                             polling_island *q,
                                             grpc_error **error) {
@@ -702,6 +760,8 @@
     /* Add the 'merged_to' link from p --> q */
     gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
     PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
+
+    workqueue_move_items_to_parent(p); /* p was merged into q: drain p */
   }
   /* else if p == q, nothing needs to be done */
 
@@ -712,6 +772,21 @@
   return q;
 }
 
+/* Pushes closure (carrying error) onto the island behind workqueue; only the
+   push that finds the item count at zero may wake a poller. */
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+                              grpc_workqueue *workqueue, grpc_closure *closure,
+                              grpc_error *error) {
+  polling_island *pi = (polling_island *)workqueue;
+  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+  gpr_atm prev = gpr_atm_no_barrier_fetch_add(&pi->workqueue_item_count, 1);
+  closure->error_data.error = error;
+  gpr_mpscq_push(&pi->workqueue_items, &closure->next_data.atm_next);
+  if (prev == 0) workqueue_maybe_wakeup(pi);
+  GPR_TIMER_END("workqueue.enqueue", 0);
+  workqueue_move_items_to_parent(pi);
+}
+
 static grpc_error *polling_island_global_init() {
   grpc_error *error = GRPC_ERROR_NONE;
 
@@ -1042,11 +1117,8 @@
 
 static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) {
   gpr_mu_lock(&fd->mu);
-  grpc_workqueue *workqueue = NULL;
-  if (fd->polling_island != NULL) {
-    workqueue =
-        GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue");
-  }
+  grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF(
+      (grpc_workqueue *)fd->polling_island, "get_workqueue");
   gpr_mu_unlock(&fd->mu);
   return workqueue;
 }
@@ -1299,6 +1371,25 @@
   GPR_ASSERT(pollset->polling_island == NULL);
 }
 
+/* Tries to pop and run one workqueue closure without blocking on the read
+   lock. Returns true iff a closure was run. */
+static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
+                                    polling_island *pi) {
+  if (!gpr_mu_trylock(&pi->workqueue_read_mu)) return false;
+  gpr_mpscq_node *node = gpr_mpscq_pop(&pi->workqueue_items);
+  gpr_mu_unlock(&pi->workqueue_read_mu);
+  if (node == NULL) {
+    bool pending = gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0;
+    if (pending) workqueue_maybe_wakeup(pi);
+    return false;
+  }
+  bool more = gpr_atm_full_fetch_add(&pi->workqueue_item_count, -1) > 1;
+  if (more) workqueue_maybe_wakeup(pi);
+  grpc_closure *closure = (grpc_closure *)node;
+  grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+  return true;
+}
+
 #define GRPC_EPOLL_MAX_EVENTS 100
 /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
 static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
@@ -1354,7 +1445,10 @@
   PI_ADD_REF(pi, "ps_work");
   gpr_mu_unlock(&pollset->mu);
 
-  do {
+  if (!maybe_do_workqueue_work(exec_ctx, pi)) {
+    gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
+    g_current_thread_polling_island = pi;
+
     GRPC_SCHEDULING_START_BLOCKING_REGION;
     ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
                         sig_mask);
@@ -1386,6 +1480,11 @@
         append_error(error,
                      grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
                      err_desc);
+      } else if (data_ptr == &pi->workqueue_wakeup_fd) {
+        append_error(error,
+                     grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
+                     err_desc);
+        maybe_do_workqueue_work(exec_ctx, pi);
       } else if (data_ptr == &polling_island_wakeup_fd) {
         GRPC_POLLING_TRACE(
             "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
@@ -1408,7 +1507,10 @@
         }
       }
     }
-  } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
+
+    g_current_thread_polling_island = NULL;
+    gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
+  }
 
   GPR_ASSERT(pi != NULL);
 
@@ -1868,6 +1970,10 @@
 
     .kick_poller = kick_poller,
 
+    .workqueue_ref = workqueue_ref,
+    .workqueue_unref = workqueue_unref,
+    .workqueue_enqueue = workqueue_enqueue,
+
     .shutdown_engine = shutdown_engine,
 };
 
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index c2107e5..1829440 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -1989,6 +1989,32 @@
 }
 
 /*******************************************************************************
+ * workqueue stubs: ref/unref do nothing, enqueue schedules on the exec_ctx
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+                                     const char *file, int line,
+                                     const char *reason) {
+  return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                            const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+  return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+                            grpc_workqueue *workqueue) {}
+#endif
+
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+                              grpc_workqueue *workqueue, grpc_closure *closure,
+                              grpc_error *error) {
+  grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); /* workqueue unused */
+}
+
+/*******************************************************************************
  * event engine binding
  */
 
@@ -2029,6 +2055,10 @@
 
     .kick_poller = kick_poller,
 
+    .workqueue_ref = workqueue_ref,
+    .workqueue_unref = workqueue_unref,
+    .workqueue_enqueue = workqueue_enqueue,
+
     .shutdown_engine = shutdown_engine,
 };
 
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 16a5e30..b84a560 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -1236,6 +1236,32 @@
 }
 
 /*******************************************************************************
+ * workqueue stubs: ref/unref do nothing, enqueue schedules on the exec_ctx
+ */
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
+                                     const char *file, int line,
+                                     const char *reason) {
+  return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                            const char *file, int line, const char *reason) {}
+#else
+static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
+  return workqueue;
+}
+static void workqueue_unref(grpc_exec_ctx *exec_ctx,
+                            grpc_workqueue *workqueue) {}
+#endif
+
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
+                              grpc_workqueue *workqueue, grpc_closure *closure,
+                              grpc_error *error) {
+  grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); /* workqueue unused */
+}
+
+/*******************************************************************************
  * event engine binding
  */
 
@@ -1273,6 +1299,10 @@
 
     .kick_poller = kick_poller,
 
+    .workqueue_ref = workqueue_ref,
+    .workqueue_unref = workqueue_unref,
+    .workqueue_enqueue = workqueue_enqueue,
+
     .shutdown_engine = shutdown_engine,
 };
 
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 6536672..26618f8 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -258,4 +258,27 @@
 
 grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); }
 
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
+                                   int line, const char *reason) {
+  return g_event_engine->workqueue_ref(workqueue, file, line, reason);
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                          const char *file, int line, const char *reason) {
+  g_event_engine->workqueue_unref(exec_ctx, workqueue, file, line, reason);
+}
+#else
+grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
+  return g_event_engine->workqueue_ref(workqueue);
+}
+void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
+  g_event_engine->workqueue_unref(exec_ctx, workqueue);
+}
+#endif
+
+void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                            grpc_closure *closure, grpc_error *error) {
+  g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
+}
+
 #endif  // GPR_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index c2aa175..9666fe5 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -95,6 +95,18 @@
   grpc_error *(*kick_poller)(void);
 
   void (*shutdown_engine)(void);
+
+#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+  grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue, const char *file,
+                                   int line, const char *reason);
+  void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                          const char *file, int line, const char *reason);
+#else
+  grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
+  void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
+#endif
+  void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
+                            grpc_closure *closure, grpc_error *error);
 } grpc_event_engine_vtable;
 
 void grpc_event_engine_init(void);
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 9f95562..e1902a3 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -40,10 +40,6 @@
 #include "src/core/lib/iomgr/pollset.h"
 #include "src/core/lib/iomgr/pollset_set.h"
 
-#ifdef GPR_POSIX_SOCKET
-#include "src/core/lib/iomgr/workqueue_posix.h"
-#endif
-
 #ifdef GPR_WINDOWS
 #include "src/core/lib/iomgr/workqueue_windows.h"
 #endif
diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c
deleted file mode 100644
index 6f8a266..0000000
--- a/src/core/lib/iomgr/workqueue_posix.c
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#ifdef GPR_POSIX_SOCKET
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-#include <stdio.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/profiling/timers.h"
-
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
-                                  grpc_workqueue **workqueue) {
-  char name[32];
-  *workqueue = gpr_malloc(sizeof(grpc_workqueue));
-  gpr_ref_init(&(*workqueue)->refs, 1);
-  gpr_atm_no_barrier_store(&(*workqueue)->state, 1);
-  grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd);
-  if (err != GRPC_ERROR_NONE) {
-    gpr_free(*workqueue);
-    return err;
-  }
-  sprintf(name, "workqueue:%p", (void *)(*workqueue));
-  (*workqueue)->wakeup_read_fd = grpc_fd_create(
-      GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name);
-  gpr_mpscq_init(&(*workqueue)->queue);
-  grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue);
-  grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd,
-                         &(*workqueue)->read_closure);
-  return GRPC_ERROR_NONE;
-}
-
-static void workqueue_destroy(grpc_exec_ctx *exec_ctx,
-                              grpc_workqueue *workqueue) {
-  grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd);
-}
-
-static void workqueue_orphan(grpc_exec_ctx *exec_ctx,
-                             grpc_workqueue *workqueue) {
-  gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -1);
-  if (last == 1) {
-    workqueue_destroy(exec_ctx, workqueue);
-  }
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file,
-                                   int line, const char *reason) {
-  if (workqueue == NULL) return NULL;
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p   ref %d -> %d %s",
-          workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1,
-          reason);
-  gpr_ref(&workqueue->refs);
-  return workqueue;
-}
-#else
-grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
-  if (workqueue != NULL) {
-    gpr_ref(&workqueue->refs);
-  }
-  return workqueue;
-}
-#endif
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                          const char *file, int line, const char *reason) {
-  if (workqueue == NULL) return;
-  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s",
-          workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1,
-          reason);
-  if (gpr_unref(&workqueue->refs)) {
-    workqueue_orphan(exec_ctx, workqueue);
-  }
-}
-#else
-void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
-  if (workqueue == NULL) return;
-  if (gpr_unref(&workqueue->refs)) {
-    workqueue_orphan(exec_ctx, workqueue);
-  }
-}
-#endif
-
-static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
-  abort();
-}
-
-static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
-  GPR_TIMER_MARK("workqueue.wakeup", 0);
-  grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
-  if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) {
-    drain(exec_ctx, workqueue);
-  }
-}
-
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.on_readable", 0);
-
-  grpc_workqueue *workqueue = arg;
-
-  if (error != GRPC_ERROR_NONE) {
-    /* HACK: let wakeup_fd code know that we stole the fd */
-    workqueue->wakeup_fd.read_fd = 0;
-    grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
-    grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
-    GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0);
-    gpr_free(workqueue);
-  } else {
-    gpr_mpscq_node *n = NULL;
-    for (int i = 0; i < 100; i++) {
-      n = gpr_mpscq_pop(&workqueue->queue);
-      if (n != NULL) {
-        grpc_closure *c = (grpc_closure *)n;
-        grpc_closure_run(exec_ctx, c, c->error_data.error);
-        grpc_exec_ctx_flush(exec_ctx);
-        gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, -2);
-        switch (last) {
-          default:
-            // there's more to do, keep going
-            goto keep_going;
-          case 3:  // had one count, one unorphaned --> done, unorphaned
-            goto switch_to_idle;
-          case 2:  // had one count, one orphaned --> done, orphaned
-            goto destroy;
-          case 1:
-          case 0:
-            // these values are illegal - representing an already done or
-            // deleted workqueue
-            GPR_UNREACHABLE_CODE(break);
-        }
-      }
-    }
-    /* fall through to wakeup_next -- we tried a bunch of times to pull a node
-     * but failed */
-wakeup_next:
-    error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
-    if (error != GRPC_ERROR_NONE) {
-      /* recurse to get error handling */
-      on_readable(exec_ctx, arg, error);
-    } else {
-      grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
-                             &workqueue->read_closure);
-      wakeup(exec_ctx, workqueue);
-    }
-    return;
-
-keep_going:
-    if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
-      goto wakeup_next;
-    } else {
-      /* recurse to continue */
-      on_readable(exec_ctx, arg, GRPC_ERROR_NONE);
-    }
-    return;
-
-switch_to_idle:
-    error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
-    if (error != GRPC_ERROR_NONE) {
-      /* recurse to get error handling */
-      on_readable(exec_ctx, arg, error);
-    } else {
-      grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,
-                             &workqueue->read_closure);
-    }
-    return;
-
-destroy:
-    workqueue_destroy(exec_ctx, workqueue);
-    return;
-  }
-
-  GPR_TIMER_END("workqueue.on_readable", 0);
-}
-
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
-                            grpc_closure *closure, grpc_error *error) {
-  GPR_TIMER_BEGIN("workqueue.enqueue", 0);
-  GRPC_WORKQUEUE_REF(workqueue, "enqueue");
-  gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2);
-  GPR_ASSERT(last & 1);
-  closure->error_data.error = error;
-  gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next);
-  if (last == 1) {
-    wakeup(exec_ctx, workqueue);
-  }
-  GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
-  GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
deleted file mode 100644
index 03ee21c..0000000
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
-#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H
-
-#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/support/mpscq.h"
-
-struct grpc_fd;
-
-struct grpc_workqueue {
-  gpr_refcount refs;
-  gpr_mpscq queue;
-  // state is:
-  // lower bit - zero if orphaned
-  // other bits - number of items enqueued
-  gpr_atm state;
-
-  grpc_wakeup_fd wakeup_fd;
-  struct grpc_fd *wakeup_read_fd;
-
-  grpc_closure read_closure;
-};
-
-/** Create a work queue. Returns an error if creation fails. If creation
-    succeeds, sets *workqueue to point to it. */
-grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
-                                  grpc_workqueue **workqueue);
-
-#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */