Merge pull request #8931 from ctiller/fixit20

Remove resource users from reclaimer lists when shutting down
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 4e3c7ff..3e7c078 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -111,9 +111,6 @@
 static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
                                                 void *byte_stream,
                                                 grpc_error *error_ignored);
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
-                                grpc_chttp2_transport *t, grpc_chttp2_stream *s,
-                                grpc_error *error);
 
 static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
                              grpc_error *error);
@@ -428,6 +425,7 @@
     /* flush writable stream list to avoid dangling references */
     grpc_chttp2_stream *s;
     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
+      grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
       GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
     }
     end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
@@ -523,6 +521,10 @@
     }
   }
 
+  if (s->fail_pending_writes_on_writes_finished_error != NULL) {
+    GRPC_ERROR_UNREF(s->fail_pending_writes_on_writes_finished_error);
+  }
+
   GPR_ASSERT(s->send_initial_metadata_finished == NULL);
   GPR_ASSERT(s->fetching_send_message == NULL);
   GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
@@ -704,8 +706,6 @@
     }
   }
 
-  grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
-
   switch (t->write_state) {
     case GRPC_CHTTP2_WRITE_STATE_IDLE:
       GPR_UNREACHABLE_CODE(break);
@@ -734,6 +734,8 @@
       break;
   }
 
+  grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
+
   GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
   GPR_TIMER_END("terminate_writing_with_lock", 0);
 }
@@ -1404,6 +1406,7 @@
     }
   }
   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
+    grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
     GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:remove_stream");
   }
 
@@ -1534,9 +1537,41 @@
   return error;
 }
 
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
-                                grpc_chttp2_transport *t, grpc_chttp2_stream *s,
-                                grpc_error *error) {
+void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s) {
+  if (s->need_fail_pending_writes_on_writes_finished) {
+    grpc_error *error = s->fail_pending_writes_on_writes_finished_error;
+    s->fail_pending_writes_on_writes_finished_error = NULL;
+    s->need_fail_pending_writes_on_writes_finished = false;
+    grpc_chttp2_fail_pending_writes(exec_ctx, t, s, error);
+  }
+}
+
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s, grpc_error *error) {
+  if (s->need_fail_pending_writes_on_writes_finished ||
+      (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE &&
+       (s->included[GRPC_CHTTP2_LIST_WRITABLE] ||
+        s->included[GRPC_CHTTP2_LIST_WRITING]))) {
+    /* If a write is in progress, and it involves this stream, wait for the
+     * write to complete before cancelling things out. If we don't do this, then
+     * our combiner lock might think that some operation on its queue might be
+     * covering a completion even though there is none, in which case we might
+     * offload to another thread, which isn't guarateed to exist */
+    if (error != GRPC_ERROR_NONE) {
+      if (s->fail_pending_writes_on_writes_finished_error == GRPC_ERROR_NONE) {
+        s->fail_pending_writes_on_writes_finished_error = GRPC_ERROR_CREATE(
+            "Post-poned fail writes due to in-progress write");
+      }
+      s->fail_pending_writes_on_writes_finished_error = grpc_error_add_child(
+          s->fail_pending_writes_on_writes_finished_error, error);
+    }
+    s->need_fail_pending_writes_on_writes_finished = true;
+    return; /* early out */
+  }
+
   error =
       removal_error(error, s, "Pending writes failed due to stream closure");
   s->send_initial_metadata = NULL;
@@ -1590,7 +1625,7 @@
   if (close_writes && !s->write_closed) {
     s->write_closed_error = GRPC_ERROR_REF(error);
     s->write_closed = true;
-    fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
+    grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
     grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
   }
   if (s->read_closed && s->write_closed) {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b74233d..6cba1e7 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -409,6 +409,9 @@
   grpc_error *read_closed_error;
   /** the error that resulted in this stream being write-closed */
   grpc_error *write_closed_error;
+  /** should any writes be cleared once this stream becomes non-writable */
+  bool need_fail_pending_writes_on_writes_finished;
+  grpc_error *fail_pending_writes_on_writes_finished_error;
 
   grpc_published_metadata_method published_metadata[2];
   bool final_metadata_requested;
@@ -689,4 +692,11 @@
                                                        grpc_chttp2_transport *t,
                                                        grpc_chttp2_stream *s);
 
+void grpc_chttp2_leave_writing_lists(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s);
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+                                     grpc_chttp2_transport *t,
+                                     grpc_chttp2_stream *s, grpc_error *error);
+
 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 139e738..769b229 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -208,6 +208,7 @@
         GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing");
       }
     } else {
+      grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
       GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write");
     }
   }
@@ -252,6 +253,7 @@
       grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1,
                                      GRPC_ERROR_REF(error));
     }
+    grpc_chttp2_leave_writing_lists(exec_ctx, t, s);
     GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
   }
   grpc_slice_buffer_reset_and_unref(&t->outbuf);
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 60ee14e..cfc6702 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -90,6 +90,12 @@
          gpr_atm_acq_load(&lock->elements_covered_by_poller) > 0;
 }
 
+#define IS_COVERED_BY_POLLER_FMT "(final=%d elems=%" PRIdPTR ")->%d"
+#define IS_COVERED_BY_POLLER_ARGS(lock)                      \
+  (lock)->final_list_covered_by_poller,                      \
+      gpr_atm_acq_load(&(lock)->elements_covered_by_poller), \
+      is_covered_by_poller((lock))
+
 grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
   grpc_combiner *lock = gpr_malloc(sizeof(*lock));
   lock->next_combiner_on_this_exec_ctx = NULL;
@@ -197,9 +203,10 @@
   GRPC_COMBINER_TRACE(
       gpr_log(GPR_DEBUG,
               "C:%p grpc_combiner_continue_exec_ctx workqueue=%p "
-              "is_covered_by_poller=%d exec_ctx_ready_to_finish=%d "
+              "is_covered_by_poller=" IS_COVERED_BY_POLLER_FMT
+              " exec_ctx_ready_to_finish=%d "
               "time_to_execute_final_list=%d",
-              lock, lock->optional_workqueue, is_covered_by_poller(lock),
+              lock, lock->optional_workqueue, IS_COVERED_BY_POLLER_ARGS(lock),
               grpc_exec_ctx_ready_to_finish(exec_ctx),
               lock->time_to_execute_final_list));
 
diff --git a/src/php/tests/unit_tests/ChannelTest.php b/src/php/tests/unit_tests/ChannelTest.php
index 4b35b1a..fa33d38 100644
--- a/src/php/tests/unit_tests/ChannelTest.php
+++ b/src/php/tests/unit_tests/ChannelTest.php
@@ -99,7 +99,7 @@
         $this->channel = new Grpc\Channel('localhost:0',
              ['credentials' => Grpc\ChannelCredentials::createInsecure()]);
         $time = new Grpc\Timeval(1000);
-        $state = $this->channel->watchConnectivityState(123, $time);
+        $state = $this->channel->watchConnectivityState(1, $time);
         $this->assertTrue($state);
         unset($time);
     }
diff --git a/src/php/tests/unit_tests/ServerTest.php b/src/php/tests/unit_tests/ServerTest.php
index f2346ab..5f40202 100644
--- a/src/php/tests/unit_tests/ServerTest.php
+++ b/src/php/tests/unit_tests/ServerTest.php
@@ -67,9 +67,9 @@
     public function testRequestCall()
     {
         $this->server = new Grpc\Server();
-        $port = $this->server->addHttp2Port('0.0.0.0:8888');
+        $port = $this->server->addHttp2Port('0.0.0.0:0');
         $this->server->start();
-        $channel = new Grpc\Channel('localhost:8888',
+        $channel = new Grpc\Channel('localhost:' . $port,
              ['credentials' => Grpc\ChannelCredentials::createInsecure()]);
 
         $deadline = Grpc\Timeval::infFuture();
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 4e4062b..6087276 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -905,6 +905,21 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
+  def add_shutdown_handler(self, shutdown_handler):
+    """Adds a handler to be called on server shutdown.
+
+    Shutdown handlers are run on server stop() or in the event that a running
+    server is destroyed unexpectedly.  The handlers are run in series before
+    the stop grace period.
+
+    Args:
+      shutdown_handler:  A function taking a single arg, a time in seconds
+      within which the handler should complete.  None indicates the handler can
+      run for any duration.
+    """
+    raise NotImplementedError()
+
+  @abc.abstractmethod
   def start(self):
     """Starts this Server's service of RPCs.
 
@@ -914,7 +929,7 @@
     raise NotImplementedError()
 
   @abc.abstractmethod
-  def stop(self, grace):
+  def stop(self, grace, shutdown_handler_grace=None):
     """Stops this Server's service of RPCs.
 
     All calls to this method immediately stop service of new RPCs. When existing
@@ -937,6 +952,8 @@
         aborted by this Server's stopping. If None, all RPCs will be aborted
         immediately and this method will block until this Server is completely
         stopped.
+      shutdown_handler_grace:  A duration of time in seconds or None.  This
+        value is passed to all shutdown handlers.
 
     Returns:
       A threading.Event that will be set when this Server has completely
@@ -1231,7 +1248,8 @@
                           credentials._credentials)
 
 
-def server(thread_pool, handlers=None, options=None):
+def server(thread_pool, handlers=None, options=None, exit_grace=None,
+           exit_shutdown_handler_grace=None):
   """Creates a Server with which RPCs can be serviced.
 
   Args:
@@ -1244,13 +1262,19 @@
       returned Server is started.
     options: A sequence of string-value pairs according to which to configure
       the created server.
+    exit_grace:  The grace period to use when terminating
+      running servers at interpreter exit.  None indicates unspecified.
+    exit_shutdown_handler_grace:  The shutdown handler grace to use when
+      terminating running servers at interpreter exit.  None indicates
+      unspecified.
 
   Returns:
     A Server with which RPCs can be serviced.
   """
   from grpc import _server
   return _server.Server(thread_pool, () if handlers is None else handlers,
-                        () if options is None else options)
+                        () if options is None else options, exit_grace,
+                        exit_shutdown_handler_grace)
 
 
 ###################################  __all__  #################################
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 5223712..d83a2e6 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -60,7 +60,8 @@
 _EMPTY_FLAGS = 0
 _EMPTY_METADATA = cygrpc.Metadata(())
 
-_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
+_DEFAULT_EXIT_GRACE = 1.0
+_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE = 5.0
 
 
 def _serialized_request(request_event):
@@ -595,14 +596,18 @@
 
 class _ServerState(object):
 
-  def __init__(self, completion_queue, server, generic_handlers, thread_pool):
+  def __init__(self, completion_queue, server, generic_handlers, thread_pool,
+               exit_grace, exit_shutdown_handler_grace):
     self.lock = threading.Lock()
     self.completion_queue = completion_queue
     self.server = server
     self.generic_handlers = list(generic_handlers)
     self.thread_pool = thread_pool
+    self.exit_grace = exit_grace
+    self.exit_shutdown_handler_grace = exit_shutdown_handler_grace
     self.stage = _ServerStage.STOPPED
     self.shutdown_events = None
+    self.shutdown_handlers = []
 
     # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
     self.rpc_states = set()
@@ -672,41 +677,45 @@
             return
 
 
-def _stop(state, grace):
-  with state.lock:
-    if state.stage is _ServerStage.STOPPED:
-      shutdown_event = threading.Event()
-      shutdown_event.set()
-      return shutdown_event
-    else:
-      if state.stage is _ServerStage.STARTED:
-        state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+def _stop(state, grace, shutdown_handler_grace):
+  shutdown_event = threading.Event()
+
+  def cancel_all_calls_after_grace():
+    with state.lock:
+      if state.stage is _ServerStage.STOPPED:
+        shutdown_event.set()
+        return
+      elif state.stage is _ServerStage.STARTED:
+        do_shutdown = True
         state.stage = _ServerStage.GRACE
         state.shutdown_events = []
-        state.due.add(_SHUTDOWN_TAG)
-      shutdown_event = threading.Event()
+      else:
+        do_shutdown = False
       state.shutdown_events.append(shutdown_event)
-      if grace is None:
+
+    if do_shutdown:
+      # Run Shutdown Handlers without the lock
+      for handler in state.shutdown_handlers:
+        handler(shutdown_handler_grace)
+      with state.lock:
+        state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+        state.stage = _ServerStage.GRACE
+        state.due.add(_SHUTDOWN_TAG)
+
+    if not shutdown_event.wait(timeout=grace):
+      with state.lock:
         state.server.cancel_all_calls()
         # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
         for rpc_state in state.rpc_states:
           with rpc_state.condition:
             rpc_state.client = _CANCELLED
             rpc_state.condition.notify_all()
-      else:
-        def cancel_all_calls_after_grace():
-          shutdown_event.wait(timeout=grace)
-          with state.lock:
-            state.server.cancel_all_calls()
-            # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
-            for rpc_state in state.rpc_states:
-              with rpc_state.condition:
-                rpc_state.client = _CANCELLED
-                rpc_state.condition.notify_all()
-        thread = threading.Thread(target=cancel_all_calls_after_grace)
-        thread.start()
-        return shutdown_event
-  shutdown_event.wait()
+
+  if grace is None:
+    cancel_all_calls_after_grace()
+  else:
+    threading.Thread(target=cancel_all_calls_after_grace).start()
+
   return shutdown_event
 
 
@@ -716,12 +725,12 @@
       raise ValueError('Cannot start already-started server!')
     state.server.start()
     state.stage = _ServerStage.STARTED
-    _request_call(state)    
+    _request_call(state)
     def cleanup_server(timeout):
       if timeout is None:
-        _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait()
+        _stop(state, state.exit_grace, state.exit_shutdown_handler_grace).wait()
       else:
-        _stop(state, timeout).wait()
+        _stop(state, timeout, 0).wait()
 
     thread = _common.CleanupThread(
         cleanup_server, target=_serve, args=(state,))
@@ -729,12 +738,16 @@
 
 class Server(grpc.Server):
 
-  def __init__(self, thread_pool, generic_handlers, options):
+  def __init__(self, thread_pool, generic_handlers, options, exit_grace,
+               exit_shutdown_handler_grace):
     completion_queue = cygrpc.CompletionQueue()
     server = cygrpc.Server(_common.channel_args(options))
     server.register_completion_queue(completion_queue)
     self._state = _ServerState(
-        completion_queue, server, generic_handlers, thread_pool)
+        completion_queue, server, generic_handlers, thread_pool,
+        _DEFAULT_EXIT_GRACE if exit_grace is None else exit_grace,
+        _DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE if exit_shutdown_handler_grace
+        is None else exit_shutdown_handler_grace)
 
   def add_generic_rpc_handlers(self, generic_rpc_handlers):
     _add_generic_handlers(self._state, generic_rpc_handlers)
@@ -745,11 +758,14 @@
   def add_secure_port(self, address, server_credentials):
     return _add_secure_port(self._state, _common.encode(address), server_credentials)
 
+  def add_shutdown_handler(self, handler):
+    self._state.shutdown_handlers.append(handler)
+
   def start(self):
     _start(self._state)
 
-  def stop(self, grace):
-    return _stop(self._state, grace)
+  def stop(self, grace, shutdown_handler_grace=None):
+    return _stop(self._state, grace, shutdown_handler_grace)
 
   def __del__(self):
-    _stop(self._state, None)
+    _stop(self._state, None, None)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index dd4a025..04a2e44 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -27,6 +27,7 @@
   "unit._cython.cygrpc_test.TypeSmokeTest",
   "unit._empty_message_test.EmptyMessageTest",
   "unit._exit_test.ExitTest",
+  "unit._exit_test.ShutdownHandlerTest",
   "unit._metadata_code_details_test.MetadataCodeDetailsTest",
   "unit._metadata_test.MetadataTest",
   "unit._rpc_test.RPCTest",
diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py
index 5a4a328..342f5fc 100644
--- a/src/python/grpcio_tests/tests/unit/_exit_test.py
+++ b/src/python/grpcio_tests/tests/unit/_exit_test.py
@@ -43,6 +43,8 @@
 import time
 import unittest
 
+import grpc
+from grpc.framework.foundation import logging_pool
 from tests.unit import _exit_scenarios
 
 SCENARIO_FILE = os.path.abspath(os.path.join(
@@ -52,7 +54,7 @@
 BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt']
 
 INIT_TIME = 1.0
-
+SHUTDOWN_GRACE = 5.0
 
 processes = []
 process_lock = threading.Lock()
@@ -182,5 +184,24 @@
     interrupt_and_wait(process)
 
 
+class _ShutDownHandler(object):
+
+  def __init__(self):
+    self.seen_handler_grace = None
+
+  def shutdown_handler(self, handler_grace):
+    self.seen_handler_grace = handler_grace
+
+  
+class ShutdownHandlerTest(unittest.TestCase):
+
+  def test_shutdown_handler(self):
+    server = grpc.server(logging_pool.pool(1))
+    handler = _ShutDownHandler()
+    server.add_shutdown_handler(handler.shutdown_handler)
+    server.start()
+    server.stop(0, shutdown_handler_grace=SHUTDOWN_GRACE).wait()
+    self.assertEqual(SHUTDOWN_GRACE, handler.seen_handler_grace)
+
 if __name__ == '__main__':
   unittest.main(verbosity=2)
diff --git a/templates/tools/dockerfile/test/sanity/Dockerfile.template b/templates/tools/dockerfile/test/sanity/Dockerfile.template
index 12309b6..0168353 100644
--- a/templates/tools/dockerfile/test/sanity/Dockerfile.template
+++ b/templates/tools/dockerfile/test/sanity/Dockerfile.template
@@ -48,9 +48,12 @@
   #======================================
   # More sanity test dependencies (bazel)
   RUN apt-get install -y openjdk-8-jdk
-  # TOOD(jtattermusch): pin the bazel version
-  RUN git clone https://github.com/bazelbuild/bazel.git /bazel
-  RUN cd /bazel && ./compile.sh
+  # Check out Bazel version 0.4.1 since this version allows running
+  # ./compile.sh without a local protoc dependency
+  # TODO(mattkwong): install dependencies to support latest Bazel version if newer
+  # version is needed
+  RUN git clone https://github.com/bazelbuild/bazel.git /bazel && \
+    cd /bazel && git checkout tags/0.4.1 && ./compile.sh
   RUN ln -s /bazel/output/bazel /bin/
   
   #===================
diff --git a/test/core/client_channel/lb_policies_test.c b/test/core/client_channel/lb_policies_test.c
index 95595d7..51df52c 100644
--- a/test/core/client_channel/lb_policies_test.c
+++ b/test/core/client_channel/lb_policies_test.c
@@ -64,9 +64,11 @@
 } servers_fixture;
 
 typedef struct request_sequences {
-  size_t n;
-  int *connections;
-  int *connectivity_states;
+  size_t n;         /* number of iterations */
+  int *connections; /* indexed by the interation number, value is the index of
+                       the server it connected to or -1 if none */
+  int *connectivity_states; /* indexed by the interation number, value is the
+                               client connectivity state */
 } request_sequences;
 
 typedef void (*verifier_fn)(const servers_fixture *, grpc_channel *,
@@ -780,15 +782,17 @@
     }
   }
 
-  /* no server is ever available. The persistent state is TRANSIENT_FAILURE */
+  /* no server is ever available. The persistent state is TRANSIENT_FAILURE. May
+   * also be CONNECTING if, under load, this check took too long to run and some
+   * subchannel already transitioned to retrying. */
   for (size_t i = 0; i < sequences->n; i++) {
     const grpc_connectivity_state actual = sequences->connectivity_states[i];
-    const grpc_connectivity_state expected = GRPC_CHANNEL_TRANSIENT_FAILURE;
-    if (actual != expected) {
+    if (actual != GRPC_CHANNEL_TRANSIENT_FAILURE &&
+        actual != GRPC_CHANNEL_CONNECTING) {
       gpr_log(GPR_ERROR,
-              "CONNECTIVITY STATUS SEQUENCE FAILURE: expected '%s', got '%s' "
-              "at iteration #%d",
-              grpc_connectivity_state_name(expected),
+              "CONNECTIVITY STATUS SEQUENCE FAILURE: expected "
+              "GRPC_CHANNEL_TRANSIENT_FAILURE or GRPC_CHANNEL_CONNECTING, got "
+              "'%s' at iteration #%d",
               grpc_connectivity_state_name(actual), (int)i);
       abort();
     }
@@ -825,8 +829,7 @@
   }
 
   /* We can assert that the first client channel state should be READY, when all
-   * servers were available; and that the last one should be TRANSIENT_FAILURE,
-   * after all servers are gone. */
+   * servers were available */
   grpc_connectivity_state actual = sequences->connectivity_states[0];
   grpc_connectivity_state expected = GRPC_CHANNEL_READY;
   if (actual != expected) {
@@ -838,17 +841,21 @@
     abort();
   }
 
+  /* ... and that the last one should be TRANSIENT_FAILURE, after all servers
+   * are gone. May also be CONNECTING if, under load, this check took too long
+   * to run and the subchannel already transitioned to retrying. */
   actual = sequences->connectivity_states[num_iters - 1];
-  expected = GRPC_CHANNEL_TRANSIENT_FAILURE;
-  if (actual != expected) {
-    gpr_log(GPR_ERROR,
-            "CONNECTIVITY STATUS SEQUENCE FAILURE: expected '%s', got '%s' "
-            "at iteration #%d",
-            grpc_connectivity_state_name(expected),
-            grpc_connectivity_state_name(actual), (int)num_iters - 1);
-    abort();
+  for (i = 0; i < sequences->n; i++) {
+    if (actual != GRPC_CHANNEL_TRANSIENT_FAILURE &&
+        actual != GRPC_CHANNEL_CONNECTING) {
+      gpr_log(GPR_ERROR,
+              "CONNECTIVITY STATUS SEQUENCE FAILURE: expected "
+              "GRPC_CHANNEL_TRANSIENT_FAILURE or GRPC_CHANNEL_CONNECTING, got "
+              "'%s' at iteration #%d",
+              grpc_connectivity_state_name(actual), (int)i);
+      abort();
+    }
   }
-
   gpr_free(expected_connection_sequence);
 }
 
@@ -873,68 +880,21 @@
                                        grpc_channel *client,
                                        const request_sequences *sequences,
                                        const size_t num_iters) {
-  int *expected_connection_sequence;
-  size_t i, j, unique_seq_last_idx, unique_seq_first_idx;
-  const size_t expected_seq_length = f->num_servers;
-  int *seen_elements;
-
   dump_array("actual_connection_sequence", sequences->connections, num_iters);
 
-  /* verify conn. seq. expectation */
-  /* get the first unique run of length "num_servers". */
-  expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length);
-  seen_elements = gpr_malloc(sizeof(int) * expected_seq_length);
-
-  unique_seq_last_idx = ~(size_t)0;
-
-  memset(seen_elements, 0, sizeof(int) * expected_seq_length);
-  for (i = 0; i < num_iters; i++) {
-    if (sequences->connections[i] < 0 ||
-        seen_elements[sequences->connections[i]] != 0) {
-      /* if anything breaks the uniqueness of the run, back to square zero */
-      memset(seen_elements, 0, sizeof(int) * expected_seq_length);
-      continue;
-    }
-    seen_elements[sequences->connections[i]] = 1;
-    for (j = 0; j < expected_seq_length; j++) {
-      if (seen_elements[j] == 0) break;
-    }
-    if (j == expected_seq_length) { /* seen all the elements */
-      unique_seq_last_idx = i;
-      break;
-    }
-  }
-  /* make sure we found a valid run */
-  dump_array("seen_elements", seen_elements, expected_seq_length);
-  for (j = 0; j < expected_seq_length; j++) {
-    GPR_ASSERT(seen_elements[j] != 0);
-  }
-
-  GPR_ASSERT(unique_seq_last_idx != ~(size_t)0);
-
-  unique_seq_first_idx = (unique_seq_last_idx - expected_seq_length + 1);
-  memcpy(expected_connection_sequence,
-         sequences->connections + unique_seq_first_idx,
-         sizeof(int) * expected_seq_length);
-
   /* first iteration succeeds */
   GPR_ASSERT(sequences->connections[0] != -1);
   /* then we fail for a while... */
   GPR_ASSERT(sequences->connections[1] == -1);
-  /* ... but should be up at "unique_seq_first_idx" */
-  GPR_ASSERT(sequences->connections[unique_seq_first_idx] != -1);
-
-  for (j = 0, i = unique_seq_first_idx; i < num_iters; i++) {
-    const int actual = sequences->connections[i];
-    const int expected =
-        expected_connection_sequence[j++ % expected_seq_length];
-    if (actual != expected) {
-      print_failed_expectations(expected_connection_sequence,
-                                sequences->connections, expected_seq_length,
-                                num_iters);
-      abort();
+  /* ... but should be up eventually */
+  size_t first_iter_back_up = ~0ul;
+  for (size_t i = 2; i < sequences->n; ++i) {
+    if (sequences->connections[i] != -1) {
+      first_iter_back_up = i;
+      break;
     }
   }
+  GPR_ASSERT(first_iter_back_up != ~0ul);
 
   /* We can assert that the first client channel state should be READY, when all
    * servers were available; same thing for the last one. In the middle
@@ -962,7 +922,7 @@
   }
 
   bool found_failure_status = false;
-  for (i = 1; i < sequences->n - 1; i++) {
+  for (size_t i = 1; i < sequences->n - 1; i++) {
     if (sequences->connectivity_states[i] == GRPC_CHANNEL_TRANSIENT_FAILURE) {
       found_failure_status = true;
       break;
@@ -974,14 +934,11 @@
         "CONNECTIVITY STATUS SEQUENCE FAILURE: "
         "GRPC_CHANNEL_TRANSIENT_FAILURE status not found. Got the following "
         "instead:");
-    for (i = 0; i < num_iters; i++) {
+    for (size_t i = 0; i < num_iters; i++) {
       gpr_log(GPR_ERROR, "[%d]: %s", (int)i,
               grpc_connectivity_state_name(sequences->connectivity_states[i]));
     }
   }
-
-  gpr_free(expected_connection_sequence);
-  gpr_free(seen_elements);
 }
 
 int main(int argc, char **argv) {
diff --git a/tools/dockerfile/push_testing_images.sh b/tools/dockerfile/push_testing_images.sh
new file mode 100755
index 0000000..f1ee8d5
--- /dev/null
+++ b/tools/dockerfile/push_testing_images.sh
@@ -0,0 +1,64 @@
+#!/bin/bash
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+#     * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#     * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+#     * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Builds selected testing docker images and pushes them to dockerhub.
+# Useful for testing environments where it's impractical (or impossible)
+# to rely on docker images being cached locally after they've been built
+# for the first time (which might be costly especially for some images).
+# NOTE: gRPC docker images intended to be used by end users are NOT
+# pushed using this script (they're built automatically by dockerhub).
+# This script is only for "internal" images we use when testing gRPC.  
+
+set -ex
+
+cd $(dirname $0)/../..
+git_root=$(pwd)
+cd -
+
+DOCKERHUB_ORGANIZATION=grpctesting
+
+for DOCKERFILE_DIR in tools/dockerfile/test/fuzzer
+do
+  # Generate image name based on Dockerfile checksum. That works well as long
+  # as can count on dockerfiles being written in a way that changing the logical 
+  # contents of the docker image always changes the SHA (e.g. using "ADD file" 
+  # cmd in the dockerfile in not ok as contents of the added file will not be
+  # reflected in the SHA).
+  DOCKER_IMAGE_NAME=$(basename $DOCKERFILE_DIR)_$(sha1sum $DOCKERFILE_DIR/Dockerfile | cut -f1 -d\ )
+
+  # skip the image if it already exists in the repo 
+  curl --silent -f -lSL https://registry.hub.docker.com/v2/repositories/${DOCKERHUB_ORGANIZATION}/${DOCKER_IMAGE_NAME}/tags/latest > /dev/null \
+      && continue
+
+  docker build -t ${DOCKERHUB_ORGANIZATION}/${DOCKER_IMAGE_NAME} ${DOCKERFILE_DIR}
+      
+  # "docker login" needs to be run in advance
+  docker push ${DOCKERHUB_ORGANIZATION}/${DOCKER_IMAGE_NAME}
+done
diff --git a/tools/dockerfile/test/sanity/Dockerfile b/tools/dockerfile/test/sanity/Dockerfile
index f4b4831..6b19ac8 100644
--- a/tools/dockerfile/test/sanity/Dockerfile
+++ b/tools/dockerfile/test/sanity/Dockerfile
@@ -93,9 +93,11 @@
 #======================================
 # More sanity test dependencies (bazel)
 RUN apt-get install -y openjdk-8-jdk
-# TOOD(jtattermusch): pin the bazel version
-RUN git clone https://github.com/bazelbuild/bazel.git /bazel
-RUN cd /bazel && ./compile.sh
+# Check out Bazel version 0.4.1 since this version allows running
+# ./compile.sh without a local protoc dependency
+# TODO(mattkwong): install dependencies to support latest Bazel version if newer
+# version is needed
+RUN git clone https://github.com/bazelbuild/bazel.git /bazel &&   cd /bazel && git checkout tags/0.4.1 && ./compile.sh
 RUN ln -s /bazel/output/bazel /bin/
 
 #===================
diff --git a/tools/run_tests/dockerize/build_and_run_docker.sh b/tools/run_tests/dockerize/build_and_run_docker.sh
index 1ef34b2..f52f16e 100755
--- a/tools/run_tests/dockerize/build_and_run_docker.sh
+++ b/tools/run_tests/dockerize/build_and_run_docker.sh
@@ -41,13 +41,20 @@
 # DOCKERFILE_DIR - Directory in which Dockerfile file is located.
 # DOCKER_RUN_SCRIPT - Script to run under docker (relative to grpc repo root)
 # OUTPUT_DIR - Directory that will be copied from inside docker after finishing.
+# DOCKERHUB_ORGANIZATION - If set, pull a prebuilt image from given dockerhub org.
 # $@ - Extra args to pass to docker run
 
 # Use image name based on Dockerfile location checksum
 DOCKER_IMAGE_NAME=$(basename $DOCKERFILE_DIR)_$(sha1sum $DOCKERFILE_DIR/Dockerfile | cut -f1 -d\ )
 
-# Make sure docker image has been built. Should be instantaneous if so.
-docker build -t $DOCKER_IMAGE_NAME $DOCKERFILE_DIR
+if [ "$DOCKERHUB_ORGANIZATION" != "" ]
+then
+  DOCKER_IMAGE_NAME=$DOCKERHUB_ORGANIZATION/$DOCKER_IMAGE_NAME
+  docker pull $DOCKER_IMAGE_NAME
+else
+  # Make sure docker image has been built. Should be instantaneous if so.
+  docker build -t $DOCKER_IMAGE_NAME $DOCKERFILE_DIR
+fi
 
 # Choose random name for docker container
 CONTAINER_NAME="build_and_run_docker_$(uuidgen)"