Introduce call lists so closures queued while a lock is held run after it is released
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 5b16597..a234650 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -73,7 +73,7 @@
       guarded by mu_config */
   grpc_client_config *incoming_configuration;
   /** a list of closures that are all waiting for config to come in */
-  grpc_iomgr_closure *waiting_for_config_closures;
+  grpc_iomgr_call_list waiting_for_config_closures;
   /** resolver callback */
   grpc_iomgr_closure on_config_changed;
   /** connectivity state being tracked */
@@ -181,8 +181,8 @@
   waiting_call *wc = gpr_malloc(sizeof(*wc));
   grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
   wc->elem = elem;
-  wc->closure.next = chand->waiting_for_config_closures;
-  chand->waiting_for_config_closures = &wc->closure;
+  grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
+                           1);
 }
 
 static int is_empty(void *p, int len) {
@@ -230,6 +230,7 @@
 static void picked_target(void *arg, int iomgr_success) {
   call_data *calld = arg;
   grpc_pollset *pollset;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
 
   if (calld->picked_channel == NULL) {
     /* treat this like a cancellation */
@@ -248,9 +249,10 @@
       grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
       grpc_subchannel_create_call(calld->picked_channel, pollset,
                                   &calld->subchannel_call,
-                                  &calld->async_setup_task);
+                                  &calld->async_setup_task, &call_list);
     }
   }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static grpc_iomgr_closure *merge_into_waiting_op(
@@ -310,7 +312,7 @@
   grpc_subchannel_call *subchannel_call;
   grpc_lb_policy *lb_policy;
   grpc_transport_stream_op op2;
-  grpc_iomgr_closure *consumed_op = NULL;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
 
@@ -328,7 +330,7 @@
       break;
     case CALL_WAITING_FOR_SEND:
       GPR_ASSERT(!continuation);
-      consumed_op = merge_into_waiting_op(elem, op);
+      grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
       if (!calld->waiting_op.send_ops &&
           calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
         gpr_mu_unlock(&calld->mu_state);
@@ -357,7 +359,8 @@
           handle_op_after_cancellation(elem, op);
           handle_op_after_cancellation(elem, &op2);
         } else {
-          consumed_op = merge_into_waiting_op(elem, op);
+          grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
+                                   1);
           gpr_mu_unlock(&calld->mu_state);
         }
         break;
@@ -398,7 +401,7 @@
                                     calld);
             grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
                                 &calld->picked_channel,
-                                &calld->async_setup_task);
+                                &calld->async_setup_task, &call_list);
 
             GRPC_LB_POLICY_UNREF(lb_policy, "pick");
           } else if (chand->resolver != NULL) {
@@ -424,9 +427,7 @@
       break;
   }
 
-  if (consumed_op != NULL) {
-    consumed_op->cb(consumed_op->cb_arg, 1);
-  }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 static void cc_start_transport_stream_op(grpc_call_element *elem,
@@ -435,36 +436,38 @@
 }
 
 static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
-                            grpc_connectivity_state current_state);
+                            grpc_connectivity_state current_state,
+                            grpc_iomgr_call_list *cl);
 
-static void on_lb_policy_state_changed_locked(
-    lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
+                                              grpc_iomgr_call_list *cl) {
   /* check if the notification is for a stale policy */
   if (w->lb_policy != w->chand->lb_policy) return;
 
-  grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed");
+  grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
+                              cl);
   if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
-    watch_lb_policy(w->chand, w->lb_policy, w->state);
+    watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
   }
 }
 
 static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
   lb_policy_connectivity_watcher *w = arg;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
 
   gpr_mu_lock(&w->chand->mu_config);
-  on_lb_policy_state_changed_locked(w);
-  grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f);
+  on_lb_policy_state_changed_locked(w, &cl);
   gpr_mu_unlock(&w->chand->mu_config);
 
-  grpc_connectivity_state_end_flush(&f);
+  grpc_iomgr_call_list_run(cl);
 
   GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
   gpr_free(w);
 }
 
 static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
-                            grpc_connectivity_state current_state) {
+                            grpc_connectivity_state current_state,
+                            grpc_iomgr_call_list *call_list) {
   lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
   GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
 
@@ -472,13 +475,8 @@
   grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
   w->state = current_state;
   w->lb_policy = lb_policy;
-  if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state,
-                                            &w->on_changed)
-          .state_already_changed) {
-    on_lb_policy_state_changed_locked(w);
-    GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
-    gpr_free(w);
-  }
+  grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
+                                        call_list);
 }
 
 static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -486,9 +484,8 @@
   grpc_lb_policy *lb_policy = NULL;
   grpc_lb_policy *old_lb_policy;
   grpc_resolver *old_resolver;
-  grpc_iomgr_closure *wakeup_closures = NULL;
   grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
-  grpc_connectivity_state_flusher f;
+  grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
   int exit_idle = 0;
 
   if (chand->incoming_configuration != NULL) {
@@ -496,7 +493,7 @@
     if (lb_policy != NULL) {
       GRPC_LB_POLICY_REF(lb_policy, "channel");
       GRPC_LB_POLICY_REF(lb_policy, "config_change");
-      state = grpc_lb_policy_check_connectivity(lb_policy);
+      state = grpc_lb_policy_check_connectivity(lb_policy, &cl);
     }
 
     grpc_client_config_unref(chand->incoming_configuration);
@@ -508,8 +505,7 @@
   old_lb_policy = chand->lb_policy;
   chand->lb_policy = lb_policy;
   if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
-    wakeup_closures = chand->waiting_for_config_closures;
-    chand->waiting_for_config_closures = NULL;
+    grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
   }
   if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
     GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -520,15 +516,13 @@
   if (iomgr_success && chand->resolver) {
     grpc_resolver *resolver = chand->resolver;
     GRPC_RESOLVER_REF(resolver, "channel-next");
-    grpc_connectivity_state_set(&chand->state_tracker, state,
-                                "new_lb+resolver");
+    grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
+                                &cl);
     if (lb_policy != NULL) {
-      watch_lb_policy(chand, lb_policy, state);
+      watch_lb_policy(chand, lb_policy, state, &cl);
     }
-    grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
     gpr_mu_unlock(&chand->mu_config);
     GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
-    grpc_connectivity_state_end_flush(&f);
     grpc_resolver_next(resolver, &chand->incoming_configuration,
                        &chand->on_config_changed);
     GRPC_RESOLVER_UNREF(resolver, "channel-next");
@@ -536,10 +530,9 @@
     old_resolver = chand->resolver;
     chand->resolver = NULL;
     grpc_connectivity_state_set(&chand->state_tracker,
-                                GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
-    grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
+                                GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
+                                &cl);
     gpr_mu_unlock(&chand->mu_config);
-    grpc_connectivity_state_end_flush(&f);
     if (old_resolver != NULL) {
       grpc_resolver_shutdown(old_resolver);
       GRPC_RESOLVER_UNREF(old_resolver, "channel");
@@ -547,24 +540,20 @@
   }
 
   if (exit_idle) {
-    grpc_lb_policy_exit_idle(lb_policy);
+    grpc_lb_policy_exit_idle(lb_policy, &cl);
     GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
   }
 
   if (old_lb_policy != NULL) {
-    grpc_lb_policy_shutdown(old_lb_policy);
+    grpc_lb_policy_shutdown(old_lb_policy, &cl);
     GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
   }
 
-  while (wakeup_closures) {
-    grpc_iomgr_closure *next = wakeup_closures->next;
-    wakeup_closures->cb(wakeup_closures->cb_arg, 1);
-    wakeup_closures = next;
-  }
-
   if (lb_policy != NULL) {
     GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
   }
+
+  grpc_iomgr_call_list_run(cl);
   GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
 }
 
@@ -573,23 +562,21 @@
   grpc_lb_policy *lb_policy = NULL;
   channel_data *chand = elem->channel_data;
   grpc_resolver *destroy_resolver = NULL;
-  grpc_connectivity_state_flusher f;
-  grpc_iomgr_closure *call_list = op->on_consumed;
-  call_list->next = NULL;
-  op->on_consumed = NULL;
+  grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+
+  if (op->on_consumed) {
+    grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
+    op->on_consumed = NULL;
+  }
 
   GPR_ASSERT(op->set_accept_stream == NULL);
   GPR_ASSERT(op->bind_pollset == NULL);
 
   gpr_mu_lock(&chand->mu_config);
   if (op->on_connectivity_state_change != NULL) {
-    if (grpc_connectivity_state_notify_on_state_change(
-            &chand->state_tracker, op->connectivity_state,
-            op->on_connectivity_state_change)
-            .state_already_changed) {
-      op->on_connectivity_state_change->next = call_list;
-      call_list = op->on_connectivity_state_change;
-    }
+    grpc_connectivity_state_notify_on_state_change(
+        &chand->state_tracker, op->connectivity_state,
+        op->on_connectivity_state_change, &call_list);
     op->on_connectivity_state_change = NULL;
     op->connectivity_state = NULL;
   }
@@ -603,18 +590,17 @@
 
   if (op->disconnect && chand->resolver != NULL) {
     grpc_connectivity_state_set(&chand->state_tracker,
-                                GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+                                GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
+                                &call_list);
     destroy_resolver = chand->resolver;
     chand->resolver = NULL;
     if (chand->lb_policy != NULL) {
-      grpc_lb_policy_shutdown(chand->lb_policy);
+      grpc_lb_policy_shutdown(chand->lb_policy, &call_list);
       GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
       chand->lb_policy = NULL;
     }
   }
-  grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
   gpr_mu_unlock(&chand->mu_config);
-  grpc_connectivity_state_end_flush(&f);
 
   if (destroy_resolver) {
     grpc_resolver_shutdown(destroy_resolver);
@@ -622,15 +608,11 @@
   }
 
   if (lb_policy) {
-    grpc_lb_policy_broadcast(lb_policy, op);
+    grpc_lb_policy_broadcast(lb_policy, op, &call_list);
     GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
   }
 
-  while (call_list != NULL) {
-    grpc_iomgr_closure *next = call_list->next;
-    call_list->cb(call_list->cb_arg, 1);
-    call_list = next;
-  }
+  grpc_iomgr_call_list_run(call_list);
 }
 
 /* Constructor for call_data */
@@ -740,7 +722,7 @@
   GPR_ASSERT(!chand->resolver);
   chand->resolver = resolver;
   GRPC_RESOLVER_REF(resolver, "channel");
-  if (chand->waiting_for_config_closures != NULL ||
+  if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
       chand->exit_idle_when_lb_policy_arrives) {
     chand->started_resolving = 1;
     GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@@ -751,14 +733,15 @@
 }
 
 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
-    grpc_channel_element *elem, int try_to_connect) {
+    grpc_channel_element *elem, int try_to_connect,
+    grpc_iomgr_call_list *call_list) {
   channel_data *chand = elem->channel_data;
   grpc_connectivity_state out;
   gpr_mu_lock(&chand->mu_config);
   out = grpc_connectivity_state_check(&chand->state_tracker);
   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
     if (chand->lb_policy != NULL) {
-      grpc_lb_policy_exit_idle(chand->lb_policy);
+      grpc_lb_policy_exit_idle(chand->lb_policy, call_list);
     } else {
       chand->exit_idle_when_lb_policy_arrives = 1;
       if (!chand->started_resolving && chand->resolver != NULL) {
@@ -775,16 +758,12 @@
 
 void grpc_client_channel_watch_connectivity_state(
     grpc_channel_element *elem, grpc_connectivity_state *state,
-    grpc_iomgr_closure *on_complete) {
+    grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
   channel_data *chand = elem->channel_data;
-  grpc_connectivity_state_notify_on_state_change_result r;
   gpr_mu_lock(&chand->mu_config);
-  r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker,
-                                                     state, on_complete);
+  grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+                                                 on_complete, call_list);
   gpr_mu_unlock(&chand->mu_config);
-  if (r.state_already_changed) {
-    on_complete->cb(on_complete->cb_arg, 1);
-  }
 }
 
 grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(