Introduce call lists for moving work outside locks
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 5b16597..a234650 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -73,7 +73,7 @@
       guarded by mu_config */
   grpc_client_config *incoming_configuration;
   /** a list of closures that are all waiting for config to come in */
-  grpc_iomgr_closure *waiting_for_config_closures;
+  grpc_iomgr_call_list waiting_for_config_closures;
   /** resolver callback */
   grpc_iomgr_closure on_config_changed;
   /** connectivity state being tracked */
@@ -181,8 +181,8 @@
   waiting_call *wc = gpr_malloc(sizeof(*wc));
   grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
   wc->elem = elem;
-  wc->closure.next = chand->waiting_for_config_closures;
-  chand->waiting_for_config_closures = &wc->closure;
+  grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
+                           1);
 }
 
 static int is_empty(void *p, int len) {
@@ -230,6 +230,7 @@
 static void picked_target(void *arg, int iomgr_success) {
   call_data *calld = arg;
   grpc_pollset *pollset;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
 
   if (calld->picked_channel == NULL) {
     /* treat this like a cancellation */
@@ -248,9 +249,10 @@
       grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
       grpc_subchannel_create_call(calld->picked_channel, pollset,
                                   &calld->subchannel_call,
-                                  &calld->async_setup_task);
+                                  &calld->async_setup_task, &call_list);
     }
   }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static grpc_iomgr_closure *merge_into_waiting_op(
@@ -310,7 +312,7 @@
   grpc_subchannel_call *subchannel_call;
   grpc_lb_policy *lb_policy;
   grpc_transport_stream_op op2;
-  grpc_iomgr_closure *consumed_op = NULL;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
@@ -328,7 +330,7 @@
       break;
     case CALL_WAITING_FOR_SEND:
       GPR_ASSERT(!continuation);
-      consumed_op = merge_into_waiting_op(elem, op);
+      grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
       if (!calld->waiting_op.send_ops &&
           calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
         gpr_mu_unlock(&calld->mu_state);
@@ -357,7 +359,8 @@
           handle_op_after_cancellation(elem, op);
           handle_op_after_cancellation(elem, &op2);
         } else {
-          consumed_op = merge_into_waiting_op(elem, op);
+          grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
+                                   1);
           gpr_mu_unlock(&calld->mu_state);
         }
         break;
@@ -398,7 +401,7 @@
                                     calld);
             grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
                                 &calld->picked_channel,
-                                &calld->async_setup_task);
+                                &calld->async_setup_task, &call_list);
 
             GRPC_LB_POLICY_UNREF(lb_policy, "pick");
           } else if (chand->resolver != NULL) {
@@ -424,9 +427,7 @@
       break;
   }
 
-  if (consumed_op != NULL) {
-    consumed_op->cb(consumed_op->cb_arg, 1);
-  }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static void cc_start_transport_stream_op(grpc_call_element *elem,
@@ -435,36 +436,38 @@
 }
 
 static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
-                            grpc_connectivity_state current_state);
+                            grpc_connectivity_state current_state,
+                            grpc_iomgr_call_list *cl);
 
-static void on_lb_policy_state_changed_locked(
-    lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
+                                              grpc_iomgr_call_list *cl) {
   /* check if the notification is for a stale policy */
   if (w->lb_policy != w->chand->lb_policy) return;
 
-  grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed");
+  grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
+                              cl);
   if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
-    watch_lb_policy(w->chand, w->lb_policy, w->state);
+    watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
   }
 }
 
 static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
   lb_policy_connectivity_watcher *w = arg;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
 
   gpr_mu_lock(&w->chand->mu_config);
-  on_lb_policy_state_changed_locked(w);
-  grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f);
+  on_lb_policy_state_changed_locked(w, &cl);
   gpr_mu_unlock(&w->chand->mu_config);
 
-  grpc_connectivity_state_end_flush(&f);
+  grpc_iomgr_call_list_run(cl);
 
   GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
   gpr_free(w);
 }
 
 static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
-                            grpc_connectivity_state current_state) {
+                            grpc_connectivity_state current_state,
+                            grpc_iomgr_call_list *call_list) {
   lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
   GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
 
@@ -472,13 +475,8 @@
   grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
   w->state = current_state;
   w->lb_policy = lb_policy;
-  if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state,
-                                            &w->on_changed)
-          .state_already_changed) {
-    on_lb_policy_state_changed_locked(w);
-    GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
-    gpr_free(w);
-  }
+  grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
+                                        call_list);
 }
 
 static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -486,9 +484,8 @@
   grpc_lb_policy *lb_policy = NULL;
   grpc_lb_policy *old_lb_policy;
   grpc_resolver *old_resolver;
-  grpc_iomgr_closure *wakeup_closures = NULL;
   grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
   int exit_idle = 0;
 
   if (chand->incoming_configuration != NULL) {
@@ -496,7 +493,7 @@
     if (lb_policy != NULL) {
       GRPC_LB_POLICY_REF(lb_policy, "channel");
       GRPC_LB_POLICY_REF(lb_policy, "config_change");
-      state = grpc_lb_policy_check_connectivity(lb_policy);
+      state = grpc_lb_policy_check_connectivity(lb_policy, &cl);
     }
 
     grpc_client_config_unref(chand->incoming_configuration);
@@ -508,8 +505,7 @@
   old_lb_policy = chand->lb_policy;
   chand->lb_policy = lb_policy;
   if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
-    wakeup_closures = chand->waiting_for_config_closures;
-    chand->waiting_for_config_closures = NULL;
+    grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
   }
   if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
     GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -520,15 +516,13 @@
   if (iomgr_success && chand->resolver) {
     grpc_resolver *resolver = chand->resolver;
     GRPC_RESOLVER_REF(resolver, "channel-next");
-    grpc_connectivity_state_set(&chand->state_tracker, state,
-                                "new_lb+resolver");
+    grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
+                                &cl);
     if (lb_policy != NULL) {
-      watch_lb_policy(chand, lb_policy, state);
+      watch_lb_policy(chand, lb_policy, state, &cl);
     }
-    grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
     gpr_mu_unlock(&chand->mu_config);
     GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
-    grpc_connectivity_state_end_flush(&f);
     grpc_resolver_next(resolver, &chand->incoming_configuration,
                        &chand->on_config_changed);
     GRPC_RESOLVER_UNREF(resolver, "channel-next");
@@ -536,10 +530,9 @@
     old_resolver = chand->resolver;
     chand->resolver = NULL;
     grpc_connectivity_state_set(&chand->state_tracker,
-                                GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
-    grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
+                                GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
+                                &cl);
     gpr_mu_unlock(&chand->mu_config);
-    grpc_connectivity_state_end_flush(&f);
     if (old_resolver != NULL) {
       grpc_resolver_shutdown(old_resolver);
       GRPC_RESOLVER_UNREF(old_resolver, "channel");
@@ -547,24 +540,20 @@
   }
 
   if (exit_idle) {
-    grpc_lb_policy_exit_idle(lb_policy);
+    grpc_lb_policy_exit_idle(lb_policy, &cl);
     GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
   }
 
   if (old_lb_policy != NULL) {
-    grpc_lb_policy_shutdown(old_lb_policy);
+    grpc_lb_policy_shutdown(old_lb_policy, &cl);
     GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
   }
 
-  while (wakeup_closures) {
-    grpc_iomgr_closure *next = wakeup_closures->next;
-    wakeup_closures->cb(wakeup_closures->cb_arg, 1);
-    wakeup_closures = next;
-  }
-
   if (lb_policy != NULL) {
     GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
   }
+
+  grpc_iomgr_call_list_run(cl);
   GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
 }
 
@@ -573,23 +562,21 @@
   grpc_lb_policy *lb_policy = NULL;
   channel_data *chand = elem->channel_data;
   grpc_resolver *destroy_resolver = NULL;
-  grpc_connectivity_state_flusher f;
-  grpc_iomgr_closure *call_list = op->on_consumed;
-  call_list->next = NULL;
-  op->on_consumed = NULL;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+
+  if (op->on_consumed) {
+    grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
+    op->on_consumed = NULL;
+  }
 
   GPR_ASSERT(op->set_accept_stream == NULL);
   GPR_ASSERT(op->bind_pollset == NULL);
 
   gpr_mu_lock(&chand->mu_config);
   if (op->on_connectivity_state_change != NULL) {
-    if (grpc_connectivity_state_notify_on_state_change(
-            &chand->state_tracker, op->connectivity_state,
-            op->on_connectivity_state_change)
-            .state_already_changed) {
-      op->on_connectivity_state_change->next = call_list;
-      call_list = op->on_connectivity_state_change;
-    }
+    grpc_connectivity_state_notify_on_state_change(
+        &chand->state_tracker, op->connectivity_state,
+        op->on_connectivity_state_change, &call_list);
     op->on_connectivity_state_change = NULL;
     op->connectivity_state = NULL;
   }
@@ -603,18 +590,17 @@
 
   if (op->disconnect && chand->resolver != NULL) {
     grpc_connectivity_state_set(&chand->state_tracker,
-                                GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+                                GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
+                                &call_list);
     destroy_resolver = chand->resolver;
     chand->resolver = NULL;
     if (chand->lb_policy != NULL) {
-      grpc_lb_policy_shutdown(chand->lb_policy);
+      grpc_lb_policy_shutdown(chand->lb_policy, &call_list);
       GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
       chand->lb_policy = NULL;
     }
   }
-  grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
   gpr_mu_unlock(&chand->mu_config);
-  grpc_connectivity_state_end_flush(&f);
 
   if (destroy_resolver) {
     grpc_resolver_shutdown(destroy_resolver);
@@ -622,15 +608,11 @@
   }
 
   if (lb_policy) {
-    grpc_lb_policy_broadcast(lb_policy, op);
+    grpc_lb_policy_broadcast(lb_policy, op, &call_list);
     GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
   }
 
-  while (call_list != NULL) {
-    grpc_iomgr_closure *next = call_list->next;
-    call_list->cb(call_list->cb_arg, 1);
-    call_list = next;
-  }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 /* Constructor for call_data */
@@ -740,7 +722,7 @@
   GPR_ASSERT(!chand->resolver);
   chand->resolver = resolver;
   GRPC_RESOLVER_REF(resolver, "channel");
-  if (chand->waiting_for_config_closures != NULL ||
+  if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
       chand->exit_idle_when_lb_policy_arrives) {
     chand->started_resolving = 1;
     GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@@ -751,14 +733,15 @@
 }
 
 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
-    grpc_channel_element *elem, int try_to_connect) {
+    grpc_channel_element *elem, int try_to_connect,
+    grpc_iomgr_call_list *call_list) {
   channel_data *chand = elem->channel_data;
   grpc_connectivity_state out;
   gpr_mu_lock(&chand->mu_config);
   out = grpc_connectivity_state_check(&chand->state_tracker);
   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
     if (chand->lb_policy != NULL) {
-      grpc_lb_policy_exit_idle(chand->lb_policy);
+      grpc_lb_policy_exit_idle(chand->lb_policy, call_list);
     } else {
       chand->exit_idle_when_lb_policy_arrives = 1;
       if (!chand->started_resolving && chand->resolver != NULL) {
@@ -775,16 +758,12 @@
 
 void grpc_client_channel_watch_connectivity_state(
     grpc_channel_element *elem, grpc_connectivity_state *state,
-    grpc_iomgr_closure *on_complete) {
+    grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
   channel_data *chand = elem->channel_data;
-  grpc_connectivity_state_notify_on_state_change_result r;
   gpr_mu_lock(&chand->mu_config);
-  r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker,
-                                                     state, on_complete);
+  grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+                                                 on_complete, call_list);
   gpr_mu_unlock(&chand->mu_config);
-  if (r.state_already_changed) {
-    on_complete->cb(on_complete->cb_arg, 1);
-  }
 }
 
 grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 13681e3..60090e8 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -53,11 +53,12 @@
                                       grpc_resolver *resolver);
 
 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
-    grpc_channel_element *elem, int try_to_connect);
+    grpc_channel_element *elem, int try_to_connect,
+    grpc_iomgr_call_list *call_list);
 
 void grpc_client_channel_watch_connectivity_state(
     grpc_channel_element *elem, grpc_connectivity_state *state,
-    grpc_iomgr_closure *on_complete);
+    grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list);
 
 grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
     grpc_channel_element *elem);
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 804dbde..8fd8dd7 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -108,9 +108,8 @@
   gpr_free(p);
 }
 
-void pf_shutdown(grpc_lb_policy *pol) {
+void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  grpc_connectivity_state_flusher f;
   pending_pick *pp;
   gpr_mu_lock(&p->mu);
   del_interested_parties_locked(p);
@@ -118,51 +117,40 @@
   pp = p->pending_picks;
   p->pending_picks = NULL;
   grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
-                              "shutdown");
-  grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
+                              "shutdown", call_list);
   gpr_mu_unlock(&p->mu);
-  grpc_connectivity_state_end_flush(&f);
   while (pp != NULL) {
     pending_pick *next = pp->next;
     *pp->target = NULL;
-    pp->on_complete->cb(pp->on_complete->cb_arg, 0);
+    grpc_iomgr_call_list_add(call_list, pp->on_complete, 1);
     gpr_free(pp);
     pp = next;
   }
 }
 
-/* returns a closure to call, or NULL */
-static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
+static void start_picking(pick_first_lb_policy *p,
+                          grpc_iomgr_call_list *call_list) {
   p->started_picking = 1;
   p->checking_subchannel = 0;
   p->checking_connectivity = GRPC_CHANNEL_IDLE;
   GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
-  if (grpc_subchannel_notify_on_state_change(
-          p->subchannels[p->checking_subchannel], &p->checking_connectivity,
-          &p->connectivity_changed)
-          .state_already_changed) {
-    return &p->connectivity_changed;
-  } else {
-    return NULL;
-  }
+  grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
+                                         &p->checking_connectivity,
+                                         &p->connectivity_changed, call_list);
 }
 
-void pf_exit_idle(grpc_lb_policy *pol) {
+void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  grpc_iomgr_closure *call = NULL;
   gpr_mu_lock(&p->mu);
   if (!p->started_picking) {
-    call = start_picking(p);
+    start_picking(p, call_list);
   }
   gpr_mu_unlock(&p->mu);
-  if (call) {
-    call->cb(call->cb_arg, 1);
-  }
 }
 
 void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
              grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
-             grpc_iomgr_closure *on_complete) {
+             grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   pending_pick *pp;
   gpr_mu_lock(&p->mu);
@@ -171,9 +159,8 @@
     *target = p->selected;
     on_complete->cb(on_complete->cb_arg, 1);
   } else {
-    grpc_iomgr_closure *call = NULL;
     if (!p->started_picking) {
-      call = start_picking(p);
+      start_picking(p, call_list);
     }
     grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
                                          pollset);
@@ -184,9 +171,6 @@
     pp->on_complete = on_complete;
     p->pending_picks = pp;
     gpr_mu_unlock(&p->mu);
-    if (call) {
-      call->cb(call->cb_arg, 1);
-    }
   }
 }
 
@@ -194,8 +178,7 @@
   pick_first_lb_policy *p = arg;
   pending_pick *pp;
   int unref = 0;
-  grpc_iomgr_closure *cbs = NULL;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
 
   gpr_mu_lock(&p->mu);
 
@@ -203,14 +186,11 @@
     unref = 1;
   } else if (p->selected != NULL) {
     grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
-                                "selected_changed");
+                                "selected_changed", &call_list);
     if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
-      if (grpc_subchannel_notify_on_state_change(
-              p->selected, &p->checking_connectivity, &p->connectivity_changed)
-              .state_already_changed) {
-        p->connectivity_changed.next = cbs;
-        cbs = &p->connectivity_changed;
-      }
+      grpc_subchannel_notify_on_state_change(
+          p->selected, &p->checking_connectivity, &p->connectivity_changed,
+          &call_list);
     } else {
       unref = 1;
     }
@@ -219,28 +199,23 @@
     switch (p->checking_connectivity) {
       case GRPC_CHANNEL_READY:
         grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
-                                    "connecting_ready");
+                                    "connecting_ready", &call_list);
         p->selected = p->subchannels[p->checking_subchannel];
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
           *pp->target = p->selected;
           grpc_subchannel_del_interested_party(p->selected, pp->pollset);
-          pp->on_complete->next = cbs;
-          cbs = pp->on_complete;
+          grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
           gpr_free(pp);
         }
-        if (grpc_subchannel_notify_on_state_change(p->selected,
-                                                   &p->checking_connectivity,
-                                                   &p->connectivity_changed)
-                .state_already_changed) {
-          p->connectivity_changed.next = cbs;
-          cbs = &p->connectivity_changed;
-        }
+        grpc_subchannel_notify_on_state_change(
+            p->selected, &p->checking_connectivity, &p->connectivity_changed,
+            &call_list);
         break;
       case GRPC_CHANNEL_TRANSIENT_FAILURE:
         grpc_connectivity_state_set(&p->state_tracker,
                                     GRPC_CHANNEL_TRANSIENT_FAILURE,
-                                    "connecting_transient_failure");
+                                    "connecting_transient_failure", &call_list);
         del_interested_parties_locked(p);
         p->checking_subchannel =
             (p->checking_subchannel + 1) % p->num_subchannels;
@@ -248,13 +223,9 @@
             p->subchannels[p->checking_subchannel]);
         add_interested_parties_locked(p);
         if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
-          if (grpc_subchannel_notify_on_state_change(
-                  p->subchannels[p->checking_subchannel],
-                  &p->checking_connectivity, &p->connectivity_changed)
-                  .state_already_changed) {
-            p->connectivity_changed.next = cbs;
-            cbs = &p->connectivity_changed;
-          }
+          grpc_subchannel_notify_on_state_change(
+              p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+              &p->connectivity_changed, &call_list);
         } else {
           goto loop;
         }
@@ -262,14 +233,10 @@
       case GRPC_CHANNEL_CONNECTING:
       case GRPC_CHANNEL_IDLE:
         grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
-                                    "connecting_changed");
-        if (grpc_subchannel_notify_on_state_change(
-                p->subchannels[p->checking_subchannel],
-                &p->checking_connectivity, &p->connectivity_changed)
-                .state_already_changed) {
-          p->connectivity_changed.next = cbs;
-          cbs = &p->connectivity_changed;
-        }
+                                    "connecting_changed", &call_list);
+        grpc_subchannel_notify_on_state_change(
+            p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+            &p->connectivity_changed, &call_list);
         break;
       case GRPC_CHANNEL_FATAL_FAILURE:
         del_interested_parties_locked(p);
@@ -280,19 +247,18 @@
         if (p->num_subchannels == 0) {
           grpc_connectivity_state_set(&p->state_tracker,
                                       GRPC_CHANNEL_FATAL_FAILURE,
-                                      "no_more_channels");
+                                      "no_more_channels", &call_list);
           while ((pp = p->pending_picks)) {
             p->pending_picks = pp->next;
             *pp->target = NULL;
-            pp->on_complete->next = cbs;
-            cbs = pp->on_complete;
+            grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
             gpr_free(pp);
           }
           unref = 1;
         } else {
           grpc_connectivity_state_set(&p->state_tracker,
                                       GRPC_CHANNEL_TRANSIENT_FAILURE,
-                                      "subchannel_failed");
+                                      "subchannel_failed", &call_list);
           p->checking_subchannel %= p->num_subchannels;
           p->checking_connectivity = grpc_subchannel_check_connectivity(
               p->subchannels[p->checking_subchannel]);
@@ -302,22 +268,17 @@
     }
   }
 
-  grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
   gpr_mu_unlock(&p->mu);
-  grpc_connectivity_state_end_flush(&f);
 
-  while (cbs != NULL) {
-    grpc_iomgr_closure *next = cbs->next;
-    cbs->cb(cbs->cb_arg, 1);
-    cbs = next;
-  }
+  grpc_iomgr_call_list_run(call_list);
 
   if (unref) {
     GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
   }
 }
 
-static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
+static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
+                         grpc_iomgr_call_list *call_list) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   size_t i;
   size_t n;
@@ -339,7 +300,8 @@
   gpr_free(subchannels);
 }
 
-static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
+static grpc_connectivity_state pf_check_connectivity(
+    grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
   grpc_connectivity_state st;
   gpr_mu_lock(&p->mu);
@@ -348,16 +310,15 @@
   return st;
 }
 
-static grpc_connectivity_state_notify_on_state_change_result
-pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
-                          grpc_iomgr_closure *notify) {
+void pf_notify_on_state_change(grpc_lb_policy *pol,
+                               grpc_connectivity_state *current,
+                               grpc_iomgr_closure *notify,
+                               grpc_iomgr_call_list *call_list) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
-  grpc_connectivity_state_notify_on_state_change_result r;
   gpr_mu_lock(&p->mu);
-  r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
-                                                     notify);
+  grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+                                                 notify, call_list);
   gpr_mu_unlock(&p->mu);
-  return r;
 }
 
 static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 48a787d..a9dc9dc 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -63,33 +63,38 @@
   }
 }
 
-void grpc_lb_policy_shutdown(grpc_lb_policy *policy) {
-  policy->vtable->shutdown(policy);
+void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
+                             grpc_iomgr_call_list *call_list) {
+  policy->vtable->shutdown(policy, call_list);
 }
 
 void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
                          grpc_metadata_batch *initial_metadata,
                          grpc_subchannel **target,
-                         grpc_iomgr_closure *on_complete) {
-  policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete);
+                         grpc_iomgr_closure *on_complete,
+                         grpc_iomgr_call_list *call_list) {
+  policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete,
+                       call_list);
 }
 
-void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
-  policy->vtable->broadcast(policy, op);
+void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
+                              grpc_iomgr_call_list *call_list) {
+  policy->vtable->broadcast(policy, op, call_list);
 }
 
-void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
-  policy->vtable->exit_idle(policy);
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
+                              grpc_iomgr_call_list *call_list) {
+  policy->vtable->exit_idle(policy, call_list);
 }
 
-grpc_connectivity_state_notify_on_state_change_result
-grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
-                                      grpc_connectivity_state *state,
-                                      grpc_iomgr_closure *closure) {
-  return policy->vtable->notify_on_state_change(policy, state, closure);
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+                                           grpc_connectivity_state *state,
+                                           grpc_iomgr_closure *closure,
+                                           grpc_iomgr_call_list *call_list) {
+  policy->vtable->notify_on_state_change(policy, state, closure, call_list);
 }
 
 grpc_connectivity_state grpc_lb_policy_check_connectivity(
-    grpc_lb_policy *policy) {
-  return policy->vtable->check_connectivity(policy);
+    grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) {
+  return policy->vtable->check_connectivity(policy, call_list);
 }
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 7c9bdac..8d7eb57 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -53,28 +53,31 @@
 struct grpc_lb_policy_vtable {
   void (*destroy)(grpc_lb_policy *policy);
 
-  void (*shutdown)(grpc_lb_policy *policy);
+  void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
 
   /** implement grpc_lb_policy_pick */
   void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
                grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
-               grpc_iomgr_closure *on_complete);
+               grpc_iomgr_closure *on_complete,
+               grpc_iomgr_call_list *call_list);
 
   /** try to enter a READY connectivity state */
-  void (*exit_idle)(grpc_lb_policy *policy);
+  void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
 
   /** broadcast a transport op to all subchannels */
-  void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
+  void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op,
+                    grpc_iomgr_call_list *call_list);
 
   /** check the current connectivity of the lb_policy */
-  grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy);
+  grpc_connectivity_state (*check_connectivity)(
+      grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
 
   /** call notify when the connectivity state of a channel changes from *state.
       Updates *state with the new state of the policy */
-  grpc_connectivity_state_notify_on_state_change_result (
-      *notify_on_state_change)(grpc_lb_policy *policy,
-                               grpc_connectivity_state *state,
-                               grpc_iomgr_closure *closure);
+  void (*notify_on_state_change)(grpc_lb_policy *policy,
+                                 grpc_connectivity_state *state,
+                                 grpc_iomgr_closure *closure,
+                                 grpc_iomgr_call_list *call_list);
 };
 
 #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -98,7 +101,8 @@
                          const grpc_lb_policy_vtable *vtable);
 
 /** Start shutting down (fail any pending picks) */
-void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
+void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
+                             grpc_iomgr_call_list *call_list);
 
 /** Given initial metadata in \a initial_metadata, find an appropriate
     target for this rpc, and 'return' it by calling \a on_complete after setting
@@ -107,18 +111,21 @@
 void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
                          grpc_metadata_batch *initial_metadata,
                          grpc_subchannel **target,
-                         grpc_iomgr_closure *on_complete);
+                         grpc_iomgr_closure *on_complete,
+                         grpc_iomgr_call_list *call_list);
 
-void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
+void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
+                              grpc_iomgr_call_list *call_list);
 
-void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
+                              grpc_iomgr_call_list *call_list);
 
-grpc_connectivity_state_notify_on_state_change_result
-grpc_lb_policy_notify_on_state_change(
-    grpc_lb_policy *policy, grpc_connectivity_state *state,
-    grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT;
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+                                           grpc_connectivity_state *state,
+                                           grpc_iomgr_closure *closure,
+                                           grpc_iomgr_call_list *call_list);
 
 grpc_connectivity_state grpc_lb_policy_check_connectivity(
-    grpc_lb_policy *policy);
+    grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
 
 #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6fbf966..b15acf8 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -146,7 +146,8 @@
 
 static grpc_subchannel_call *create_call(connection *con);
 static void connectivity_state_changed_locked(grpc_subchannel *c,
-                                              const char *reason);
+                                              const char *reason,
+                                              grpc_iomgr_call_list *call_list);
 static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
 static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
 static void subchannel_connected(void *subchannel, int iomgr_success);
@@ -333,16 +334,20 @@
 
 static void continue_creating_call(void *arg, int iomgr_success) {
   waiting_for_connect *w4c = arg;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
   grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
-                              w4c->notify);
+                              w4c->notify, &call_list);
   GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
   gpr_free(w4c);
+  /* closures queued on call_list by create_call are only run here */
+  grpc_iomgr_call_list_run(call_list);
 }
 
 void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
                                  grpc_subchannel_call **target,
-                                 grpc_iomgr_closure *notify) {
+                                 grpc_iomgr_closure *notify,
+                                 grpc_iomgr_call_list *call_list) {
   connection *con;
   gpr_mu_lock(&c->mu);
   if (c->active != NULL) {
@@ -365,15 +368,12 @@
     c->waiting = w4c;
     grpc_subchannel_add_interested_party(c, pollset);
     if (!c->connecting) {
-      grpc_connectivity_state_flusher f;
       c->connecting = 1;
-      connectivity_state_changed_locked(c, "create_call");
+      connectivity_state_changed_locked(c, "create_call", call_list);
       /* released by connection */
       SUBCHANNEL_REF_LOCKED(c, "connecting");
       GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
-      grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
       gpr_mu_unlock(&c->mu);
-      grpc_connectivity_state_end_flush(&f);
 
       start_connect(c);
     } else {
@@ -390,33 +390,26 @@
   return state;
 }
 
-grpc_connectivity_state_notify_on_state_change_result
-grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
-                                       grpc_connectivity_state *state,
-                                       grpc_iomgr_closure *notify) {
+void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
+                                            grpc_connectivity_state *state,
+                                            grpc_iomgr_closure *notify,
+                                            grpc_iomgr_call_list *call_list) {
   int do_connect = 0;
-  grpc_connectivity_state_notify_on_state_change_result r;
   gpr_mu_lock(&c->mu);
-  r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
-                                                     notify);
-  if (r.current_state_is_idle) {
-    grpc_connectivity_state_flusher f;
+  if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
+                                                     notify, call_list)) {
     do_connect = 1;
     c->connecting = 1;
     /* released by connection */
     SUBCHANNEL_REF_LOCKED(c, "connecting");
     GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
-    connectivity_state_changed_locked(c, "state_change");
-    grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
-    gpr_mu_unlock(&c->mu);
-    grpc_connectivity_state_end_flush(&f);
-  } else {
-  gpr_mu_unlock(&c->mu);
+    connectivity_state_changed_locked(c, "state_change", call_list);
   }
+  gpr_mu_unlock(&c->mu);
+
   if (do_connect) {
     start_connect(c);
   }
-  return r;
 }
 
 void grpc_subchannel_process_transport_op(grpc_subchannel *c,
@@ -424,24 +417,20 @@
   connection *con = NULL;
   grpc_subchannel *destroy;
   int cancel_alarm = 0;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   gpr_mu_lock(&c->mu);
   if (c->active != NULL) {
     con = c->active;
     CONNECTION_REF_LOCKED(con, "transport-op");
   }
   if (op->disconnect) {
-    grpc_connectivity_state_flusher f;
     c->disconnected = 1;
-    connectivity_state_changed_locked(c, "disconnect");
+    connectivity_state_changed_locked(c, "disconnect", &call_list);
     if (c->have_alarm) {
       cancel_alarm = 1;
     }
-    grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
-    gpr_mu_unlock(&c->mu);
-    grpc_connectivity_state_end_flush(&f);
-  } else {
-  gpr_mu_unlock(&c->mu);
   }
+  gpr_mu_unlock(&c->mu);
 
   if (con != NULL) {
     grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
@@ -464,6 +453,8 @@
   if (op->disconnect) {
     grpc_connector_shutdown(c->connector);
   }
+
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static void on_state_changed(void *p, int iomgr_success) {
@@ -474,7 +465,7 @@
   grpc_transport_op op;
   grpc_channel_element *elem;
   connection *destroy_connection = NULL;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
 
   gpr_mu_lock(mu);
 
@@ -508,26 +499,26 @@
       grpc_connectivity_state_set(
           &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
                                              : GRPC_CHANNEL_TRANSIENT_FAILURE,
-          "connection_failed");
+          "connection_failed", &call_list);
       break;
   }
 
 done:
-  connectivity_state_changed_locked(c, "transport_state_changed");
+  connectivity_state_changed_locked(c, "transport_state_changed", &call_list);
   destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
   gpr_free(sw);
-  grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
   gpr_mu_unlock(mu);
-  grpc_connectivity_state_end_flush(&f);
   if (destroy) {
     subchannel_destroy(c);
   }
   if (destroy_connection != NULL) {
     connection_destroy(destroy_connection);
   }
+  grpc_iomgr_call_list_run(call_list);
 }
 
-static void publish_transport(grpc_subchannel *c) {
+static void publish_transport(grpc_subchannel *c,
+                              grpc_iomgr_call_list *call_list) {
   size_t channel_stack_size;
   connection *con;
   grpc_channel_stack *stk;
@@ -538,7 +529,6 @@
   state_watcher *sw;
   connection *destroy_connection = NULL;
   grpc_channel_element *elem;
-  grpc_connectivity_state_flusher f;
 
   /* build final filter list */
   num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -601,17 +591,15 @@
   elem->filter->start_transport_op(elem, &op);
 
   /* signal completion */
-  connectivity_state_changed_locked(c, "connected");
+  connectivity_state_changed_locked(c, "connected", call_list);
   w4c = c->waiting;
   c->waiting = NULL;
 
-  grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
   gpr_mu_unlock(&c->mu);
-  grpc_connectivity_state_end_flush(&f);
 
   while (w4c != NULL) {
-    waiting_for_connect *next = w4c;
-    w4c->continuation.cb(w4c->continuation.cb_arg, 1);
+    waiting_for_connect *next = w4c->next; /* advance, else this spins */
+    grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1);
     w4c = next;
   }
 
@@ -653,16 +641,14 @@
 
 static void on_alarm(void *arg, int iomgr_success) {
   grpc_subchannel *c = arg;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   gpr_mu_lock(&c->mu);
   c->have_alarm = 0;
   if (c->disconnected) {
     iomgr_success = 0;
   }
-  connectivity_state_changed_locked(c, "alarm");
-  grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+  connectivity_state_changed_locked(c, "alarm", &call_list);
   gpr_mu_unlock(&c->mu);
-  grpc_connectivity_state_end_flush(&f);
   if (iomgr_success) {
     update_reconnect_parameters(c);
     continue_connect(c);
@@ -670,24 +656,24 @@
     GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
     GRPC_SUBCHANNEL_UNREF(c, "connecting");
   }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static void subchannel_connected(void *arg, int iomgr_success) {
   grpc_subchannel *c = arg;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   if (c->connecting_result.transport != NULL) {
-    publish_transport(c);
+    publish_transport(c, &call_list);
   } else {
     gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
-    grpc_connectivity_state_flusher f;
     gpr_mu_lock(&c->mu);
     GPR_ASSERT(!c->have_alarm);
     c->have_alarm = 1;
-    connectivity_state_changed_locked(c, "connect_failed");
+    connectivity_state_changed_locked(c, "connect_failed", &call_list);
     grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
-    grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
     gpr_mu_unlock(&c->mu);
-    grpc_connectivity_state_end_flush(&f);
   }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@@ -718,9 +704,10 @@
 }
 
 static void connectivity_state_changed_locked(grpc_subchannel *c,
-                                              const char *reason) {
+                                              const char *reason,
+                                              grpc_iomgr_call_list *call_list) {
   grpc_connectivity_state current = compute_connectivity_locked(c);
-  grpc_connectivity_state_set(&c->state_tracker, current, reason);
+  grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
 }
 
 /*
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 391df36..7c00ff1 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -76,7 +76,8 @@
 void grpc_subchannel_create_call(grpc_subchannel *subchannel,
                                  grpc_pollset *pollset,
                                  grpc_subchannel_call **target,
-                                 grpc_iomgr_closure *notify);
+                                 grpc_iomgr_closure *notify,
+                                 grpc_iomgr_call_list *call_list);
 
 /** process a transport level op */
 void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
@@ -88,10 +89,10 @@
 
 /** call notify when the connectivity state of a channel changes from *state.
     Updates *state with the new state of the channel */
-grpc_connectivity_state_notify_on_state_change_result
-grpc_subchannel_notify_on_state_change(
-    grpc_subchannel *channel, grpc_connectivity_state *state,
-    grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
+void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
+                                            grpc_connectivity_state *state,
+                                            grpc_iomgr_closure *notify,
+                                            grpc_iomgr_call_list *call_list);
 
 /** express interest in \a channel's activities through \a pollset. */
 void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 67eff3e..ba8f73f 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -157,3 +157,42 @@
   closure->cb_arg = cb_arg;
   closure->next = NULL;
 }
+
+/* Appends closure (ignored when NULL) to call_list, to be run with success. */
+void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
+                              grpc_iomgr_closure *closure, int success) {
+  grpc_iomgr_closure **link;
+  if (closure == NULL) return;
+  /* slot to fill: head for an empty list, otherwise the last node's next */
+  link = call_list->head ? &call_list->tail->next : &call_list->head;
+  closure->success = success;
+  closure->next = NULL;
+  *link = closure;
+  call_list->tail = closure;
+}
+
+/* Runs every closure in call_list, head first, with its recorded success. */
+void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) {
+  grpc_iomgr_closure *cur, *following;
+  for (cur = call_list.head; cur != NULL; cur = following) {
+    following = cur->next; /* fetched before the callback is invoked */
+    cur->cb(cur->cb_arg, cur->success);
+  }
+}
+
+int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) {
+  return !call_list.head; /* 1 when the list has no head closure, else 0 */
+}
+
+/* Appends src's closures to dst and leaves src empty. */
+void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
+                               grpc_iomgr_call_list *dst) {
+  if (src->head == NULL) return;
+  if (dst->head == NULL) {
+    *dst = *src;
+  } else {
+    dst->tail->next = src->head;
+    dst->tail = src->tail;
+  }
+  src->head = src->tail = NULL; /* so running both lists cannot run one twice */
+}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index f1d2e64..58c3176 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -58,10 +58,25 @@
   struct grpc_iomgr_closure *next;
 } grpc_iomgr_closure;
 
+typedef struct grpc_iomgr_call_list {
+  grpc_iomgr_closure *head; /* first closure to run; NULL when empty */
+  grpc_iomgr_closure *tail; /* last closure; appends link after it */
+} grpc_iomgr_call_list;
+
 /** Initializes \a closure with \a cb and \a cb_arg. */
 void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
                              void *cb_arg);
 
+#define GRPC_IOMGR_CALL_LIST_INIT \
+  { NULL, NULL }
+
+void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list,
+                              grpc_iomgr_closure *closure, int success);
+void grpc_iomgr_call_list_run(grpc_iomgr_call_list list);
+void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
+                               grpc_iomgr_call_list *dst);
+int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list);
+
 /** Initializes the iomgr. */
 void grpc_iomgr_init(void);
 
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 12f34c1..253f319 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -127,7 +127,7 @@
     GRPC_FD_REF(fd, "delayed_add");
     grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
     pollset->in_flight_cbs++;
-    grpc_workqueue_push(fd->workqueue, &da->closure, 1);
+    grpc_pollset_add_unlock_job(pollset, &da->closure);
   }
 }
 
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 291181d..107ffb0 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -136,6 +136,8 @@
   pollset->in_flight_cbs = 0;
   pollset->shutting_down = 0;
   pollset->called_shutdown = 0;
+  pollset->idle_jobs = NULL;
+  pollset->unlock_jobs = NULL;
   become_basic_pollset(pollset, NULL);
 }
 
@@ -145,7 +147,6 @@
   }
   gpr_mu_lock(&pollset->mu);
   pollset->vtable->add_fd(pollset, fd, 1);
-  grpc_workqueue_flush(fd->workqueue, 0);
 /* the following (enabled only in debug) will reacquire and then release
    our lock - meaning that if the unlocking flag passed to del_fd above is
    not respected, the code will deadlock (in a way that we have a chance of
@@ -174,6 +175,33 @@
   pollset->shutdown_done_cb(pollset->shutdown_done_arg);
 }
 
+/* Detaches the jobs at *root and runs them with pollset->mu released. */
+static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) {
+  grpc_iomgr_closure *job, *pending = *root;
+  *root = NULL;
+  gpr_mu_unlock(&pollset->mu);
+  while ((job = pending) != NULL) {
+    pending = job->next;
+    job->cb(job->cb_arg, 1);
+  }
+  gpr_mu_lock(&pollset->mu); /* return with mu held again */
+}
+
+static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) {
+  closure->next = *root; /* push at the head: newest job runs first */
+  *root = closure;
+}
+
+void grpc_pollset_add_idle_job(grpc_pollset *pollset,
+                               grpc_iomgr_closure *closure) {
+  add_job(&pollset->idle_jobs, closure); /* run once no workers remain */
+}
+
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
+                                 grpc_iomgr_closure *closure) {
+  add_job(&pollset->unlock_jobs, closure); /* run by grpc_pollset_work */
+}
+
 void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
                        gpr_timespec now, gpr_timespec deadline) {
   /* pollset->mu already held */
@@ -182,6 +210,14 @@
   worker->next = worker->prev = NULL;
   /* TODO(ctiller): pool these */
   grpc_wakeup_fd_init(&worker->wakeup_fd);
+  if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) {
+    run_jobs(pollset, &pollset->idle_jobs);
+    goto done;
+  }
+  if (pollset->unlock_jobs != NULL) {
+    run_jobs(pollset, &pollset->unlock_jobs);
+    goto done;
+  }
   if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
     goto done;
   }
@@ -301,12 +337,7 @@
 
   gpr_mu_lock(&pollset->mu);
   /* First we need to ensure that nobody is polling concurrently */
-  if (grpc_pollset_has_workers(pollset)) {
-    grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
-    grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
-    gpr_mu_unlock(&pollset->mu);
-    return;
-  }
+  GPR_ASSERT(!grpc_pollset_has_workers(pollset));
 
   gpr_free(up_args);
   /* At this point the pollset may no longer be a unary poller. In that case
@@ -390,13 +421,12 @@
   GRPC_FD_REF(fd, "basicpoll_add");
   pollset->in_flight_cbs++;
   up_args = gpr_malloc(sizeof(*up_args));
-  up_args->pollset = pollset;
   up_args->fd = fd;
   up_args->original_vtable = pollset->vtable;
   up_args->promotion_closure.cb = basic_do_promote;
   up_args->promotion_closure.cb_arg = up_args;
-  grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
 
+  grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure);
   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
 
 exit:
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 69bd9cc..3f89095 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -37,6 +37,7 @@
 #include <poll.h>
 
 #include <grpc/support/sync.h>
+#include "src/core/iomgr/iomgr.h"
 #include "src/core/iomgr/wakeup_fd_posix.h"
 
 typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -66,6 +67,8 @@
   int kicked_without_pollers;
   void (*shutdown_done_cb)(void *arg);
   void *shutdown_done_arg;
+  grpc_iomgr_closure *unlock_jobs;
+  grpc_iomgr_closure *idle_jobs;
   union {
     int fd;
     void *ptr;
@@ -124,4 +127,11 @@
 typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
 extern grpc_poll_function_type grpc_poll_function;
 
+/** schedule a closure to be run next time there are no active workers */
+void grpc_pollset_add_idle_job(grpc_pollset *pollset,
+                               grpc_iomgr_closure *closure);
+/** schedule a closure to be run next time the pollset is unlocked */
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
+                                 grpc_iomgr_closure *closure);
+
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 4ec6aba..6f49060 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -77,7 +77,6 @@
 
   gpr_mu registered_call_mu;
   registered_call *registered_calls;
-  grpc_iomgr_closure destroy_closure;
   char *target;
   grpc_workqueue *workqueue;
 };
@@ -273,8 +272,7 @@
   gpr_ref(&c->refs);
 }
 
-static void destroy_channel(void *p, int ok) {
-  grpc_channel *channel = p;
+static void destroy_channel(grpc_channel *channel) {
   size_t i;
   grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
   for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
@@ -312,9 +310,7 @@
 void grpc_channel_internal_unref(grpc_channel *channel) {
 #endif
   if (gpr_unref(&channel->refs)) {
-    channel->destroy_closure.cb = destroy_channel;
-    channel->destroy_closure.cb_arg = channel;
-    grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
+    destroy_channel(channel);
   }
 }
 
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 12b15f3..ef47a28 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -45,6 +45,8 @@
   /* forward through to the underlying client channel */
   grpc_channel_element *client_channel_elem =
       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+  grpc_connectivity_state state;
   if (client_channel_elem->filter != &grpc_client_channel_filter) {
     gpr_log(GPR_ERROR,
             "grpc_channel_check_connectivity_state called on something that is "
@@ -52,8 +54,10 @@
             client_channel_elem->filter->name);
     return GRPC_CHANNEL_FATAL_FAILURE;
   }
-  return grpc_client_channel_check_connectivity_state(client_channel_elem,
-                                                      try_to_connect);
+  state = grpc_client_channel_check_connectivity_state(
+      client_channel_elem, try_to_connect, &call_list);
+  grpc_iomgr_call_list_run(call_list);
+  return state;
 }
 
 typedef enum {
@@ -154,6 +158,7 @@
     gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
   grpc_channel_element *client_channel_elem =
       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   state_watcher *w = gpr_malloc(sizeof(*w));
 
   grpc_cq_begin_op(cq);
@@ -176,13 +181,14 @@
             "grpc_channel_watch_connectivity_state called on something that is "
             "not a client channel, but '%s'",
             client_channel_elem->filter->name);
-    grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
-                        1);
+    grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1);
   } else {
     GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
     grpc_client_channel_add_interested_party(client_channel_elem,
                                              grpc_cq_pollset(cq));
     grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
-                                                 &w->on_complete);
+                                                 &w->on_complete, &call_list);
   }
+
+  grpc_iomgr_call_list_run(call_list);
 }
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index c8c1abb..9be1f2f 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -164,8 +164,7 @@
   /** data to write next write */
   gpr_slice_buffer qbuf;
   /** queued callbacks */
-  grpc_iomgr_closure *pending_closures_head;
-  grpc_iomgr_closure *pending_closures_tail;
+  grpc_iomgr_call_list run_at_unlock;
 
   /** window available for us to send to peer */
   gpr_int64 outgoing_window;
@@ -568,11 +567,6 @@
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global **stream_global);
 
-/** schedule a closure to run without the transport lock taken */
-void grpc_chttp2_schedule_closure(
-    grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
-    int success);
-
 grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
     grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
 grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index c015e82..1da3f85 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -238,8 +238,8 @@
           stream_global->outgoing_sopb->nops == 0) {
         GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
         stream_global->outgoing_sopb = NULL;
-        grpc_chttp2_schedule_closure(transport_global,
-                                     stream_global->send_done_closure, 1);
+        grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+                                 stream_global->send_done_closure, 1);
       }
     }
     stream_global->writing_now = 0;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 6376c39..50d5a3e 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -499,32 +499,20 @@
 static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
 
 static void unlock(grpc_chttp2_transport *t) {
-  grpc_iomgr_closure *run_closures;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT;
 
   unlock_check_read_write_state(t);
   if (!t->writing_active && !t->closed &&
       grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
     t->writing_active = 1;
     REF_TRANSPORT(t, "writing");
-    grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
+    grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
     prevent_endpoint_shutdown(t);
   }
 
-  run_closures = t->global.pending_closures_head;
-  t->global.pending_closures_head = NULL;
-  t->global.pending_closures_tail = NULL;
-
-  grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f);
+  GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock);
   gpr_mu_unlock(&t->mu);
-
-  grpc_connectivity_state_end_flush(&f);
-
-  while (run_closures) {
-    grpc_iomgr_closure *next = run_closures->next;
-    run_closures->cb(run_closures->cb_arg, run_closures->success);
-    run_closures = next;
-  }
+  grpc_iomgr_call_list_run(run);
 }
 
 /*
@@ -676,8 +664,8 @@
       }
     } else {
       grpc_sopb_reset(op->send_ops);
-      grpc_chttp2_schedule_closure(transport_global,
-                                   stream_global->send_done_closure, 0);
+      grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+                               stream_global->send_done_closure, 0);
     }
   }
 
@@ -715,9 +703,8 @@
                           op->bind_pollset);
   }
 
-  if (op->on_consumed) {
-    grpc_chttp2_schedule_closure(transport_global, op->on_consumed, 1);
-  }
+  grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed,
+                           1);
 }
 
 static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
@@ -754,18 +741,12 @@
 
   lock(t);
 
-  if (op->on_consumed) {
-    grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1);
-  }
+  grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
 
   if (op->on_connectivity_state_change) {
-    if (grpc_connectivity_state_notify_on_state_change(
-            &t->channel_callback.state_tracker, op->connectivity_state,
-            op->on_connectivity_state_change)
-            .state_already_changed) {
-      grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change,
-                                   1);
-    }
+    grpc_connectivity_state_notify_on_state_change(
+        &t->channel_callback.state_tracker, op->connectivity_state,
+        op->on_connectivity_state_change, &t->global.run_at_unlock);
   }
 
   if (op->send_goaway) {
@@ -887,8 +868,8 @@
         if (stream_global->outgoing_sopb != NULL) {
           grpc_sopb_reset(stream_global->outgoing_sopb);
           stream_global->outgoing_sopb = NULL;
-          grpc_chttp2_schedule_closure(transport_global,
-                                       stream_global->send_done_closure, 1);
+          grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+                                   stream_global->send_done_closure, 1);
         }
         stream_global->read_closed = 1;
         if (!stream_global->published_cancelled) {
@@ -938,8 +919,8 @@
         &stream_global->outstanding_metadata);
     grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
     stream_global->published_state = *stream_global->publish_state = state;
-    grpc_chttp2_schedule_closure(transport_global,
-                                 stream_global->recv_done_closure, 1);
+    grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+                             stream_global->recv_done_closure, 1);
     stream_global->recv_done_closure = NULL;
     stream_global->publish_sopb = NULL;
     stream_global->publish_state = NULL;
@@ -1200,21 +1181,7 @@
       gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
   grpc_connectivity_state_set(
       &TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
-      state, reason);
-}
-
-void grpc_chttp2_schedule_closure(
-    grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
-    int success) {
-  closure->success = success;
-  if (transport_global->pending_closures_tail == NULL) {
-    transport_global->pending_closures_head =
-        transport_global->pending_closures_tail = closure;
-  } else {
-    transport_global->pending_closures_tail->next = closure;
-    transport_global->pending_closures_tail = closure;
-  }
-  closure->next = NULL;
+      state, reason, &transport_global->run_at_unlock);
 }
 
 /*
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 034f824..b605da9 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -64,7 +64,6 @@
   tracker->current_state = init_state;
   tracker->watchers = NULL;
   tracker->name = gpr_strdup(name);
-  tracker->changed = 0;
 }
 
 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@@ -90,20 +89,17 @@
   return tracker->current_state;
 }
 
-grpc_connectivity_state_notify_on_state_change_result
-grpc_connectivity_state_notify_on_state_change(
+int grpc_connectivity_state_notify_on_state_change(
     grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
-    grpc_iomgr_closure *notify) {
-  grpc_connectivity_state_notify_on_state_change_result result;
-  memset(&result, 0, sizeof(result));
+    grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) {
   if (grpc_connectivity_state_trace) {
-    gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name,
-            grpc_connectivity_state_name(*current),
-            grpc_connectivity_state_name(tracker->current_state));
+    gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p",
+            tracker->name, grpc_connectivity_state_name(*current),
+            grpc_connectivity_state_name(tracker->current_state), notify);
   }
   if (tracker->current_state != *current) {
     *current = tracker->current_state;
-    result.state_already_changed = 1;
+    grpc_iomgr_call_list_add(call_list, notify, 1);
   } else {
     grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
     w->current = current;
@@ -111,13 +107,14 @@
     w->next = tracker->watchers;
     tracker->watchers = w;
   }
-  result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE;
-  return result;
+  return tracker->current_state == GRPC_CHANNEL_IDLE;
 }
 
 void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
                                  grpc_connectivity_state state,
-                                 const char *reason) {
+                                 const char *reason,
+                                 grpc_iomgr_call_list *call_list) {
+  grpc_connectivity_state_watcher *w;
   if (grpc_connectivity_state_trace) {
     gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
             grpc_connectivity_state_name(tracker->current_state),
@@ -128,40 +125,11 @@
   }
   GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE);
   tracker->current_state = state;
-  tracker->changed = 1;
-}
-
-void grpc_connectivity_state_begin_flush(
-    grpc_connectivity_state_tracker *tracker,
-    grpc_connectivity_state_flusher *flusher) {
-  grpc_connectivity_state_watcher *w;
-  flusher->cbs = NULL;
-  if (!tracker->changed) return;
-  w = tracker->watchers;
-  tracker->watchers = NULL;
-  while (w != NULL) {
-    grpc_connectivity_state_watcher *next = w->next;
-    if (tracker->current_state != *w->current) {
-      *w->current = tracker->current_state;
-      w->notify->next = flusher->cbs;
-      flusher->cbs = w->notify;
-      gpr_free(w);
-    } else {
-      w->next = tracker->watchers;
-      tracker->watchers = w;
-    }
-    w = next;
-  }
-  tracker->changed = 0;
-}
-
-void grpc_connectivity_state_end_flush(
-    grpc_connectivity_state_flusher *flusher) {
-  grpc_iomgr_closure *c = flusher->cbs;
-  while (c != NULL) {
-    grpc_iomgr_closure *next = c;
-    c->cb(c->cb_arg, 1);
-    c = next;
+  while ((w = tracker->watchers) != NULL) {
+    *w->current = tracker->current_state;
+    tracker->watchers = w->next;
+    grpc_iomgr_call_list_add(call_list, w->notify, 1);
+    gpr_free(w);
   }
 }
 
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index 323af27..49c2815 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -54,12 +54,8 @@
   grpc_connectivity_state_watcher *watchers;
   /** a name to help debugging */
   char *name;
-  /** has this state been changed since the last flush? */
-  int changed;
 } grpc_connectivity_state_tracker;
 
-typedef struct { grpc_iomgr_closure *cbs; } grpc_connectivity_state_flusher;
-
 extern int grpc_connectivity_state_trace;
 
 void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
@@ -71,35 +67,15 @@
  * external lock */
 void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
                                  grpc_connectivity_state state,
-                                 const char *reason);
-
-/** Begin flushing callbacks; not thread safe; access must be serialized using
- * the same external lock as grpc_connectivity_state_set. Initializes flusher.
- */
-void grpc_connectivity_state_begin_flush(
-    grpc_connectivity_state_tracker *tracker,
-    grpc_connectivity_state_flusher *flusher);
-
-/** Complete flushing updates: must not be called with any locks held */
-void grpc_connectivity_state_end_flush(
-    grpc_connectivity_state_flusher *flusher);
+                                 const char *reason,
+                                 grpc_iomgr_call_list *call_list);
 
 grpc_connectivity_state grpc_connectivity_state_check(
     grpc_connectivity_state_tracker *tracker);
 
-typedef struct {
-  /** 1 if the current state is idle (a hint to begin connecting), 0 otherwise
-   */
-  int current_state_is_idle;
-  /** 1 if the state has already changed: in this case the closure passed to
-   * grpc_connectivity_state_notify_on_state_change will not be called */
-  int state_already_changed;
-} grpc_connectivity_state_notify_on_state_change_result;
-
 /** Return 1 if the channel should start connecting, 0 otherwise */
-grpc_connectivity_state_notify_on_state_change_result
-grpc_connectivity_state_notify_on_state_change(
+int grpc_connectivity_state_notify_on_state_change(
     grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
-    grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
+    grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list);
 
 #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */