Merge pull request #13559 from dmaclach/threadnames

Add thread naming support on platforms that support it.
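
For context on what "thread naming support" means in practice, here is a minimal sketch of the platform-specific calls such support typically wraps. This is an illustration only, not the code merged by this PR; the helper name is hypothetical.

#define _GNU_SOURCE
#include <pthread.h>
#include <string.h>

/* Hypothetical helper: name the calling thread where the platform allows it. */
static void illustrative_set_current_thread_name(const char* name) {
#if defined(__APPLE__)
  /* macOS can only name the calling thread. */
  pthread_setname_np(name);
#elif defined(__linux__)
  /* Linux truncates names to 15 characters plus the terminating NUL. */
  char buf[16];
  strncpy(buf, name, sizeof(buf) - 1);
  buf[sizeof(buf) - 1] = '\0';
  pthread_setname_np(pthread_self(), buf);
#else
  (void)name; /* No-op on platforms without a naming primitive. */
#endif
}
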
diff --git a/.gitignore b/.gitignore
index 151bbbd..2b35c5f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -121,6 +121,7 @@
 tags
 
 # perf data
+memory_usage.csv
 perf.data
 perf.data.old
 
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 921487e..22a1ab7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -235,6 +235,7 @@
 elseif("${gRPC_SSL_PROVIDER}" STREQUAL "package")
   find_package(OpenSSL REQUIRED)
   set(_gRPC_SSL_LIBRARIES ${OPENSSL_LIBRARIES})
+  include_directories(${OPENSSL_INCLUDE_DIR})
   set(_gRPC_FIND_SSL "if(NOT OPENSSL_FOUND)\n  find_package(OpenSSL)\nendif()")
 endif()
 
@@ -677,6 +678,7 @@
 endif()
 add_dependencies(buildtests_cxx channel_arguments_test)
 add_dependencies(buildtests_cxx channel_filter_test)
+add_dependencies(buildtests_cxx chttp2_settings_timeout_test)
 add_dependencies(buildtests_cxx cli_call_test)
 add_dependencies(buildtests_cxx client_channel_stress_test)
 if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@@ -9862,6 +9864,44 @@
 endif (gRPC_BUILD_TESTS)
 if (gRPC_BUILD_TESTS)
 
+add_executable(chttp2_settings_timeout_test
+  test/core/transport/chttp2/settings_timeout_test.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+
+target_include_directories(chttp2_settings_timeout_test
+  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
+  PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
+  PRIVATE ${BORINGSSL_ROOT_DIR}/include
+  PRIVATE ${PROTOBUF_ROOT_DIR}/src
+  PRIVATE ${BENCHMARK_ROOT_DIR}/include
+  PRIVATE ${ZLIB_ROOT_DIR}
+  PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
+  PRIVATE ${CARES_INCLUDE_DIR}
+  PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
+  PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
+  PRIVATE third_party/googletest/googletest/include
+  PRIVATE third_party/googletest/googletest
+  PRIVATE third_party/googletest/googlemock/include
+  PRIVATE third_party/googletest/googlemock
+  PRIVATE ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(chttp2_settings_timeout_test
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  grpc_test_util
+  grpc
+  gpr_test_util
+  gpr
+  ${_gRPC_GFLAGS_LIBRARIES}
+)
+
+endif (gRPC_BUILD_TESTS)
+if (gRPC_BUILD_TESTS)
+
 add_executable(cli_call_test
   test/cpp/util/cli_call_test.cc
   third_party/googletest/googletest/src/gtest-all.cc
diff --git a/Makefile b/Makefile
index 0e52d5a..9dfd2a8 100644
--- a/Makefile
+++ b/Makefile
@@ -1114,6 +1114,7 @@
 bm_pollset: $(BINDIR)/$(CONFIG)/bm_pollset
 channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
 channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test
+chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
 cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
 client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test
 client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
@@ -1557,6 +1558,7 @@
   $(BINDIR)/$(CONFIG)/bm_pollset \
   $(BINDIR)/$(CONFIG)/channel_arguments_test \
   $(BINDIR)/$(CONFIG)/channel_filter_test \
+  $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
   $(BINDIR)/$(CONFIG)/cli_call_test \
   $(BINDIR)/$(CONFIG)/client_channel_stress_test \
   $(BINDIR)/$(CONFIG)/client_crash_test \
@@ -1684,6 +1686,7 @@
   $(BINDIR)/$(CONFIG)/bm_pollset \
   $(BINDIR)/$(CONFIG)/channel_arguments_test \
   $(BINDIR)/$(CONFIG)/channel_filter_test \
+  $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
   $(BINDIR)/$(CONFIG)/cli_call_test \
   $(BINDIR)/$(CONFIG)/client_channel_stress_test \
   $(BINDIR)/$(CONFIG)/client_crash_test \
@@ -2069,6 +2072,8 @@
 	$(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 )
 	$(E) "[RUN]     Testing channel_filter_test"
 	$(Q) $(BINDIR)/$(CONFIG)/channel_filter_test || ( echo test channel_filter_test failed ; exit 1 )
+	$(E) "[RUN]     Testing chttp2_settings_timeout_test"
+	$(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 )
 	$(E) "[RUN]     Testing cli_call_test"
 	$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
 	$(E) "[RUN]     Testing client_channel_stress_test"
@@ -14369,6 +14374,49 @@
 endif
 
 
+CHTTP2_SETTINGS_TIMEOUT_TEST_SRC = \
+    test/core/transport/chttp2/settings_timeout_test.cc \
+
+CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CHTTP2_SETTINGS_TIMEOUT_TEST_SRC))))
+ifeq ($(NO_SECURE),true)
+
+# You can't build secure targets if you don't have OpenSSL.
+
+$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: openssl_dep_error
+
+else
+
+
+
+
+ifeq ($(NO_PROTOBUF),true)
+
+# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
+
+$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: protobuf_dep_error
+
+else
+
+$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: $(PROTOBUF_DEP) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+	$(E) "[LD]      Linking $@"
+	$(Q) mkdir -p `dirname $@`
+	$(Q) $(LDXX) $(LDFLAGS) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
+
+endif
+
+endif
+
+$(OBJDIR)/$(CONFIG)/test/core/transport/chttp2/settings_timeout_test.o:  $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+
+deps_chttp2_settings_timeout_test: $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep)
+
+ifneq ($(NO_SECURE),true)
+ifneq ($(NO_DEPS),true)
+-include $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep)
+endif
+endif
+
+
 CLI_CALL_TEST_SRC = \
     test/cpp/util/cli_call_test.cc \
 
diff --git a/build.yaml b/build.yaml
index 854a695..6158b46 100644
--- a/build.yaml
+++ b/build.yaml
@@ -3824,6 +3824,18 @@
   - grpc
   - gpr
   uses_polling: false
+- name: chttp2_settings_timeout_test
+  gtest: true
+  build: test
+  language: c++
+  src:
+  - test/core/transport/chttp2/settings_timeout_test.cc
+  deps:
+  - grpc_test_util
+  - grpc
+  - gpr_test_util
+  - gpr
+  uses_polling: true
 - name: cli_call_test
   gtest: true
   build: test
diff --git a/examples/cpp/helloworld/CMakeLists.txt b/examples/cpp/helloworld/CMakeLists.txt
index 71a8db4..49684a1 100644
--- a/examples/cpp/helloworld/CMakeLists.txt
+++ b/examples/cpp/helloworld/CMakeLists.txt
@@ -6,13 +6,29 @@
 
 if(NOT MSVC)
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+else()
+  add_definitions(-D_WIN32_WINNT=0x600)
 endif()
 
 # Protobuf
-set(protobuf_MODULE_COMPATIBLE TRUE)
-find_package(protobuf CONFIG REQUIRED)
+# NOTE: we cannot use "CONFIG" mode here because protobuf-config.cmake
+# is broken when used with CMAKE_INSTALL_PREFIX
+find_package(Protobuf REQUIRED)
 message(STATUS "Using protobuf ${protobuf_VERSION}")
 
+if(Protobuf_FOUND)
+  # Protobuf_FOUND is set for package type "CONFIG"
+  set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
+  set(_PROTOBUF_PROTOC protobuf::protoc)
+elseif(PROTOBUF_FOUND)
+  # PROTOBUF_FOUND is set for package type "MODULE"
+  set(_PROTOBUF_LIBPROTOBUF ${PROTOBUF_LIBRARIES})
+  set(_PROTOBUF_PROTOC ${PROTOBUF_PROTOC_EXECUTABLE})
+  include_directories(${PROTOBUF_INCLUDE_DIRS})
+else()
+  message(WARNING "Failed to locate libprotobuf and protoc!")
+endif()
+
 # gRPC
 find_package(gRPC CONFIG REQUIRED)
 message(STATUS "Using gRPC ${gRPC_VERSION}")
@@ -31,7 +47,7 @@
 set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h")
 add_custom_command(
       OUTPUT "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
-      COMMAND protobuf::protoc
+      COMMAND ${_PROTOBUF_PROTOC}
      ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" -I "${hw_proto_path}"
        --plugin=protoc-gen-grpc="${gRPC_CPP_PLUGIN_EXECUTABLE}"
        "${hw_proto}"
@@ -48,6 +64,6 @@
    ${hw_proto_srcs}
    ${hw_grpc_srcs})
  target_link_libraries(${_target}
-    protobuf::libprotobuf
+    ${_PROTOBUF_LIBPROTOBUF}
    gRPC::grpc++_unsecure)
 endforeach()
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 39b8484..bc4b318 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -34,6 +34,10 @@
     :tag => "v#{version}",
   }
 
+  # gRPC podspecs depend on fix for https://github.com/CocoaPods/CocoaPods/issues/6024,
+  # which was released in Cocoapods v1.2.0.
+  s.cocoapods_version = '>= 1.2.0'
+
   s.ios.deployment_target = '7.0'
   s.osx.deployment_target = '10.9'
   s.requires_arc = false
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index ad668b4..77844aa 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -240,6 +240,9 @@
 /** The time between the first and second connection attempts, in ms */
 #define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS \
   "grpc.initial_reconnect_backoff_ms"
+/** The timeout used on servers for finishing handshaking on an incoming
+    connection.  Defaults to 120 seconds. */
+#define GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS "grpc.server_handshake_timeout_ms"
 /** This *should* be used for testing only.
     The caller of the secure_channel_create functions may override the target
     name used for SSL host name checking using this channel argument which is of
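
As a usage sketch for the new channel argument: the function name below is illustrative and the 5000 ms value is an arbitrary example, not a recommendation.

#include <grpc/grpc.h>

/* Sketch: lower the server handshake timeout from its 120-second default. */
grpc_server* create_server_with_short_handshake_timeout() {
  grpc_arg arg;
  arg.type = GRPC_ARG_INTEGER;
  arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
  arg.value.integer = 5000;  /* hypothetical 5s handshake budget */
  grpc_channel_args args = {1, &arg};
  return grpc_server_create(&args, nullptr);
}
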
diff --git a/include/grpc/support/tls_gcc.h b/include/grpc/support/tls_gcc.h
index 019acdf..1b91d22 100644
--- a/include/grpc/support/tls_gcc.h
+++ b/include/grpc/support/tls_gcc.h
@@ -26,44 +26,6 @@
 /** Thread local storage based on gcc compiler primitives.
    #include tls.h to use this - and see that file for documentation */
 
-#ifndef NDEBUG
-
-struct gpr_gcc_thread_local {
-  intptr_t value;
-  bool* inited;
-};
-
-#define GPR_TLS_DECL(name)           \
-  static bool name##_inited = false; \
-  static __thread struct gpr_gcc_thread_local name = {0, &(name##_inited)}
-
-#define gpr_tls_init(tls)                  \
-  do {                                     \
-    GPR_ASSERT(*((tls)->inited) == false); \
-    *((tls)->inited) = true;               \
-  } while (0)
-
-/** It is allowed to call gpr_tls_init after gpr_tls_destroy is called. */
-#define gpr_tls_destroy(tls)      \
-  do {                            \
-    GPR_ASSERT(*((tls)->inited)); \
-    *((tls)->inited) = false;     \
-  } while (0)
-
-#define gpr_tls_set(tls, new_value) \
-  do {                              \
-    GPR_ASSERT(*((tls)->inited));   \
-    (tls)->value = (new_value);     \
-  } while (0)
-
-#define gpr_tls_get(tls)          \
-  ({                              \
-    GPR_ASSERT(*((tls)->inited)); \
-    (tls)->value;                 \
-  })
-
-#else /* NDEBUG */
-
 struct gpr_gcc_thread_local {
   intptr_t value;
 };
@@ -80,6 +42,4 @@
 #define gpr_tls_set(tls, new_value) (((tls)->value) = (new_value))
 #define gpr_tls_get(tls) ((tls)->value)
 
-#endif /* NDEBUG */
-
 #endif /* GRPC_SUPPORT_TLS_GCC_H */
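
For reference, a small sketch of how callers typically use these gpr TLS macros; the variable and function names here are illustrative only.

#include <grpc/support/tls.h>
#include <stdint.h>

/* Illustrative per-thread slot holding a pointer-sized value. */
GPR_TLS_DECL(g_example_thread_state);

static void example_module_init(void) { gpr_tls_init(&g_example_thread_state); }
static void example_module_shutdown(void) {
  gpr_tls_destroy(&g_example_thread_state);
}

static void example_bind_to_current_thread(void* state) {
  gpr_tls_set(&g_example_thread_state, (intptr_t)state);
}

static void* example_get_for_current_thread(void) {
  return (void*)gpr_tls_get(&g_example_thread_state);
}
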
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 58496dc..aced9ad 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -210,6 +210,14 @@
   char* info_service_config_json;
 } channel_data;
 
+typedef struct {
+  channel_data* chand;
+  /** used as an identifier, don't dereference it because the LB policy may be
+   * non-existing when the callback is run */
+  grpc_lb_policy* lb_policy;
+  grpc_closure closure;
+} reresolution_request_args;
+
 /** We create one watcher for each new lb_policy that is returned from a
     resolver, to watch for state changes from the lb_policy. When a state
     change is seen, we update the channel, and create a new watcher. */
@@ -258,21 +266,13 @@
 static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx,
                                               void* arg, grpc_error* error) {
   lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg;
-  grpc_connectivity_state publish_state = w->state;
   /* check if the notification is for the latest policy */
   if (w->lb_policy == w->chand->lb_policy) {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
               w->lb_policy, grpc_connectivity_state_name(w->state));
     }
-    if (publish_state == GRPC_CHANNEL_SHUTDOWN &&
-        w->chand->resolver != nullptr) {
-      publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
-      grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
-      GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
-      w->chand->lb_policy = nullptr;
-    }
-    set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
+    set_channel_connectivity_state_locked(exec_ctx, w->chand, w->state,
                                           GRPC_ERROR_REF(error), "lb_changed");
     if (w->state != GRPC_CHANNEL_SHUTDOWN) {
       watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
@@ -369,6 +369,27 @@
   }
 }
 
+static void request_reresolution_locked(grpc_exec_ctx* exec_ctx, void* arg,
+                                        grpc_error* error) {
+  reresolution_request_args* args = (reresolution_request_args*)arg;
+  channel_data* chand = args->chand;
+  // If this invocation is for a stale LB policy, treat it as an LB shutdown
+  // signal.
+  if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
+      chand->resolver == nullptr) {
+    GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "re-resolution");
+    gpr_free(args);
+    return;
+  }
+  if (grpc_client_channel_trace.enabled()) {
+    gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
+  }
+  grpc_resolver_channel_saw_error_locked(exec_ctx, chand->resolver);
+  // Give back the closure to the LB policy.
+  grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, chand->lb_policy,
+                                              &args->closure);
+}
+
 static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
                                               void* arg, grpc_error* error) {
   channel_data* chand = (channel_data*)arg;
@@ -385,100 +406,114 @@
   grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
   grpc_slice_hash_table* method_params_table = nullptr;
   if (chand->resolver_result != nullptr) {
-    // Find LB policy name.
-    const char* lb_policy_name = nullptr;
-    const grpc_arg* channel_arg =
-        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
-    if (channel_arg != nullptr) {
-      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
-      lb_policy_name = channel_arg->value.string;
-    }
-    // Special case: If at least one balancer address is present, we use
-    // the grpclb policy, regardless of what the resolver actually specified.
-    channel_arg =
-        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
-    if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
-      grpc_lb_addresses* addresses =
-          (grpc_lb_addresses*)channel_arg->value.pointer.p;
-      bool found_balancer_address = false;
-      for (size_t i = 0; i < addresses->num_addresses; ++i) {
-        if (addresses->addresses[i].is_balancer) {
-          found_balancer_address = true;
-          break;
-        }
-      }
-      if (found_balancer_address) {
-        if (lb_policy_name != nullptr &&
-            strcmp(lb_policy_name, "grpclb") != 0) {
-          gpr_log(GPR_INFO,
-                  "resolver requested LB policy %s but provided at least one "
-                  "balancer address -- forcing use of grpclb LB policy",
-                  lb_policy_name);
-        }
-        lb_policy_name = "grpclb";
-      }
-    }
-    // Use pick_first if nothing was specified and we didn't select grpclb
-    // above.
-    if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
-    grpc_lb_policy_args lb_policy_args;
-    lb_policy_args.args = chand->resolver_result;
-    lb_policy_args.client_channel_factory = chand->client_channel_factory;
-    lb_policy_args.combiner = chand->combiner;
-    // Check to see if we're already using the right LB policy.
-    // Note: It's safe to use chand->info_lb_policy_name here without
-    // taking a lock on chand->info_mu, because this function is the
-    // only thing that modifies its value, and it can only be invoked
-    // once at any given time.
-    lb_policy_name_changed =
-        chand->info_lb_policy_name == nullptr ||
-        gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
-    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
-      // Continue using the same LB policy.  Update with new addresses.
-      lb_policy_updated = true;
-      grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
-    } else {
-      // Instantiate new LB policy.
-      new_lb_policy =
-          grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
-      if (new_lb_policy == nullptr) {
-        gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
-      }
-    }
-    // Find service config.
-    channel_arg =
-        grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
-    if (channel_arg != nullptr) {
-      GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
-      service_config_json = gpr_strdup(channel_arg->value.string);
-      grpc_service_config* service_config =
-          grpc_service_config_create(service_config_json);
-      if (service_config != nullptr) {
-        channel_arg =
-            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
-        GPR_ASSERT(channel_arg != nullptr);
+    if (chand->resolver != nullptr) {
+      // Find LB policy name.
+      const char* lb_policy_name = nullptr;
+      const grpc_arg* channel_arg = grpc_channel_args_find(
+          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
+      if (channel_arg != nullptr) {
         GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
-        grpc_uri* uri =
-            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
-        GPR_ASSERT(uri->path[0] != '\0');
-        service_config_parsing_state parsing_state;
-        memset(&parsing_state, 0, sizeof(parsing_state));
-        parsing_state.server_name =
-            uri->path[0] == '/' ? uri->path + 1 : uri->path;
-        grpc_service_config_parse_global_params(
-            service_config, parse_retry_throttle_params, &parsing_state);
-        grpc_uri_destroy(uri);
-        retry_throttle_data = parsing_state.retry_throttle_data;
-        method_params_table = grpc_service_config_create_method_config_table(
-            exec_ctx, service_config, method_parameters_create_from_json,
-            method_parameters_ref_wrapper, method_parameters_unref_wrapper);
-        grpc_service_config_destroy(service_config);
+        lb_policy_name = channel_arg->value.string;
       }
+      // Special case: If at least one balancer address is present, we use
+      // the grpclb policy, regardless of what the resolver actually specified.
+      channel_arg =
+          grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+      if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+        grpc_lb_addresses* addresses =
+            (grpc_lb_addresses*)channel_arg->value.pointer.p;
+        bool found_balancer_address = false;
+        for (size_t i = 0; i < addresses->num_addresses; ++i) {
+          if (addresses->addresses[i].is_balancer) {
+            found_balancer_address = true;
+            break;
+          }
+        }
+        if (found_balancer_address) {
+          if (lb_policy_name != nullptr &&
+              strcmp(lb_policy_name, "grpclb") != 0) {
+            gpr_log(GPR_INFO,
+                    "resolver requested LB policy %s but provided at least one "
+                    "balancer address -- forcing use of grpclb LB policy",
+                    lb_policy_name);
+          }
+          lb_policy_name = "grpclb";
+        }
+      }
+      // Use pick_first if nothing was specified and we didn't select grpclb
+      // above.
+      if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
+      grpc_lb_policy_args lb_policy_args;
+      lb_policy_args.args = chand->resolver_result;
+      lb_policy_args.client_channel_factory = chand->client_channel_factory;
+      lb_policy_args.combiner = chand->combiner;
+      // Check to see if we're already using the right LB policy.
+      // Note: It's safe to use chand->info_lb_policy_name here without
+      // taking a lock on chand->info_mu, because this function is the
+      // only thing that modifies its value, and it can only be invoked
+      // once at any given time.
+      lb_policy_name_changed =
+          chand->info_lb_policy_name == nullptr ||
+          gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
+      if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
+        // Continue using the same LB policy.  Update with new addresses.
+        lb_policy_updated = true;
+        grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy,
+                                     &lb_policy_args);
+      } else {
+        // Instantiate new LB policy.
+        new_lb_policy =
+            grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+        if (new_lb_policy == nullptr) {
+          gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
+                  lb_policy_name);
+        } else {
+          reresolution_request_args* args =
+              (reresolution_request_args*)gpr_zalloc(sizeof(*args));
+          args->chand = chand;
+          args->lb_policy = new_lb_policy;
+          GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
+                            grpc_combiner_scheduler(chand->combiner));
+          GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
+          grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy,
+                                                      &args->closure);
+        }
+      }
+      // Find service config.
+      channel_arg = grpc_channel_args_find(chand->resolver_result,
+                                           GRPC_ARG_SERVICE_CONFIG);
+      if (channel_arg != nullptr) {
+        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+        service_config_json = gpr_strdup(channel_arg->value.string);
+        grpc_service_config* service_config =
+            grpc_service_config_create(service_config_json);
+        if (service_config != nullptr) {
+          channel_arg = grpc_channel_args_find(chand->resolver_result,
+                                               GRPC_ARG_SERVER_URI);
+          GPR_ASSERT(channel_arg != nullptr);
+          GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+          grpc_uri* uri =
+              grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
+          GPR_ASSERT(uri->path[0] != '\0');
+          service_config_parsing_state parsing_state;
+          memset(&parsing_state, 0, sizeof(parsing_state));
+          parsing_state.server_name =
+              uri->path[0] == '/' ? uri->path + 1 : uri->path;
+          grpc_service_config_parse_global_params(
+              service_config, parse_retry_throttle_params, &parsing_state);
+          grpc_uri_destroy(uri);
+          retry_throttle_data = parsing_state.retry_throttle_data;
+          method_params_table = grpc_service_config_create_method_config_table(
+              exec_ctx, service_config, method_parameters_create_from_json,
+              method_parameters_ref_wrapper, method_parameters_unref_wrapper);
+          grpc_service_config_destroy(service_config);
+        }
+      }
+      // Before we clean up, save a copy of lb_policy_name, since it might
+      // be pointing to data inside chand->resolver_result.
+      // The copy will be saved in chand->lb_policy_name below.
+      lb_policy_name_dup = gpr_strdup(lb_policy_name);
     }
-    // Before we clean up, save a copy of lb_policy_name, since it might
-    // be pointing to data inside chand->resolver_result.
-    // The copy will be saved in chand->lb_policy_name below.
-    lb_policy_name_dup = gpr_strdup(lb_policy_name);
     grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
     chand->resolver_result = nullptr;
   }
@@ -515,11 +550,11 @@
   }
   chand->method_params_table = method_params_table;
   // If we have a new LB policy or are shutting down (in which case
-  // new_lb_policy will be NULL), swap out the LB policy, unreffing the
-  // old one and removing its fds from chand->interested_parties.
-  // Note that we do NOT do this if either (a) we updated the existing
-  // LB policy above or (b) we failed to create the new LB policy (in
-  // which case we want to continue using the most recent one we had).
+  // new_lb_policy will be NULL), swap out the LB policy, unreffing the old one
+  // and removing its fds from chand->interested_parties. Note that we do NOT do
+  // this if either (a) we updated the existing LB policy above or (b) we failed
+  // to create the new LB policy (in which case we want to continue using the
+  // most recent one we had).
   if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
       chand->resolver == nullptr) {
     if (chand->lb_policy != nullptr) {
diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc
index 6276c3e..db566f1 100644
--- a/src/core/ext/filters/client_channel/lb_policy.cc
+++ b/src/core/ext/filters/client_channel/lb_policy.cc
@@ -161,3 +161,30 @@
                                   const grpc_lb_policy_args* lb_policy_args) {
   policy->vtable->update_locked(exec_ctx, policy, lb_policy_args);
 }
+
+void grpc_lb_policy_set_reresolve_closure_locked(
+    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+    grpc_closure* request_reresolution) {
+  policy->vtable->set_reresolve_closure_locked(exec_ctx, policy,
+                                               request_reresolution);
+}
+
+void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx,
+                                  grpc_lb_policy* policy,
+                                  grpc_core::TraceFlag* grpc_lb_trace,
+                                  grpc_error* error) {
+  if (policy->request_reresolution != nullptr) {
+    GRPC_CLOSURE_SCHED(exec_ctx, policy->request_reresolution, error);
+    policy->request_reresolution = nullptr;
+    if (grpc_lb_trace->enabled()) {
+      gpr_log(GPR_DEBUG,
+              "%s %p: scheduling re-resolution closure with error=%s.",
+              grpc_lb_trace->name(), policy, grpc_error_string(error));
+    }
+  } else {
+    if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
+      gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
+              grpc_lb_trace->name(), policy);
+    }
+  }
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 72d0279..d3159ee 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -38,6 +38,8 @@
   grpc_pollset_set* interested_parties;
   /* combiner under which lb_policy actions take place */
   grpc_combiner* combiner;
+  /* callback to force a re-resolution */
+  grpc_closure* request_reresolution;
 };
 
 /** Extra arguments for an LB pick */
@@ -96,6 +98,11 @@
 
   void (*update_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
                         const grpc_lb_policy_args* args);
+
+  /** \see grpc_lb_policy_set_reresolve_closure */
+  void (*set_reresolve_closure_locked)(grpc_exec_ctx* exec_ctx,
+                                       grpc_lb_policy* policy,
+                                       grpc_closure* request_reresolution);
 };
 
 #ifndef NDEBUG
@@ -202,4 +209,16 @@
                                   grpc_lb_policy* policy,
                                   const grpc_lb_policy_args* lb_policy_args);
 
+/** Set the re-resolution closure to \a request_reresolution. */
+void grpc_lb_policy_set_reresolve_closure_locked(
+    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+    grpc_closure* request_reresolution);
+
+/** Try to request a re-resolution. It's NOT a public API; it's only for use by
+    the LB policy implementations. */
+void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx,
+                                  grpc_lb_policy* policy,
+                                  grpc_core::TraceFlag* grpc_lb_trace,
+                                  grpc_error* error);
+
 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */
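
A condensed sketch of how an LB policy implementation is expected to use this pair of hooks; the concrete pick_first and round_robin versions appear later in this diff, and the names below are illustrative only.

#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"

/* Vtable hook: stash the channel's re-resolution closure on the policy. */
static void example_set_reresolve_closure_locked(
    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
    grpc_closure* request_reresolution) {
  GPR_ASSERT(policy->request_reresolution == nullptr);
  policy->request_reresolution = request_reresolution;
}

/* Later, when the policy wants fresh addresses (e.g. it ran out of usable
 * subchannels), it hands the closure back by scheduling it. */
static void example_request_fresh_addresses(grpc_exec_ctx* exec_ctx,
                                            grpc_lb_policy* policy,
                                            grpc_core::TraceFlag* tracer) {
  grpc_lb_policy_try_reresolve(exec_ctx, policy, tracer, GRPC_ERROR_NONE);
}
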
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 2f8e0c9..db06fc2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -637,7 +637,7 @@
 
 /* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
  * immediately (ignoring its completion callback), we need to perform the
- * cleanups this callback would otherwise be resposible for.
+ * cleanups this callback would otherwise be responsible for.
  * If \a force_async is true, then we will manually schedule the
  * completion callback even if the pick is available immediately. */
 static bool pick_from_internal_rr_locked(
@@ -766,6 +766,9 @@
             glb_policy->rr_policy);
     return;
   }
+  grpc_lb_policy_set_reresolve_closure_locked(
+      exec_ctx, new_rr_policy, glb_policy->base.request_reresolution);
+  glb_policy->base.request_reresolution = nullptr;
   glb_policy->rr_policy = new_rr_policy;
   grpc_error* rr_state_error = nullptr;
   const grpc_connectivity_state rr_state =
@@ -991,6 +994,7 @@
 
 static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
   glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
+  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   glb_policy->shutting_down = true;
 
   /* We need a copy of the lb_call pointer because we can't cancell the call
@@ -1021,6 +1025,9 @@
   glb_policy->pending_pings = nullptr;
   if (glb_policy->rr_policy != nullptr) {
     GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
+  } else {
+    grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace,
+                                 GRPC_ERROR_CANCELLED);
   }
   // We destroy the LB channel here because
   // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@@ -1030,28 +1037,27 @@
     grpc_channel_destroy(glb_policy->lb_channel);
     glb_policy->lb_channel = nullptr;
   }
-  grpc_connectivity_state_set(
-      exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
-      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
+  grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+                              GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
+                              "glb_shutdown");
 
   while (pp != nullptr) {
     pending_pick* next = pp->next;
     *pp->target = nullptr;
-    GRPC_CLOSURE_SCHED(
-        exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+    GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
+                       GRPC_ERROR_REF(error));
     gpr_free(pp);
     pp = next;
   }
 
   while (pping != nullptr) {
     pending_ping* next = pping->next;
-    GRPC_CLOSURE_SCHED(
-        exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
+    GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
+                       GRPC_ERROR_REF(error));
     gpr_free(pping);
     pping = next;
   }
+  GRPC_ERROR_UNREF(error);
 }
 
 // Cancel a specific pending pick.
@@ -1754,8 +1760,8 @@
   grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
   glb_policy->fallback_backend_addresses =
       extract_backend_addresses_locked(exec_ctx, addresses);
-  if (glb_policy->started_picking && glb_policy->lb_fallback_timeout_ms > 0 &&
-      !glb_policy->fallback_timer_active) {
+  if (glb_policy->lb_fallback_timeout_ms > 0 &&
+      glb_policy->rr_policy != nullptr) {
     rr_handover_locked(exec_ctx, glb_policy);
   }
 }
@@ -1853,7 +1859,7 @@
         grpc_call_cancel(glb_policy->lb_call, nullptr);
         // lb_on_server_status_received() will pick up the cancel and reinit
         // lb_call.
-      } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
+      } else if (glb_policy->started_picking) {
         if (glb_policy->retry_timer_active) {
           grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
           glb_policy->retry_timer_active = false;
@@ -1870,6 +1876,20 @@
   }
 }
 
+static void glb_set_reresolve_closure_locked(
+    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+    grpc_closure* request_reresolution) {
+  glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
+  GPR_ASSERT(!glb_policy->shutting_down);
+  GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
+  if (glb_policy->rr_policy != nullptr) {
+    grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy,
+                                                request_reresolution);
+  } else {
+    glb_policy->base.request_reresolution = request_reresolution;
+  }
+}
+
 /* Code wiring the policy with the rest of the core */
 static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
     glb_destroy,
@@ -1881,7 +1901,8 @@
     glb_exit_idle_locked,
     glb_check_connectivity_locked,
     glb_notify_on_state_change_locked,
-    glb_update_locked};
+    glb_update_locked,
+    glb_set_reresolve_closure_locked};
 
 static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
                                   grpc_lb_policy_factory* factory,
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 6cfc37e..228a77d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -70,8 +70,9 @@
   }
 }
 
-static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p,
-                            grpc_error* error) {
+static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+  pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
+  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   if (grpc_lb_pick_first_trace.enabled()) {
     gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
   }
@@ -96,14 +97,11 @@
         exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown");
     p->latest_pending_subchannel_list = nullptr;
   }
+  grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace,
+                               GRPC_ERROR_CANCELLED);
   GRPC_ERROR_UNREF(error);
 }
 
-static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
-  shutdown_locked(exec_ctx, (pick_first_lb_policy*)pol,
-                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"));
-}
-
 static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
                                   grpc_connected_subchannel** target,
                                   grpc_error* error) {
@@ -157,10 +155,15 @@
   if (p->subchannel_list != nullptr &&
       p->subchannel_list->num_subchannels > 0) {
     p->subchannel_list->checking_subchannel = 0;
-    grpc_lb_subchannel_list_ref_for_connectivity_watch(
-        p->subchannel_list, "connectivity_watch+start_picking");
-    grpc_lb_subchannel_data_start_connectivity_watch(
-        exec_ctx, &p->subchannel_list->subchannels[0]);
+    for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
+      if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+        grpc_lb_subchannel_list_ref_for_connectivity_watch(
+            p->subchannel_list, "connectivity_watch+start_picking");
+        grpc_lb_subchannel_data_start_connectivity_watch(
+            exec_ctx, &p->subchannel_list->subchannels[i]);
+        break;
+      }
+    }
   }
 }
 
@@ -404,6 +407,9 @@
     if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
         p->latest_pending_subchannel_list != nullptr) {
       p->selected = nullptr;
+      grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
+      grpc_lb_subchannel_list_unref_for_connectivity_watch(
+          exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update");
       grpc_lb_subchannel_list_shutdown_and_unref(
           exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update");
       p->subchannel_list = p->latest_pending_subchannel_list;
@@ -412,21 +418,35 @@
           exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
           GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
     } else {
-      if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
-        /* if the selected channel goes bad, we're done */
-        sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN;
+      // TODO(juanlishen): we re-resolve when the selected subchannel goes to
+      // TRANSIENT_FAILURE because we used to shut down in this case before
+      // re-resolution is introduced. But we need to investigate whether we
+      // really want to take any action instead of waiting for the selected
+      // subchannel reconnecting.
+      if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
+          sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+        // If the selected channel goes bad, request a re-resolution.
+        grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+                                    GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+                                    "selected_changed+reresolve");
+        p->started_picking = false;
+        grpc_lb_policy_try_reresolve(
+            exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+      } else {
+        grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+                                    sd->curr_connectivity_state,
+                                    GRPC_ERROR_REF(error), "selected_changed");
       }
-      grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
-                                  sd->curr_connectivity_state,
-                                  GRPC_ERROR_REF(error), "selected_changed");
       if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
         // Renew notification.
         grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
       } else {
+        p->selected = nullptr;
         grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
         grpc_lb_subchannel_list_unref_for_connectivity_watch(
             exec_ctx, sd->subchannel_list, "pf_selected_shutdown");
-        shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
+        grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
+                                                 "pf_selected_shutdown");
       }
     }
     return;
@@ -531,24 +551,37 @@
       } while (sd->subchannel == nullptr && sd != original_sd);
       if (sd == original_sd) {
         grpc_lb_subchannel_list_unref_for_connectivity_watch(
-            exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
-        shutdown_locked(exec_ctx, p,
-                        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                            "Pick first exhausted channels", &error, 1));
-        break;
+            exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels");
+        if (sd->subchannel_list == p->subchannel_list) {
+          grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
+                                      GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
+                                      "exhausted_subchannels+reresolve");
+          p->started_picking = false;
+          grpc_lb_policy_try_reresolve(
+              exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
+        }
+      } else {
+        if (sd->subchannel_list == p->subchannel_list) {
+          grpc_connectivity_state_set(
+              exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+              GRPC_ERROR_REF(error), "subchannel_failed");
+        }
+        // Reuses the connectivity refs from the previous watch.
+        grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
       }
-      if (sd->subchannel_list == p->subchannel_list) {
-        grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
-                                    GRPC_CHANNEL_TRANSIENT_FAILURE,
-                                    GRPC_ERROR_REF(error), "subchannel_failed");
-      }
-      // Reuses the connectivity refs from the previous watch.
-      grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
-      break;
     }
   }
 }
 
+static void pf_set_reresolve_closure_locked(
+    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+    grpc_closure* request_reresolution) {
+  pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
+  GPR_ASSERT(!p->shutdown);
+  GPR_ASSERT(policy->request_reresolution == nullptr);
+  policy->request_reresolution = request_reresolution;
+}
+
 static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
     pf_destroy,
     pf_shutdown_locked,
@@ -559,7 +592,8 @@
     pf_exit_idle_locked,
     pf_check_connectivity_locked,
     pf_notify_on_state_change_locked,
-    pf_update_locked};
+    pf_update_locked,
+    pf_set_reresolve_closure_locked};
 
 static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}
 
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 23a4cc4..f68daba 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -20,9 +20,9 @@
  *
  * Before every pick, the \a get_next_ready_subchannel_index_locked function
  * returns the p->subchannel_list->subchannels index for next subchannel,
- * respecting the relative
- * order of the addresses provided upon creation or updates. Note however that
- * updates will start picking from the beginning of the updated list. */
+ * respecting the relative order of the addresses provided upon creation or
+ * updates. Note however that updates will start picking from the beginning of
+ * the updated list. */
 
 #include <string.h>
 
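As a standalone illustration of the wrap-around scan described in the comment above (a hypothetical helper, not the actual get_next_ready_subchannel_index_locked implementation):

#include <stddef.h>

/* Hypothetical: starting just after the last pick, return the index of the
 * first READY entry, wrapping around; returns `num` if none is READY. */
static size_t example_next_ready_index(size_t last_picked, size_t num,
                                       const bool* is_ready) {
  if (num == 0) return num; /* nothing to scan */
  for (size_t i = 0; i < num; ++i) {
    const size_t idx = (last_picked + 1 + i) % num;
    if (is_ready[idx]) return idx;
  }
  return num;
}
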
@@ -167,8 +167,9 @@
   gpr_free(p);
 }
 
-static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
-                            grpc_error* error) {
+static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
+  round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
+  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   if (grpc_lb_round_robin_trace.enabled()) {
     gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
   }
@@ -194,15 +195,11 @@
         "sl_shutdown_pending_rr_shutdown");
     p->latest_pending_subchannel_list = nullptr;
   }
+  grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+                               GRPC_ERROR_CANCELLED);
   GRPC_ERROR_UNREF(error);
 }
 
-static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
-  round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
-  shutdown_locked(exec_ctx, p,
-                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
-}
-
 static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
                                   grpc_connected_subchannel** target,
                                   grpc_error* error) {
@@ -255,10 +252,12 @@
                                  round_robin_lb_policy* p) {
   p->started_picking = true;
   for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
-    grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
-                                                       "connectivity_watch");
-    grpc_lb_subchannel_data_start_connectivity_watch(
-        exec_ctx, &p->subchannel_list->subchannels[i]);
+    if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
+      grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
+                                                         "connectivity_watch");
+      grpc_lb_subchannel_data_start_connectivity_watch(
+          exec_ctx, &p->subchannel_list->subchannels[i]);
+    }
   }
 }
 
@@ -346,71 +345,71 @@
 }
 
 /** Sets the policy's connectivity status based on that of the passed-in \a sd
- * (the grpc_lb_subchannel_data associted with the updated subchannel) and the
- * subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
- * used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
- * connectivity status set. */
-static grpc_connectivity_state update_lb_connectivity_status_locked(
-    grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) {
+ * (the grpc_lb_subchannel_data associated with the updated subchannel) and the
+ * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
+ * only if the policy transitions to state TRANSIENT_FAILURE. */
+static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
+                                                 grpc_lb_subchannel_data* sd,
+                                                 grpc_error* error) {
   /* In priority order. The first rule to match terminates the search (ie, if we
    * are on rule n, all previous rules were unfulfilled).
    *
    * 1) RULE: ANY subchannel is READY => policy is READY.
-   *    CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
+   *    CHECK: subchannel_list->num_ready > 0.
    *
    * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
    *    CHECK: sd->curr_connectivity_state == CONNECTING.
    *
-   * 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
-   *    CHECK: p->subchannel_list->num_shutdown ==
-   *           p->subchannel_list->num_subchannels.
+   * 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
+   *          re-resolution).
+   *    CHECK: subchannel_list->num_shutdown ==
+   *           subchannel_list->num_subchannels.
    *
    * 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
-   *    TRANSIENT_FAILURE.
-   *    CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
+   *          TRANSIENT_FAILURE.
+   *    CHECK: subchannel_list->num_transient_failures ==
+   *           subchannel_list->num_subchannels.
    *
    * 5) RULE: ALL subchannels are IDLE => policy is IDLE.
-   *    CHECK: p->num_idle == p->subchannel_list->num_subchannels.
+   *    CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels.
+   *    (Note that all the subchannels will transition from IDLE to CONNECTING
+   *    in batch when we start trying to connect.)
    */
-  grpc_connectivity_state new_state = sd->curr_connectivity_state;
+  // TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN,
+  // TRANSIENT_FAILURE}, we don't change the state. We may want to improve on
+  // this.
   grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
   round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
-  if (subchannel_list->num_ready > 0) { /* 1) READY */
+  if (subchannel_list->num_ready > 0) {
+    /* 1) READY */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
                                 GRPC_ERROR_NONE, "rr_ready");
-    new_state = GRPC_CHANNEL_READY;
-  } else if (sd->curr_connectivity_state ==
-             GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
+  } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
+    /* 2) CONNECTING */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
                                 "rr_connecting");
-    new_state = GRPC_CHANNEL_CONNECTING;
-  } else if (p->subchannel_list->num_shutdown ==
-             p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
-    grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
-                                GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
-                                "rr_shutdown");
-    p->shutdown = true;
-    new_state = GRPC_CHANNEL_SHUTDOWN;
-    if (grpc_lb_round_robin_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[RR %p] Shutting down: all subchannels have gone into shutdown",
-              (void*)p);
-    }
+  } else if (subchannel_list->num_shutdown ==
+             subchannel_list->num_subchannels) {
+    /* 3) IDLE and re-resolve */
+    grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
+                                GRPC_ERROR_NONE,
+                                "rr_exhausted_subchannels+reresolve");
+    p->started_picking = false;
+    grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
+                                 GRPC_ERROR_NONE);
   } else if (subchannel_list->num_transient_failures ==
-             p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
+             subchannel_list->num_subchannels) {
+    /* 4) TRANSIENT_FAILURE */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                                 GRPC_CHANNEL_TRANSIENT_FAILURE,
                                 GRPC_ERROR_REF(error), "rr_transient_failure");
-    new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
-  } else if (subchannel_list->num_idle ==
-             p->subchannel_list->num_subchannels) { /* 5) IDLE */
+  } else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
+    /* 5) IDLE */
     grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
                                 GRPC_ERROR_NONE, "rr_idle");
-    new_state = GRPC_CHANNEL_IDLE;
   }
   GRPC_ERROR_UNREF(error);
-  return new_state;
 }
 
 static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
@@ -454,21 +453,16 @@
   // state (which was set by the connectivity state watcher) to
   // curr_connectivity_state, which is what we use inside of the combiner.
   sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
-  // Update state counters and determine new overall state.
+  // Update state counters and new overall state.
   update_state_counters_locked(sd);
-  const grpc_connectivity_state new_policy_connectivity_state =
-      update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
-  // If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
-  // policy's state is SHUTDOWN, clean up.
+  update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
+  // If the sd's new state is SHUTDOWN, unref the subchannel.
   if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
     grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
     grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
                                              "rr_connectivity_shutdown");
     grpc_lb_subchannel_list_unref_for_connectivity_watch(
         exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
-    if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
-      shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
-    }
   } else {  // sd not in SHUTDOWN
     if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
       if (sd->connected_subchannel == nullptr) {
@@ -504,7 +498,7 @@
       }
       /* at this point we know there's at least one suitable subchannel. Go
        * ahead and pick one and notify the pending suitors in
-       * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
+       * p->pending_picks. This preemptively replicates rr_pick()'s actions. */
       const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
       GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
       grpc_lb_subchannel_data* selected =
@@ -642,6 +636,15 @@
   }
 }
 
+static void rr_set_reresolve_closure_locked(
+    grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
+    grpc_closure* request_reresolution) {
+  round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
+  GPR_ASSERT(!p->shutdown);
+  GPR_ASSERT(policy->request_reresolution == nullptr);
+  policy->request_reresolution = request_reresolution;
+}
+
 static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
     rr_destroy,
     rr_shutdown_locked,
@@ -652,7 +655,8 @@
     rr_exit_idle_locked,
     rr_check_connectivity_locked,
     rr_notify_on_state_change_locked,
-    rr_update_locked};
+    rr_update_locked,
+    rr_set_reresolve_closure_locked};
 
 static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}
 
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
index 77cc313..819f66a 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc
@@ -117,11 +117,35 @@
   } else {
     grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint,
                                           c->args.interested_parties);
-    c->result->transport =
-        grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
+    c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args,
+                                                        args->endpoint, true);
     GPR_ASSERT(c->result->transport);
+    // TODO(roth): We ideally want to wait until we receive HTTP/2
+    // settings from the server before we consider the connection
+    // established.  If that doesn't happen before the connection
+    // timeout expires, then we should consider the connection attempt a
+    // failure and feed that information back into the backoff code.
+    // We could pass a notify_on_receive_settings callback to
+    // grpc_chttp2_transport_start_reading() to let us know when
+    // settings are received, but we would need to figure out how to use
+    // that information here.
+    //
+    // Unfortunately, we don't currently have a way to split apart the two
+    // effects of scheduling c->notify: we start sending RPCs immediately
+    // (which we want to do) and we consider the connection attempt successful
+    // (which we don't want to do until we get the notify_on_receive_settings
+    // callback from the transport).  If we could split those things
+    // apart, then we could start sending RPCs but then wait for our
+    // timeout before deciding if the connection attempt is successful.
+    // If the attempt is not successful, then we would tear down the
+    // transport and feed the failure back into the backoff code.
+    //
+    // In addition, even if we did that, we would probably not want to do
+    // so until after transparent retries is implemented.  Otherwise, any
+    // RPC that we attempt to send on the connection before the timeout
+    // would fail instead of being retried on a subsequent attempt.
     grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
-                                        args->read_buffer);
+                                        args->read_buffer, nullptr);
     c->result->channel_args = args->args;
   }
   grpc_closure* notify = c->notify;
@@ -141,8 +165,9 @@
   grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint,
                                    c->args.interested_parties);
   grpc_handshake_manager_do_handshake(
-      exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
-      c->args.deadline, nullptr /* acceptor */, on_handshake_done, c);
+      exec_ctx, c->handshake_mgr, c->args.interested_parties, c->endpoint,
+      c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
+      on_handshake_done, c);
   c->endpoint = nullptr;  // Endpoint handed off to handshake manager.
 }
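
The TODO above describes eventually passing a notify_on_receive_settings closure from the client connector instead of nullptr. A minimal sketch of what that could look like, assuming a hypothetical on_receive_settings field on chttp2_connector (illustrative only, not part of this change):

    // Hypothetical client-side callback: GRPC_ERROR_NONE means the server's
    // SETTINGS frame arrived; GRPC_ERROR_CANCELLED means the transport was
    // closed first, which could be fed back into the backoff code.
    static void on_receive_settings(grpc_exec_ctx* exec_ctx, void* arg,
                                    grpc_error* error) {
      chttp2_connector* c = (chttp2_connector*)arg;
      if (error == GRPC_ERROR_NONE) {
        // Consider the connection attempt successful.
      } else {
        // Tear down c->result->transport and report failure to the backoff code.
      }
    }

    // ...and at transport start-up, in place of the nullptr passed above:
    GRPC_CLOSURE_INIT(&c->on_receive_settings, on_receive_settings, c,
                      grpc_schedule_on_exec_ctx);
    grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
                                        args->read_buffer,
                                        &c->on_receive_settings);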
 
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index e748d28..c6b149d 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -53,12 +53,12 @@
       &exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
 
   grpc_transport* transport =
-      grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
+      grpc_create_chttp2_transport(&exec_ctx, final_args, client, true);
   GPR_ASSERT(transport);
   grpc_channel* channel = grpc_channel_create(
       &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
   grpc_channel_args_destroy(&exec_ctx, final_args);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
 
   grpc_exec_ctx_finish(&exec_ctx);
 
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc
index 93be5e4..49ee677 100644
--- a/src/core/ext/transport/chttp2/server/chttp2_server.cc
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc
@@ -21,6 +21,7 @@
 #include <grpc/grpc.h>
 
 #include <inttypes.h>
+#include <limits.h>
 #include <string.h>
 
 #include <grpc/support/alloc.h>
@@ -31,6 +32,7 @@
 
 #include "src/core/ext/filters/http/server/http_server_filter.h"
 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/transport/internal.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/handshaker.h"
 #include "src/core/lib/channel/handshaker_registry.h"
@@ -53,12 +55,52 @@
 } server_state;
 
 typedef struct {
+  gpr_refcount refs;
   server_state* svr_state;
   grpc_pollset* accepting_pollset;
   grpc_tcp_server_acceptor* acceptor;
   grpc_handshake_manager* handshake_mgr;
+  // State for enforcing handshake timeout on receiving HTTP/2 settings.
+  grpc_chttp2_transport* transport;
+  grpc_millis deadline;
+  grpc_timer timer;
+  grpc_closure on_timeout;
+  grpc_closure on_receive_settings;
 } server_connection_state;
 
+static void server_connection_state_unref(
+    grpc_exec_ctx* exec_ctx, server_connection_state* connection_state) {
+  if (gpr_unref(&connection_state->refs)) {
+    if (connection_state->transport != nullptr) {
+      GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, connection_state->transport,
+                                  "receive settings timeout");
+    }
+    gpr_free(connection_state);
+  }
+}
+
+static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+  server_connection_state* connection_state = (server_connection_state*)arg;
+  // Note that we may be called with GRPC_ERROR_NONE when the timer fires
+  // or with an error indicating that the timer system is being shut down.
+  if (error != GRPC_ERROR_CANCELLED) {
+    grpc_transport_op* op = grpc_make_transport_op(nullptr);
+    op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Did not receive HTTP/2 settings before handshake timeout");
+    grpc_transport_perform_op(exec_ctx, &connection_state->transport->base, op);
+  }
+  server_connection_state_unref(exec_ctx, connection_state);
+}
+
+static void on_receive_settings(grpc_exec_ctx* exec_ctx, void* arg,
+                                grpc_error* error) {
+  server_connection_state* connection_state = (server_connection_state*)arg;
+  if (error == GRPC_ERROR_NONE) {
+    grpc_timer_cancel(exec_ctx, &connection_state->timer);
+  }
+  server_connection_state_unref(exec_ctx, connection_state);
+}
+
 static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
                               grpc_error* error) {
   grpc_handshaker_args* args = (grpc_handshaker_args*)arg;
@@ -68,7 +110,6 @@
   if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
     const char* error_str = grpc_error_string(error);
     gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
-
     if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
       // We were shut down after handshaking completed successfully, so
       // destroy the endpoint here.
@@ -87,14 +128,30 @@
     // handshaker may have handed off the connection to some external
     // code, so we can just clean up here without creating a transport.
     if (args->endpoint != nullptr) {
-      grpc_transport* transport =
-          grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
+      grpc_transport* transport = grpc_create_chttp2_transport(
+          exec_ctx, args->args, args->endpoint, false);
       grpc_server_setup_transport(
           exec_ctx, connection_state->svr_state->server, transport,
           connection_state->accepting_pollset, args->args);
-      grpc_chttp2_transport_start_reading(exec_ctx, transport,
-                                          args->read_buffer);
+      // Use notify_on_receive_settings callback to enforce the
+      // handshake deadline.
+      connection_state->transport = (grpc_chttp2_transport*)transport;
+      gpr_ref(&connection_state->refs);
+      GRPC_CLOSURE_INIT(&connection_state->on_receive_settings,
+                        on_receive_settings, connection_state,
+                        grpc_schedule_on_exec_ctx);
+      grpc_chttp2_transport_start_reading(
+          exec_ctx, transport, args->read_buffer,
+          &connection_state->on_receive_settings);
       grpc_channel_args_destroy(exec_ctx, args->args);
+      gpr_ref(&connection_state->refs);
+      GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport,
+                                "receive settings timeout");
+      GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout,
+                        connection_state, grpc_schedule_on_exec_ctx);
+      grpc_timer_init(exec_ctx, &connection_state->timer,
+                      connection_state->deadline,
+                      &connection_state->on_timeout);
     }
   }
   grpc_handshake_manager_pending_list_remove(
@@ -102,9 +159,9 @@
       connection_state->handshake_mgr);
   gpr_mu_unlock(&connection_state->svr_state->mu);
   grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
-  grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
   gpr_free(connection_state->acceptor);
-  gpr_free(connection_state);
+  grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
+  server_connection_state_unref(exec_ctx, connection_state);
 }
 
 static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
@@ -125,20 +182,25 @@
   gpr_mu_unlock(&state->mu);
   grpc_tcp_server_ref(state->tcp_server);
   server_connection_state* connection_state =
-      (server_connection_state*)gpr_malloc(sizeof(*connection_state));
+      (server_connection_state*)gpr_zalloc(sizeof(*connection_state));
+  gpr_ref_init(&connection_state->refs, 1);
   connection_state->svr_state = state;
   connection_state->accepting_pollset = accepting_pollset;
   connection_state->acceptor = acceptor;
   connection_state->handshake_mgr = handshake_mgr;
   grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args,
                        connection_state->handshake_mgr);
-  // TODO(roth): We should really get this timeout value from channel
-  // args instead of hard-coding it.
-  const grpc_millis deadline =
-      grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC;
+  const grpc_arg* timeout_arg =
+      grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+  connection_state->deadline =
+      grpc_exec_ctx_now(exec_ctx) +
+      grpc_channel_arg_get_integer(timeout_arg,
+                                   {120 * GPR_MS_PER_SEC, 1, INT_MAX});
   grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
-                                      tcp, state->args, deadline, acceptor,
-                                      on_handshake_done, connection_state);
+                                      nullptr /* interested_parties */, tcp,
+                                      state->args, connection_state->deadline,
+                                      acceptor, on_handshake_done,
+                                      connection_state);
 }
 
 /* Server callback: start listening on our ports */
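
The handshake deadline is now taken from the GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS channel arg (defaulting to 120 seconds, clamped to [1, INT_MAX]). A minimal sketch of overriding it through the core channel-args API, assuming the arg macro referenced above (the 10-second value is illustrative):

    // Give clients 10 seconds to complete the handshake and send HTTP/2 settings.
    grpc_arg arg;
    arg.type = GRPC_ARG_INTEGER;
    arg.key = (char*)GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS;
    arg.value.integer = 10 * 1000;  // milliseconds
    grpc_channel_args args = {1, &arg};
    grpc_server* server = grpc_server_create(&args, nullptr);
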
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index 007d1be..3fe05ce 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -50,7 +50,7 @@
 
   const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
   grpc_transport* transport = grpc_create_chttp2_transport(
-      &exec_ctx, server_args, server_endpoint, 0 /* is_client */);
+      &exec_ctx, server_args, server_endpoint, false /* is_client */);
 
   grpc_pollset** pollsets;
   size_t num_pollsets = 0;
@@ -62,7 +62,7 @@
 
   grpc_server_setup_transport(&exec_ctx, server, transport, nullptr,
                               server_args);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 5a7830b..63ac65a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -652,6 +652,11 @@
     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
     grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
   }
+  if (t->notify_on_receive_settings != nullptr) {
+    GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
+                       GRPC_ERROR_CANCELLED);
+    t->notify_on_receive_settings = nullptr;
+  }
   GRPC_ERROR_UNREF(error);
 }
 
@@ -1791,7 +1796,6 @@
   grpc_transport_op* op = (grpc_transport_op*)stream_op;
   grpc_chttp2_transport* t =
       (grpc_chttp2_transport*)op->handler_private.extra_arg;
-  grpc_error* close_transport = op->disconnect_with_error;
 
   if (op->goaway_error) {
     send_goaway(exec_ctx, t, op->goaway_error);
@@ -1823,8 +1827,8 @@
         op->on_connectivity_state_change);
   }
 
-  if (close_transport != GRPC_ERROR_NONE) {
-    close_transport_locked(exec_ctx, t, close_transport);
+  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
+    close_transport_locked(exec_ctx, t, op->disconnect_with_error);
   }
 
   GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
@@ -3232,16 +3236,16 @@
 
 grpc_transport* grpc_create_chttp2_transport(
     grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
-    grpc_endpoint* ep, int is_client) {
+    grpc_endpoint* ep, bool is_client) {
   grpc_chttp2_transport* t =
       (grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport));
-  init_transport(exec_ctx, t, channel_args, ep, is_client != 0);
+  init_transport(exec_ctx, t, channel_args, ep, is_client);
   return &t->base;
 }
 
-void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
-                                         grpc_transport* transport,
-                                         grpc_slice_buffer* read_buffer) {
+void grpc_chttp2_transport_start_reading(
+    grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+    grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) {
   grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport;
   GRPC_CHTTP2_REF_TRANSPORT(
       t, "reading_action"); /* matches unref inside reading_action */
@@ -3249,5 +3253,6 @@
     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
     gpr_free(read_buffer);
   }
+  t->notify_on_receive_settings = notify_on_receive_settings;
   GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
 }
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
index 369dc34..bd72e07 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
@@ -29,12 +29,14 @@
 
 grpc_transport* grpc_create_chttp2_transport(
     grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
-    grpc_endpoint* ep, int is_client);
+    grpc_endpoint* ep, bool is_client);
 
 /// Takes ownership of \a read_buffer, which (if non-NULL) contains
 /// leftover bytes previously read from the endpoint (e.g., by handshakers).
-void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
-                                         grpc_transport* transport,
-                                         grpc_slice_buffer* read_buffer);
+/// If non-null, \a notify_on_receive_settings will be scheduled when
+/// HTTP/2 settings are received from the peer.
+void grpc_chttp2_transport_start_reading(
+    grpc_exec_ctx* exec_ctx, grpc_transport* transport,
+    grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings);
 
 #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc
index 75bb78d..de4340f 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.cc
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc
@@ -131,6 +131,11 @@
             memcpy(parser->target_settings, parser->incoming_settings,
                    GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
             grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
+            if (t->notify_on_receive_settings != nullptr) {
+              GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
+                                 GRPC_ERROR_NONE);
+              t->notify_on_receive_settings = nullptr;
+            }
           }
           return GRPC_ERROR_NONE;
         }
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 9404213..f6fd679 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -241,6 +241,8 @@
 
   grpc_combiner* combiner;
 
+  grpc_closure* notify_on_receive_settings;
+
   /** write execution state of the transport */
   grpc_chttp2_write_state write_state;
   /** is this the first write in a series of writes?
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 2579060..d8d753e 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -458,6 +458,14 @@
     } else {
       err = GRPC_ERROR_REF(error);
     }
+    if (s->recv_initial_md_op->payload->recv_initial_metadata
+            .trailing_metadata_available != nullptr) {
+      // Set to true unconditionally, because we're failing the call, so even
+      // if we haven't actually seen the send_trailing_metadata op from the
+      // other side, we're going to return trailing metadata anyway.
+      *s->recv_initial_md_op->payload->recv_initial_metadata
+           .trailing_metadata_available = true;
+    }
     INPROC_LOG(GPR_DEBUG,
                "fail_helper %p scheduling initial-metadata-ready %p %p", s,
                error, err);
@@ -670,6 +678,12 @@
           nullptr);
       s->recv_initial_md_op->payload->recv_initial_metadata
           .recv_initial_metadata->deadline = s->deadline;
+      if (s->recv_initial_md_op->payload->recv_initial_metadata
+              .trailing_metadata_available != nullptr) {
+        *s->recv_initial_md_op->payload->recv_initial_metadata
+             .trailing_metadata_available =
+            (other != nullptr && other->send_trailing_md_op != nullptr);
+      }
       grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
       s->to_read_initial_md_filled = false;
       INPROC_LOG(GPR_DEBUG,
@@ -995,6 +1009,15 @@
     if (error != GRPC_ERROR_NONE) {
       // Schedule op's closures that we didn't push to op state machine
       if (op->recv_initial_metadata) {
+        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
+            nullptr) {
+          // Set to true unconditionally, because we're failing the call, so
+          // even if we haven't actually seen the send_trailing_metadata op
+          // from the other side, we're going to return trailing metadata
+          // anyway.
+          *op->payload->recv_initial_metadata.trailing_metadata_available =
+              true;
+        }
         INPROC_LOG(
             GPR_DEBUG,
             "perform_stream_op error %p scheduling initial-metadata-ready %p",
diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc
index aae1b35..58c30b1 100644
--- a/src/core/lib/channel/handshaker.cc
+++ b/src/core/lib/channel/handshaker.cc
@@ -231,14 +231,16 @@
 
 void grpc_handshake_manager_do_handshake(
     grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
-    grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
-    grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
-    grpc_iomgr_cb_func on_handshake_done, void* user_data) {
+    grpc_pollset_set* interested_parties, grpc_endpoint* endpoint,
+    const grpc_channel_args* channel_args, grpc_millis deadline,
+    grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done,
+    void* user_data) {
   gpr_mu_lock(&mgr->mu);
   GPR_ASSERT(mgr->index == 0);
   GPR_ASSERT(!mgr->shutdown);
   // Construct handshaker args.  These will be passed through all
   // handshakers and eventually be freed by the on_handshake_done callback.
+  mgr->args.interested_parties = interested_parties;
   mgr->args.endpoint = endpoint;
   mgr->args.args = grpc_channel_args_copy(channel_args);
   mgr->args.user_data = user_data;
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index 09b4a27..8e4114d 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -54,6 +54,7 @@
 /// For the on_handshake_done callback, all members are input arguments,
 /// which the callback takes ownership of.
 typedef struct {
+  grpc_pollset_set* interested_parties;
   grpc_endpoint* endpoint;
   grpc_channel_args* args;
   grpc_slice_buffer* read_buffer;
@@ -131,11 +132,13 @@
                                      grpc_error* why);
 
 /// Invokes handshakers in the order they were added.
+/// \a interested_parties may be non-nullptr to provide a pollset_set that
+/// may be used during handshaking. Ownership is not taken.
 /// Takes ownership of \a endpoint, and then passes that ownership to
 /// the \a on_handshake_done callback.
 /// Does NOT take ownership of \a channel_args.  Instead, makes a copy before
 /// invoking the first handshaker.
-/// \a acceptor will be NULL for client-side handshakers.
+/// \a acceptor will be nullptr for client-side handshakers.
 ///
 /// When done, invokes \a on_handshake_done with a grpc_handshaker_args
 /// object as its argument.  If the callback is invoked with error !=
@@ -144,9 +147,10 @@
 /// the arguments.
 void grpc_handshake_manager_do_handshake(
     grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
-    grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
-    grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
-    grpc_iomgr_cb_func on_handshake_done, void* user_data);
+    grpc_pollset_set* interested_parties, grpc_endpoint* endpoint,
+    const grpc_channel_args* channel_args, grpc_millis deadline,
+    grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done,
+    void* user_data);
 
 /// Add \a mgr to the server side list of all pending handshake managers, the
 /// list starts with \a *head.
diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc
index d25fba5..dfcaee7 100644
--- a/src/core/lib/http/httpcli_security_connector.cc
+++ b/src/core/lib/http/httpcli_security_connector.cc
@@ -191,8 +191,9 @@
   c->handshake_mgr = grpc_handshake_manager_create();
   grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, &args, c->handshake_mgr);
   grpc_handshake_manager_do_handshake(
-      exec_ctx, c->handshake_mgr, tcp, nullptr /* channel_args */, deadline,
-      nullptr /* acceptor */, on_handshake_done, c /* user_data */);
+      exec_ctx, c->handshake_mgr, nullptr /* interested_parties */, tcp,
+      nullptr /* channel_args */, deadline, nullptr /* acceptor */,
+      on_handshake_done, c /* user_data */);
   GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &sc->base, "httpcli");
 }
 
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 72443cc..5139760 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -55,7 +55,7 @@
   if (fgets(buf, sizeof buf, fp)) {
     char* end;
     long i = strtol(buf, &end, 10);
-    if (i > 0 && i <= INT_MAX && end && *end == 0) {
+    if (i > 0 && i <= INT_MAX && end && *end == '\n') {
       n = (int)i;
     }
   }
diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc
index 2c76fae..ffadf0b 100644
--- a/src/core/lib/iomgr/tcp_server_uv.cc
+++ b/src/core/lib/iomgr/tcp_server_uv.cc
@@ -260,15 +260,36 @@
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
-static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
-                                        const grpc_resolved_address* addr,
-                                        unsigned port_index,
-                                        grpc_tcp_listener** listener) {
+static grpc_error* add_addr_to_server(grpc_tcp_server* s,
+                                      const grpc_resolved_address* addr,
+                                      unsigned port_index,
+                                      grpc_tcp_listener** listener) {
   grpc_tcp_listener* sp = NULL;
   int port = -1;
   int status;
   grpc_error* error;
   grpc_resolved_address sockname_temp;
+  uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
+  int family = grpc_sockaddr_get_family(addr);
+
+  status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+  if (family == AF_INET || family == AF_INET6) {
+    int fd;
+    uv_fileno((uv_handle_t*)handle, &fd);
+    int enable = 1;
+    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
+  }
+#endif /* GPR_LINUX && SO_REUSEPORT */
+
+  if (status != 0) {
+    error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Failed to initialize UV tcp handle");
+    error =
+        grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+                           grpc_slice_from_static_string(uv_strerror(status)));
+    return error;
+  }
 
   // The last argument to uv_tcp_bind is flags
   status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0);
@@ -325,20 +346,48 @@
   return GRPC_ERROR_NONE;
 }
 
+static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
+                                                unsigned port_index,
+                                                int requested_port,
+                                                grpc_tcp_listener** listener) {
+  grpc_resolved_address wild4;
+  grpc_resolved_address wild6;
+  grpc_tcp_listener* sp = nullptr;
+  grpc_tcp_listener* sp2 = nullptr;
+  grpc_error* v6_err = GRPC_ERROR_NONE;
+  grpc_error* v4_err = GRPC_ERROR_NONE;
+
+  grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
+  /* Try listening on IPv6 first. */
+  if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) ==
+      GRPC_ERROR_NONE) {
+    *listener = sp;
+    return GRPC_ERROR_NONE;
+  }
+
+  if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) ==
+      GRPC_ERROR_NONE) {
+    *listener = sp2;
+    return GRPC_ERROR_NONE;
+  }
+
+  grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+      "Failed to add any wildcard listeners");
+  root_err = grpc_error_add_child(root_err, v6_err);
+  root_err = grpc_error_add_child(root_err, v4_err);
+  return root_err;
+}
+
 grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
                                      const grpc_resolved_address* addr,
                                      int* port) {
   // This function is mostly copied from tcp_server_windows.c
   grpc_tcp_listener* sp = NULL;
-  uv_tcp_t* handle;
   grpc_resolved_address addr6_v4mapped;
-  grpc_resolved_address wildcard;
   grpc_resolved_address* allocated_addr = NULL;
   grpc_resolved_address sockname_temp;
   unsigned port_index = 0;
-  int status;
   grpc_error* error = GRPC_ERROR_NONE;
-  int family;
 
   GRPC_UV_ASSERT_SAME_THREAD();
 
@@ -367,38 +416,15 @@
     }
   }
 
-  if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
-    addr = &addr6_v4mapped;
-  }
-
   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
   if (grpc_sockaddr_is_wildcard(addr, port)) {
-    grpc_sockaddr_make_wildcard6(*port, &wildcard);
-
-    addr = &wildcard;
-  }
-
-  handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
-
-  family = grpc_sockaddr_get_family(addr);
-  status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
-#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
-  if (family == AF_INET || family == AF_INET6) {
-    int fd;
-    uv_fileno((uv_handle_t*)handle, &fd);
-    int enable = 1;
-    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
-  }
-#endif /* GPR_LINUX && SO_REUSEPORT */
-
-  if (status == 0) {
-    error = add_socket_to_server(s, handle, addr, port_index, &sp);
+    error = add_wildcard_addrs_to_server(s, port_index, *port, &sp);
   } else {
-    error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-        "Failed to initialize UV tcp handle");
-    error =
-        grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
-                           grpc_slice_from_static_string(uv_strerror(status)));
+    if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+      addr = &addr6_v4mapped;
+    }
+
+    error = add_addr_to_server(s, addr, port_index, &sp);
   }
 
   gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 68ab935..7b7d694 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -47,6 +47,7 @@
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/resolve_address.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -71,14 +72,22 @@
   grpc_udp_server_read_cb read_cb;
   grpc_udp_server_write_cb write_cb;
   grpc_udp_server_orphan_cb orphan_cb;
+  // To be scheduled on another thread to actually read/write.
+  grpc_closure do_read_closure;
+  grpc_closure do_write_closure;
+  grpc_closure notify_on_write_closure;
   // True if orphan_cb is triggered.
   bool orphan_notified;
+  // True if grpc_fd_notify_on_write() is called after on_write() call.
+  bool notify_on_write_armed;
+  // True if fd has been shutdown.
+  bool already_shutdown;
 
   struct grpc_udp_listener* next;
 };
 
 struct shutdown_fd_args {
-  grpc_fd* fd;
+  grpc_udp_listener* sp;
   gpr_mu* server_mu;
 };
 
@@ -144,8 +153,17 @@
 static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args,
                         grpc_error* error) {
   struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
+  grpc_udp_listener* sp = shutdown_args->sp;
+  gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
   gpr_mu_lock(shutdown_args->server_mu);
-  grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error));
+  grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error));
+  sp->already_shutdown = true;
+  if (!sp->notify_on_write_armed) {
+    // Re-arm write notification to notify listener with error. This is
+    // necessary to decrement active_ports.
+    sp->notify_on_write_armed = true;
+    grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+  }
   gpr_mu_unlock(shutdown_args->server_mu);
   gpr_free(shutdown_args);
 }
@@ -161,6 +179,7 @@
 
   gpr_mu_destroy(&s->mu);
 
+  gpr_log(GPR_DEBUG, "Destroy all listeners.");
   while (s->head) {
     grpc_udp_listener* sp = s->head;
     s->head = sp->next;
@@ -207,9 +226,10 @@
         /* Call the orphan_cb to signal that the FD is about to be closed and
          * should no longer be used. Because at this point, all listening ports
          * have been shutdown already, no need to shutdown again.*/
-        GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+        GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
                           grpc_schedule_on_exec_ctx);
         GPR_ASSERT(sp->orphan_cb);
+        gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
         sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
                       sp->server->user_data);
       }
@@ -233,13 +253,14 @@
 
   s->shutdown_complete = on_done;
 
+  gpr_log(GPR_DEBUG, "start to destroy udp_server");
   /* shutdown all fd's */
   if (s->active_ports) {
     for (sp = s->head; sp; sp = sp->next) {
       GPR_ASSERT(sp->orphan_cb);
       struct shutdown_fd_args* args =
           (struct shutdown_fd_args*)gpr_malloc(sizeof(*args));
-      args->fd = sp->emfd;
+      args->sp = sp;
       args->server_mu = &s->mu;
       GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
                         grpc_schedule_on_exec_ctx);
@@ -329,6 +350,28 @@
   return -1;
 }
 
+static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+  grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+  GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
+  /* TODO: the reason we hold server->mu here is merely to prevent fd
+   * shutdown while we are reading. However, it blocks do_write(). Switch to
+   * read lock if available. */
+  gpr_mu_lock(&sp->server->mu);
+  /* Tell the registered callback that data is available to read. */
+  if (!sp->already_shutdown &&
+      sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+    /* There may be more packets to read. Schedule the do_read_closure to run
+     * after finishing this event loop. */
+    GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+  } else {
+    /* Finish reading all the packets, re-arm the notification event so we can
+     * get another chance to read. Or fd already shutdown, re-arm to get a
+     * notification with shutdown error. */
+    grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+  }
+  gpr_mu_unlock(&sp->server->mu);
+}
+
 /* event manager callback when reads are ready */
 static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
   grpc_udp_listener* sp = (grpc_udp_listener*)arg;
@@ -343,13 +386,51 @@
     }
     return;
   }
-
-  /* Tell the registered callback that data is available to read. */
+  /* Read once. If there is more data to read, offload the work to another
+   * thread to finish. */
   GPR_ASSERT(sp->read_cb);
-  sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data);
+  if (sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
+    /* There may be more packets to read. Schedule the do_read_closure to run
+     * after finishing this event loop. */
+    GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
+                      grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
+    GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
+  } else {
+    /* Finish reading all the packets, re-arm the notification event so we can
+     * get another chance to read. Or fd already shutdown, re-arm to get a
+     * notification with shutdown error. */
+    grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+  }
+  gpr_mu_unlock(&sp->server->mu);
+}
 
-  /* Re-arm the notification event so we get another chance to read. */
-  grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
+// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
+void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
+                                grpc_error* error) {
+  grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+  gpr_mu_lock(&sp->server->mu);
+  if (!sp->notify_on_write_armed) {
+    grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+    sp->notify_on_write_armed = true;
+  }
+  gpr_mu_unlock(&sp->server->mu);
+}
+
+static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+  grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
+  gpr_mu_lock(&(sp->server->mu));
+  if (sp->already_shutdown) {
+    // If fd has been shutdown, don't write any more and re-arm notification.
+    grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+  } else {
+    sp->notify_on_write_armed = false;
+    /* Tell the registered callback that the socket is writeable. */
+    GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
+    GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
+                      arg, grpc_schedule_on_exec_ctx);
+    sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data,
+                 &sp->notify_on_write_closure);
+  }
   gpr_mu_unlock(&sp->server->mu);
 }
 
@@ -367,12 +448,11 @@
     return;
   }
 
-  /* Tell the registered callback that the socket is writeable. */
-  GPR_ASSERT(sp->write_cb);
-  sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data);
+  /* Schedule actual write in another thread. */
+  GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
+                    grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
 
-  /* Re-arm the notification event so we get another chance to write. */
-  grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
+  GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE);
   gpr_mu_unlock(&sp->server->mu);
 }
 
@@ -409,6 +489,7 @@
     sp->write_cb = write_cb;
     sp->orphan_cb = orphan_cb;
     sp->orphan_notified = false;
+    sp->already_shutdown = false;
     GPR_ASSERT(sp->emfd);
     gpr_mu_unlock(&s->mu);
     gpr_free(name);
@@ -533,6 +614,7 @@
 
     GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
                       grpc_schedule_on_exec_ctx);
+    sp->notify_on_write_armed = true;
     grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
 
     /* Registered for both read and write callbacks: increment active_ports
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 8cc08d3..1bd6922 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,13 +30,16 @@
 /* Forward decl of grpc_udp_server */
 typedef struct grpc_udp_server grpc_udp_server;
 
-/* Called when data is available to read from the socket. */
-typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
+/* Called when data is available to read from the socket.
+ * Return true if there is more data to read from fd. */
+typedef bool (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
                                         void* user_data);
 
-/* Called when the socket is writeable. */
+/* Called when the socket is writeable. The given closure should be scheduled
+ * when the socket next becomes blocked, to re-arm the write notification. */
 typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
-                                         void* user_data);
+                                         void* user_data,
+                                         grpc_closure* notify_on_write_closure);
 
 /* Called when the grpc_fd is about to be orphaned (and the FD closed). */
 typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx* exec_ctx,
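
With these signature changes, a read callback reports whether more datagrams may still be pending (so the server can offload further reads to the executor), and a write callback receives a closure to schedule once it has written as much as it can. A minimal sketch of callbacks matching the new typedefs; the names my_read_cb and my_write_cb are illustrative, not part of this change:

    static bool my_read_cb(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
                           void* user_data) {
      // Read one datagram here; returning true tells the udp_server that more
      // data is likely pending, so it schedules do_read() on the executor
      // instead of immediately re-arming grpc_fd_notify_on_read().
      return false;
    }

    static void my_write_cb(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
                            void* user_data,
                            grpc_closure* notify_on_write_closure) {
      // Write until the socket would block, then schedule the provided closure
      // so the udp_server re-arms grpc_fd_notify_on_write().
      GRPC_CLOSURE_SCHED(exec_ctx, notify_on_write_closure, GRPC_ERROR_NONE);
    }
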
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index af48f13..b3cf04c 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -327,9 +327,6 @@
 void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
                            grpc_slice debug_data);
 
-/* Close a transport. Aborts all open streams. */
-void grpc_transport_close(grpc_transport* transport);
-
 /* Destroy the transport */
 void grpc_transport_destroy(grpc_exec_ctx* exec_ctx, grpc_transport* transport);
 
diff --git a/src/csharp/generate_proto_csharp.sh b/src/csharp/generate_proto_csharp.sh
index 1a1adbb..299dc3f 100755
--- a/src/csharp/generate_proto_csharp.sh
+++ b/src/csharp/generate_proto_csharp.sh
@@ -33,6 +33,11 @@
 $PROTOC --plugin=$PLUGIN --csharp_out=$REFLECTION_DIR --grpc_out=$REFLECTION_DIR \
     -I src/proto src/proto/grpc/reflection/v1alpha/reflection.proto
 
+# Put grpc/core/stats.proto in a subdirectory to avoid a collision with grpc/testing/stats.proto
+mkdir -p $TESTING_DIR/CoreStats
+$PROTOC --plugin=$PLUGIN --csharp_out=$TESTING_DIR/CoreStats --grpc_out=$TESTING_DIR/CoreStats \
+    -I src/proto src/proto/grpc/core/stats.proto
+
 # TODO(jtattermusch): following .proto files are a bit broken and import paths
 # don't match the package names. Setting -I to the correct value src/proto
 # breaks the code generation.
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 558ce42..5647725 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -348,26 +348,25 @@
 class ChannelCredentials(object):
     """An encapsulation of the data required to create a secure Channel.
 
-  This class has no supported interface - it exists to define the type of its
-  instances and its instances exist to be passed to other functions. For
-  example, ssl_channel_credentials returns an instance, and secure_channel
-  consumes an instance of this class.
-  """
+    This class has no supported interface - it exists to define the type of its
+    instances and its instances exist to be passed to other functions. For
+    example, ssl_channel_credentials returns an instance of this class and
+    secure_channel requires an instance of this class.
+    """
 
     def __init__(self, credentials):
         self._credentials = credentials
 
 
 class CallCredentials(object):
-    """An encapsulation of the data required to assert an identity over a
-       channel.
+    """An encapsulation of the data required to assert an identity over a call.
 
-  A CallCredentials may be composed with ChannelCredentials to always assert
-  identity for every call over that Channel.
+    A CallCredentials may be composed with ChannelCredentials to always assert
+    identity for every call over that Channel.
 
-  This class has no supported interface - it exists to define the type of its
-  instances and its instances exist to be passed to other functions.
-  """
+    This class has no supported interface - it exists to define the type of its
+    instances and its instances exist to be passed to other functions.
+    """
 
     def __init__(self, credentials):
         self._credentials = credentials
@@ -376,23 +375,22 @@
 class AuthMetadataContext(six.with_metaclass(abc.ABCMeta)):
     """Provides information to call credentials metadata plugins.
 
-  Attributes:
-    service_url: A string URL of the service being called into.
-    method_name: A string of the fully qualified method name being called.
-  """
+    Attributes:
+      service_url: A string URL of the service being called into.
+      method_name: A string of the fully qualified method name being called.
+    """
 
 
 class AuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)):
     """Callback object received by a metadata plugin."""
 
     def __call__(self, metadata, error):
-        """Inform the gRPC runtime of the metadata to construct a
-           CallCredentials.
+        """Passes to the gRPC runtime authentication metadata for an RPC.
 
-    Args:
-      metadata: The :term:`metadata` used to construct the CallCredentials.
-      error: An Exception to indicate error or None to indicate success.
-    """
+        Args:
+          metadata: The :term:`metadata` used to construct the CallCredentials.
+          error: An Exception to indicate error or None to indicate success.
+        """
         raise NotImplementedError()
 
 
@@ -402,14 +400,14 @@
     def __call__(self, context, callback):
         """Implements authentication by passing metadata to a callback.
 
-    Implementations of this method must not block.
+        Implementations of this method must not block.
 
-    Args:
-      context: An AuthMetadataContext providing information on the RPC that the
-        plugin is being called to authenticate.
-      callback: An AuthMetadataPluginCallback to be invoked either synchronously
-        or asynchronously.
-    """
+        Args:
+          context: An AuthMetadataContext providing information on the RPC that
+            the plugin is being called to authenticate.
+          callback: An AuthMetadataPluginCallback to be invoked either
+            synchronously or asynchronously.
+        """
         raise NotImplementedError()
 
 
@@ -1138,99 +1136,86 @@
                             certificate_chain=None):
     """Creates a ChannelCredentials for use with an SSL-enabled Channel.
 
-  Args:
-    root_certificates: The PEM-encoded root certificates as a byte string,
-    or None to retrieve them from a default location chosen by gRPC runtime.
-    private_key: The PEM-encoded private key as a byte string, or None if no
-    private key should be used.
-    certificate_chain: The PEM-encoded certificate chain as a byte string
-    to use or or None if no certificate chain should be used.
+    Args:
+      root_certificates: The PEM-encoded root certificates as a byte string,
+        or None to retrieve them from a default location chosen by gRPC
+        runtime.
+      private_key: The PEM-encoded private key as a byte string, or None if no
+        private key should be used.
+      certificate_chain: The PEM-encoded certificate chain as a byte string
+        to use or None if no certificate chain should be used.
 
-  Returns:
-    A ChannelCredentials for use with an SSL-enabled Channel.
-  """
-    if private_key is not None or certificate_chain is not None:
-        pair = _cygrpc.SslPemKeyCertPair(private_key, certificate_chain)
-    else:
-        pair = None
+    Returns:
+      A ChannelCredentials for use with an SSL-enabled Channel.
+    """
     return ChannelCredentials(
-        _cygrpc.channel_credentials_ssl(root_certificates, pair))
+        _cygrpc.SSLChannelCredentials(root_certificates, private_key,
+                                      certificate_chain))
 
 
 def metadata_call_credentials(metadata_plugin, name=None):
     """Construct CallCredentials from an AuthMetadataPlugin.
 
-  Args:
-    metadata_plugin: An AuthMetadataPlugin to use for authentication.
-    name: An optional name for the plugin.
+    Args:
+      metadata_plugin: An AuthMetadataPlugin to use for authentication.
+      name: An optional name for the plugin.
 
-  Returns:
-    A CallCredentials.
-  """
+    Returns:
+      A CallCredentials.
+    """
     from grpc import _plugin_wrapping  # pylint: disable=cyclic-import
-    if name is None:
-        try:
-            effective_name = metadata_plugin.__name__
-        except AttributeError:
-            effective_name = metadata_plugin.__class__.__name__
-    else:
-        effective_name = name
-    return CallCredentials(
-        _plugin_wrapping.call_credentials_metadata_plugin(metadata_plugin,
-                                                          effective_name))
+    return _plugin_wrapping.metadata_plugin_call_credentials(metadata_plugin,
+                                                             name)
 
 
 def access_token_call_credentials(access_token):
     """Construct CallCredentials from an access token.
 
-  Args:
-    access_token: A string to place directly in the http request
-      authorization header, for example
-      "authorization: Bearer <access_token>".
+    Args:
+      access_token: A string to place directly in the http request
+        authorization header, for example
+        "authorization: Bearer <access_token>".
 
-  Returns:
-    A CallCredentials.
-  """
+    Returns:
+      A CallCredentials.
+    """
     from grpc import _auth  # pylint: disable=cyclic-import
-    return metadata_call_credentials(
-        _auth.AccessTokenCallCredentials(access_token))
+    from grpc import _plugin_wrapping  # pylint: disable=cyclic-import
+    return _plugin_wrapping.metadata_plugin_call_credentials(
+        _auth.AccessTokenAuthMetadataPlugin(access_token), None)
 
 
 def composite_call_credentials(*call_credentials):
     """Compose multiple CallCredentials to make a new CallCredentials.
 
-  Args:
-    *call_credentials: At least two CallCredentials objects.
+    Args:
+      *call_credentials: At least two CallCredentials objects.
 
-  Returns:
-    A CallCredentials object composed of the given CallCredentials objects.
-  """
-    from grpc import _credential_composition  # pylint: disable=cyclic-import
-    cygrpc_call_credentials = tuple(
-        single_call_credentials._credentials
-        for single_call_credentials in call_credentials)
+    Returns:
+      A CallCredentials object composed of the given CallCredentials objects.
+    """
     return CallCredentials(
-        _credential_composition.call(cygrpc_call_credentials))
+        _cygrpc.CompositeCallCredentials(
+            tuple(single_call_credentials._credentials
+                  for single_call_credentials in call_credentials)))
 
 
 def composite_channel_credentials(channel_credentials, *call_credentials):
     """Compose a ChannelCredentials and one or more CallCredentials objects.
 
-  Args:
-    channel_credentials: A ChannelCredentials object.
-    *call_credentials: One or more CallCredentials objects.
+    Args:
+      channel_credentials: A ChannelCredentials object.
+      *call_credentials: One or more CallCredentials objects.
 
-  Returns:
-    A ChannelCredentials composed of the given ChannelCredentials and
-    CallCredentials objects.
-  """
-    from grpc import _credential_composition  # pylint: disable=cyclic-import
-    cygrpc_call_credentials = tuple(
-        single_call_credentials._credentials
-        for single_call_credentials in call_credentials)
+    Returns:
+      A ChannelCredentials composed of the given ChannelCredentials and
+        CallCredentials objects.
+    """
     return ChannelCredentials(
-        _credential_composition.channel(channel_credentials._credentials,
-                                        cygrpc_call_credentials))
+        _cygrpc.CompositeChannelCredentials(
+            tuple(single_call_credentials._credentials
+                  for single_call_credentials in call_credentials),
+            channel_credentials._credentials))
 
 
 def ssl_server_credentials(private_key_certificate_chain_pairs,
diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py
index c6542d0..9a339b5 100644
--- a/src/python/grpcio/grpc/_auth.py
+++ b/src/python/grpcio/grpc/_auth.py
@@ -63,7 +63,7 @@
         self._pool.shutdown(wait=False)
 
 
-class AccessTokenCallCredentials(grpc.AuthMetadataPlugin):
+class AccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin):
     """Metadata wrapper for raw access token credentials."""
 
     def __init__(self, access_token):
diff --git a/src/python/grpcio/grpc/_credential_composition.py b/src/python/grpcio/grpc/_credential_composition.py
deleted file mode 100644
index f652cc3..0000000
--- a/src/python/grpcio/grpc/_credential_composition.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright 2016 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from grpc._cython import cygrpc
-
-
-def _call(call_credentialses):
-    call_credentials_iterator = iter(call_credentialses)
-    composition = next(call_credentials_iterator)
-    for additional_call_credentials in call_credentials_iterator:
-        composition = cygrpc.call_credentials_composite(
-            composition, additional_call_credentials)
-    return composition
-
-
-def call(call_credentialses):
-    return _call(call_credentialses)
-
-
-def channel(channel_credentials, call_credentialses):
-    return cygrpc.channel_credentials_composite(channel_credentials,
-                                                _call(call_credentialses))
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 752fb33..6b3a276 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -72,13 +72,12 @@
         result = grpc_call_cancel(self.c_call, NULL)
       return result
 
-  def set_credentials(
-      self, CallCredentials call_credentials not None):
-    cdef grpc_call_error result
-    with nogil:
-      result = grpc_call_set_credentials(
-          self.c_call, call_credentials.c_credentials)
-    return result
+  def set_credentials(self, CallCredentials call_credentials not None):
+    cdef grpc_call_credentials *c_call_credentials = call_credentials.c()
+    cdef grpc_call_error call_error = grpc_call_set_credentials(
+        self.c_call, c_call_credentials)
+    grpc_call_credentials_release(c_call_credentials)
+    return call_error
 
   def peer(self):
     cdef char *peer = NULL
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index aeabdba..4c397f8 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -33,10 +33,10 @@
         self.c_channel = grpc_insecure_channel_create(c_target, c_arguments,
                                                       NULL)
     else:
-      with nogil:
-        self.c_channel = grpc_secure_channel_create(
-            channel_credentials.c_credentials, c_target, c_arguments, NULL)
-      self.references.append(channel_credentials)
+      c_channel_credentials = channel_credentials.c()
+      self.c_channel = grpc_secure_channel_create(
+          c_channel_credentials, c_target, c_arguments, NULL)
+      grpc_channel_credentials_release(c_channel_credentials)
     self.references.append(target)
     self.references.append(arguments)
 
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
index bc0f185..7e9ea33 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
@@ -12,20 +12,66 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-cimport cpython
+
+cdef class CallCredentials:
+
+  cdef grpc_call_credentials *c(self)
+
+  # TODO(https://github.com/grpc/grpc/issues/12531): remove.
+  cdef grpc_call_credentials *c_credentials
+
+
+cdef int _get_metadata(
+    void *state, grpc_auth_metadata_context context,
+    grpc_credentials_plugin_metadata_cb cb, void *user_data,
+    grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+    size_t *num_creds_md, grpc_status_code *status,
+    const char **error_details) with gil
+
+cdef void _destroy(void *state) with gil
+
+
+cdef class MetadataPluginCallCredentials(CallCredentials):
+
+  cdef readonly object _metadata_plugin
+  cdef readonly bytes _name
+
+  cdef grpc_call_credentials *c(self)
+
+
+cdef grpc_call_credentials *_composition(call_credentialses)
+
+
+cdef class CompositeCallCredentials(CallCredentials):
+
+  cdef readonly tuple _call_credentialses
+
+  cdef grpc_call_credentials *c(self)
 
 
 cdef class ChannelCredentials:
 
+  cdef grpc_channel_credentials *c(self)
+
+  # TODO(https://github.com/grpc/grpc/issues/12531): remove.
   cdef grpc_channel_credentials *c_credentials
-  cdef grpc_ssl_pem_key_cert_pair c_ssl_pem_key_cert_pair
-  cdef list references
 
 
-cdef class CallCredentials:
+cdef class SSLChannelCredentials(ChannelCredentials):
 
-  cdef grpc_call_credentials *c_credentials
-  cdef list references
+  cdef readonly object _pem_root_certificates
+  cdef readonly object _private_key
+  cdef readonly object _certificate_chain
+
+  cdef grpc_channel_credentials *c(self)
+
+
+cdef class CompositeChannelCredentials(ChannelCredentials):
+
+  cdef readonly tuple _call_credentialses
+  cdef readonly ChannelCredentials _channel_credentials
+
+  cdef grpc_channel_credentials *c(self)
 
 
 cdef class ServerCertificateConfig:
@@ -49,27 +95,3 @@
   cdef object cert_config_fetcher
   # whether C-core has asked for the initial_cert_config
   cdef bint initial_cert_config_fetched
-
-
-cdef class CredentialsMetadataPlugin:
-
-  cdef object plugin_callback
-  cdef bytes plugin_name
-
-
-cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin)
-
-
-cdef class AuthMetadataContext:
-
-  cdef grpc_auth_metadata_context context
-
-
-cdef int plugin_get_metadata(
-    void *state, grpc_auth_metadata_context context,
-    grpc_credentials_plugin_metadata_cb cb, void *user_data,
-    grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
-    size_t *num_creds_md, grpc_status_code *status,
-    const char **error_details) with gil
-
-cdef void plugin_destroy_c_plugin_state(void *state) with gil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index ee1328d..246a271 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -16,47 +16,123 @@
 
 import grpc
 import threading
-import traceback
-
-
-cdef class ChannelCredentials:
-
-  def __cinit__(self):
-    grpc_init()
-    self.c_credentials = NULL
-    self.c_ssl_pem_key_cert_pair.private_key = NULL
-    self.c_ssl_pem_key_cert_pair.certificate_chain = NULL
-    self.references = []
-
-  # The object *can* be invalid in Python if we fail to make the credentials
-  # (and the core thus returns NULL credentials). Used primarily for debugging.
-  @property
-  def is_valid(self):
-    return self.c_credentials != NULL
-
-  def __dealloc__(self):
-    if self.c_credentials != NULL:
-      grpc_channel_credentials_release(self.c_credentials)
-    grpc_shutdown()
 
 
 cdef class CallCredentials:
 
-  def __cinit__(self):
-    grpc_init()
-    self.c_credentials = NULL
-    self.references = []
+  cdef grpc_call_credentials *c(self):
+    raise NotImplementedError()
 
-  # The object *can* be invalid in Python if we fail to make the credentials
-  # (and the core thus returns NULL credentials). Used primarily for debugging.
-  @property
-  def is_valid(self):
-    return self.c_credentials != NULL
 
-  def __dealloc__(self):
-    if self.c_credentials != NULL:
-      grpc_call_credentials_release(self.c_credentials)
-    grpc_shutdown()
+cdef int _get_metadata(
+    void *state, grpc_auth_metadata_context context,
+    grpc_credentials_plugin_metadata_cb cb, void *user_data,
+    grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
+    size_t *num_creds_md, grpc_status_code *status,
+    const char **error_details) with gil:
+  def callback(Metadata metadata, grpc_status_code status, bytes error_details):
+    if status is StatusCode.ok:
+      cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL)
+    else:
+      cb(user_data, NULL, 0, status, error_details)
+  args = context.service_url, context.method_name, callback,
+  threading.Thread(target=<object>state, args=args).start()
+  return 0  # Asynchronous return
+
+
+cdef void _destroy(void *state) with gil:
+  cpython.Py_DECREF(<object>state)
+
+
+cdef class MetadataPluginCallCredentials(CallCredentials):
+
+  def __cinit__(self, metadata_plugin, name):
+    self._metadata_plugin = metadata_plugin
+    self._name = name
+
+  cdef grpc_call_credentials *c(self):
+    cdef grpc_metadata_credentials_plugin c_metadata_plugin
+    c_metadata_plugin.get_metadata = _get_metadata
+    c_metadata_plugin.destroy = _destroy
+    c_metadata_plugin.state = <void *>self._metadata_plugin
+    c_metadata_plugin.type = self._name
+    cpython.Py_INCREF(self._metadata_plugin)
+    return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, NULL)
+
+
+cdef grpc_call_credentials *_composition(call_credentialses):
+  call_credentials_iterator = iter(call_credentialses)
+  cdef CallCredentials composition = next(call_credentials_iterator)
+  cdef grpc_call_credentials *c_composition = composition.c()
+  cdef CallCredentials additional_call_credentials
+  cdef grpc_call_credentials *c_additional_call_credentials
+  cdef grpc_call_credentials *c_next_composition
+  for additional_call_credentials in call_credentials_iterator:
+    c_additional_call_credentials = additional_call_credentials.c()
+    c_next_composition = grpc_composite_call_credentials_create(
+        c_composition, c_additional_call_credentials, NULL)
+    grpc_call_credentials_release(c_composition)
+    grpc_call_credentials_release(c_additional_call_credentials)
+    c_composition = c_next_composition
+  return c_composition
+
+
+cdef class CompositeCallCredentials(CallCredentials):
+
+  def __cinit__(self, call_credentialses):
+    self._call_credentialses = call_credentialses
+
+  cdef grpc_call_credentials *c(self):
+    return _composition(self._call_credentialses)
+
+
+cdef class ChannelCredentials:
+
+  cdef grpc_channel_credentials *c(self):
+    raise NotImplementedError()
+
+
+cdef class SSLChannelCredentials(ChannelCredentials):
+
+  def __cinit__(self, pem_root_certificates, private_key, certificate_chain):
+    self._pem_root_certificates = pem_root_certificates
+    self._private_key = private_key
+    self._certificate_chain = certificate_chain
+
+  cdef grpc_channel_credentials *c(self):
+    cdef const char *c_pem_root_certificates
+    cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair
+    if self._pem_root_certificates is None:
+      c_pem_root_certificates = NULL
+    else:
+      c_pem_root_certificates = self._pem_root_certificates
+    if self._private_key is None and self._certificate_chain is None:
+      return grpc_ssl_credentials_create(
+          c_pem_root_certificates, NULL, NULL)
+    else:
+      c_pem_key_certificate_pair.private_key = self._private_key
+      c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
+      return grpc_ssl_credentials_create(
+          c_pem_root_certificates, &c_pem_key_certificate_pair, NULL)
+
+
+cdef class CompositeChannelCredentials(ChannelCredentials):
+
+  def __cinit__(self, call_credentialses, channel_credentials):
+    self._call_credentialses = call_credentialses
+    self._channel_credentials = channel_credentials
+
+  cdef grpc_channel_credentials *c(self):
+    cdef grpc_channel_credentials *c_channel_credentials
+    c_channel_credentials = self._channel_credentials.c()
+    cdef grpc_call_credentials *c_call_credentials_composition = _composition(
+        self._call_credentialses)
+    cdef grpc_channel_credentials *c_composition
+    c_composition = grpc_composite_channel_credentials_create(
+        c_channel_credentials, c_call_credentials_composition, NULL)
+    grpc_channel_credentials_release(c_channel_credentials)
+    grpc_call_credentials_release(c_call_credentials_composition)
+    return c_composition
 
 
 cdef class ServerCertificateConfig:
@@ -89,190 +165,6 @@
       grpc_server_credentials_release(self.c_credentials)
     grpc_shutdown()
 
-
-cdef class CredentialsMetadataPlugin:
-
-  def __cinit__(self, object plugin_callback, bytes name):
-    """
-    Args:
-      plugin_callback (callable): Callback accepting a service URL (str/bytes)
-        and callback object (accepting a MetadataArray,
-        grpc_status_code, and a str/bytes error message). This argument
-        when called should be non-blocking and eventually call the callback
-        object with the appropriate status code/details and metadata (if
-        successful).
-      name (bytes): Plugin name.
-    """
-    grpc_init()
-    if not callable(plugin_callback):
-      raise ValueError('expected callable plugin_callback')
-    self.plugin_callback = plugin_callback
-    self.plugin_name = name
-
-  def __dealloc__(self):
-    grpc_shutdown()
-
-
-cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin):
-  cdef grpc_metadata_credentials_plugin c_plugin
-  c_plugin.get_metadata = plugin_get_metadata
-  c_plugin.destroy = plugin_destroy_c_plugin_state
-  c_plugin.state = <void *>plugin
-  c_plugin.type = plugin.plugin_name
-  cpython.Py_INCREF(plugin)
-  return c_plugin
-
-
-cdef class AuthMetadataContext:
-
-  def __cinit__(self):
-    grpc_init()
-    self.context.service_url = NULL
-    self.context.method_name = NULL
-
-  @property
-  def service_url(self):
-    return self.context.service_url
-
-  @property
-  def method_name(self):
-    return self.context.method_name
-
-  def __dealloc__(self):
-    grpc_shutdown()
-
-
-cdef int plugin_get_metadata(
-    void *state, grpc_auth_metadata_context context,
-    grpc_credentials_plugin_metadata_cb cb, void *user_data,
-    grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
-    size_t *num_creds_md, grpc_status_code *status,
-    const char **error_details) with gil:
-  called_flag = [False]
-  def python_callback(
-      Metadata metadata, grpc_status_code status,
-      bytes error_details):
-    cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details)
-    called_flag[0] = True
-  cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
-  cdef AuthMetadataContext cy_context = AuthMetadataContext()
-  cy_context.context = context
-  def async_callback():
-    try:
-      self.plugin_callback(cy_context, python_callback)
-    except Exception as error:
-      if not called_flag[0]:
-        cb(user_data, NULL, 0, StatusCode.unknown,
-           traceback.format_exc().encode())
-  threading.Thread(group=None, target=async_callback).start()
-  return 0  # Asynchronous return
-
-cdef void plugin_destroy_c_plugin_state(void *state) with gil:
-  cpython.Py_DECREF(<CredentialsMetadataPlugin>state)
-
-def channel_credentials_google_default():
-  cdef ChannelCredentials credentials = ChannelCredentials();
-  with nogil:
-    credentials.c_credentials = grpc_google_default_credentials_create()
-  return credentials
-
-def channel_credentials_ssl(pem_root_certificates,
-                            SslPemKeyCertPair ssl_pem_key_cert_pair):
-  pem_root_certificates = str_to_bytes(pem_root_certificates)
-  cdef ChannelCredentials credentials = ChannelCredentials()
-  cdef const char *c_pem_root_certificates = NULL
-  if pem_root_certificates is not None:
-    c_pem_root_certificates = pem_root_certificates
-    credentials.references.append(pem_root_certificates)
-  if ssl_pem_key_cert_pair is not None:
-    with nogil:
-      credentials.c_credentials = grpc_ssl_credentials_create(
-          c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
-    credentials.references.append(ssl_pem_key_cert_pair)
-  else:
-    with nogil:
-      credentials.c_credentials = grpc_ssl_credentials_create(
-        c_pem_root_certificates, NULL, NULL)
-  return credentials
-
-def channel_credentials_composite(
-    ChannelCredentials credentials_1 not None,
-    CallCredentials credentials_2 not None):
-  if not credentials_1.is_valid or not credentials_2.is_valid:
-    raise ValueError("passed credentials must both be valid")
-  cdef ChannelCredentials credentials = ChannelCredentials()
-  with nogil:
-    credentials.c_credentials = grpc_composite_channel_credentials_create(
-        credentials_1.c_credentials, credentials_2.c_credentials, NULL)
-  credentials.references.append(credentials_1)
-  credentials.references.append(credentials_2)
-  return credentials
-
-def call_credentials_composite(
-    CallCredentials credentials_1 not None,
-    CallCredentials credentials_2 not None):
-  if not credentials_1.is_valid or not credentials_2.is_valid:
-    raise ValueError("passed credentials must both be valid")
-  cdef CallCredentials credentials = CallCredentials()
-  with nogil:
-    credentials.c_credentials = grpc_composite_call_credentials_create(
-        credentials_1.c_credentials, credentials_2.c_credentials, NULL)
-  credentials.references.append(credentials_1)
-  credentials.references.append(credentials_2)
-  return credentials
-
-def call_credentials_google_compute_engine():
-  cdef CallCredentials credentials = CallCredentials()
-  with nogil:
-    credentials.c_credentials = (
-        grpc_google_compute_engine_credentials_create(NULL))
-  return credentials
-
-def call_credentials_service_account_jwt_access(
-    json_key, Timespec token_lifetime not None):
-  json_key = str_to_bytes(json_key)
-  cdef CallCredentials credentials = CallCredentials()
-  cdef char *json_key_c_string = json_key
-  with nogil:
-    credentials.c_credentials = (
-        grpc_service_account_jwt_access_credentials_create(
-            json_key_c_string, token_lifetime.c_time, NULL))
-  credentials.references.append(json_key)
-  return credentials
-
-def call_credentials_google_refresh_token(json_refresh_token):
-  json_refresh_token = str_to_bytes(json_refresh_token)
-  cdef CallCredentials credentials = CallCredentials()
-  cdef char *json_refresh_token_c_string = json_refresh_token
-  with nogil:
-    credentials.c_credentials = grpc_google_refresh_token_credentials_create(
-        json_refresh_token_c_string, NULL)
-  credentials.references.append(json_refresh_token)
-  return credentials
-
-def call_credentials_google_iam(authorization_token, authority_selector):
-  authorization_token = str_to_bytes(authorization_token)
-  authority_selector = str_to_bytes(authority_selector)
-  cdef CallCredentials credentials = CallCredentials()
-  cdef char *authorization_token_c_string = authorization_token
-  cdef char *authority_selector_c_string = authority_selector
-  with nogil:
-    credentials.c_credentials = grpc_google_iam_credentials_create(
-        authorization_token_c_string, authority_selector_c_string, NULL)
-  credentials.references.append(authorization_token)
-  credentials.references.append(authority_selector)
-  return credentials
-
-def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
-  cdef CallCredentials credentials = CallCredentials()
-  cdef grpc_metadata_credentials_plugin c_plugin = _c_plugin(plugin)
-  with nogil:
-    credentials.c_credentials = (
-        grpc_metadata_credentials_create_from_plugin(c_plugin, NULL))
-  # TODO(atash): the following held reference is *probably* never necessary
-  credentials.references.append(plugin)
-  return credentials
-
 cdef const char* _get_c_pem_root_certs(pem_root_certs):
   if pem_root_certs is None:
     return NULL
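
At the public API level these Cython classes are reached through grpc.ssl_channel_credentials, grpc.composite_channel_credentials, and friends. A minimal usage sketch, assuming an illustrative target address and a placeholder access token:

    import grpc

    # Maps onto SSLChannelCredentials above; None root certificates means the
    # default trust roots are used.
    channel_credentials = grpc.ssl_channel_credentials(
        root_certificates=None, private_key=None, certificate_chain=None)

    # Combining channel credentials with call credentials maps onto
    # CompositeChannelCredentials and the _composition() fold above.
    call_credentials = grpc.access_token_call_credentials('example-token')
    composite_credentials = grpc.composite_channel_credentials(
        channel_credentials, call_credentials)

    channel = grpc.secure_channel('localhost:50051', composite_credentials)
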
diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py
index bea2c0f..cd17f4a 100644
--- a/src/python/grpcio/grpc/_plugin_wrapping.py
+++ b/src/python/grpcio/grpc/_plugin_wrapping.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import collections
+import logging
 import threading
 
 import grpc
@@ -20,89 +21,79 @@
 from grpc._cython import cygrpc
 
 
-class AuthMetadataContext(
+class _AuthMetadataContext(
         collections.namedtuple('AuthMetadataContext', (
             'service_url', 'method_name',)), grpc.AuthMetadataContext):
     pass
 
 
-class AuthMetadataPluginCallback(grpc.AuthMetadataContext):
+class _CallbackState(object):
 
-    def __init__(self, callback):
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.called = False
+        self.exception = None
+
+
+class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
+
+    def __init__(self, state, callback):
+        self._state = state
         self._callback = callback
 
     def __call__(self, metadata, error):
-        self._callback(metadata, error)
-
-
-class _WrappedCygrpcCallback(object):
-
-    def __init__(self, cygrpc_callback):
-        self.is_called = False
-        self.error = None
-        self.is_called_lock = threading.Lock()
-        self.cygrpc_callback = cygrpc_callback
-
-    def _invoke_failure(self, error):
-        # TODO(atash) translate different Exception superclasses into different
-        # status codes.
-        self.cygrpc_callback(_common.EMPTY_METADATA, cygrpc.StatusCode.internal,
-                             _common.encode(str(error)))
-
-    def _invoke_success(self, metadata):
-        try:
-            cygrpc_metadata = _common.to_cygrpc_metadata(metadata)
-        except Exception as exception:  # pylint: disable=broad-except
-            self._invoke_failure(exception)
-            return
-        self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'')
-
-    def __call__(self, metadata, error):
-        with self.is_called_lock:
-            if self.is_called:
-                raise RuntimeError('callback should only ever be invoked once')
-            if self.error:
-                self._invoke_failure(self.error)
-                return
-            self.is_called = True
+        with self._state.lock:
+            if self._state.exception is None:
+                if self._state.called:
+                    raise RuntimeError(
+                        'AuthMetadataPluginCallback invoked more than once!')
+                else:
+                    self._state.called = True
+            else:
+                raise RuntimeError(
+                    'AuthMetadataPluginCallback raised exception "{}"!'.format(
+                        self._state.exception))
         if error is None:
-            self._invoke_success(metadata)
+            self._callback(
+                _common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok,
+                None)
         else:
-            self._invoke_failure(error)
-
-    def notify_failure(self, error):
-        with self.is_called_lock:
-            if not self.is_called:
-                self.error = error
+            self._callback(None, cygrpc.StatusCode.internal,
+                           _common.encode(str(error)))
 
 
-class _WrappedPlugin(object):
+class _Plugin(object):
 
-    def __init__(self, plugin):
-        self.plugin = plugin
+    def __init__(self, metadata_plugin):
+        self._metadata_plugin = metadata_plugin
 
-    def __call__(self, context, cygrpc_callback):
-        wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
-        wrapped_context = AuthMetadataContext(
-            _common.decode(context.service_url),
-            _common.decode(context.method_name))
+    def __call__(self, service_url, method_name, callback):
+        context = _AuthMetadataContext(
+            _common.decode(service_url), _common.decode(method_name))
+        callback_state = _CallbackState()
         try:
-            self.plugin(wrapped_context,
-                        AuthMetadataPluginCallback(wrapped_cygrpc_callback))
-        except Exception as error:
-            wrapped_cygrpc_callback.notify_failure(error)
-            raise
+            self._metadata_plugin(
+                context, _AuthMetadataPluginCallback(callback_state, callback))
+        except Exception as exception:  # pylint: disable=broad-except
+            logging.exception(
+                'AuthMetadataPluginCallback "%s" raised exception!',
+                self._metadata_plugin)
+            with callback_state.lock:
+                callback_state.exception = exception
+                if callback_state.called:
+                    return
+            callback(None, cygrpc.StatusCode.internal,
+                     _common.encode(str(exception)))
 
 
-def call_credentials_metadata_plugin(plugin, name):
-    """
-  Args:
-    plugin: A callable accepting a grpc.AuthMetadataContext
-      object and a callback (itself accepting a list of metadata key/value
-      2-tuples and a None-able exception value). The callback must be eventually
-      called, but need not be called in plugin's invocation.
-      plugin's invocation must be non-blocking.
-  """
-    return cygrpc.call_credentials_metadata_plugin(
-        cygrpc.CredentialsMetadataPlugin(
-            _WrappedPlugin(plugin), _common.encode(name)))
+def metadata_plugin_call_credentials(metadata_plugin, name):
+    if name is None:
+        try:
+            effective_name = metadata_plugin.__name__
+        except AttributeError:
+            effective_name = metadata_plugin.__class__.__name__
+    else:
+        effective_name = name
+    return grpc.CallCredentials(
+        cygrpc.MetadataPluginCallCredentials(
+            _Plugin(metadata_plugin), _common.encode(effective_name)))
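
A metadata plugin handed to metadata_plugin_call_credentials is an ordinary callable taking an AuthMetadataContext and a callback, and the callback must be invoked exactly once; if the plugin raises instead, _Plugin above logs the exception and fails the RPC with an internal status. A minimal sketch against the public grpc API, using a hypothetical static-token plugin:

    import grpc

    class StaticTokenPlugin(grpc.AuthMetadataPlugin):
        """Hypothetical plugin that attaches a fixed bearer token to each call."""

        def __init__(self, token):
            self._token = token

        def __call__(self, context, callback):
            # context.service_url and context.method_name identify the call;
            # the callback must eventually be invoked exactly once.
            callback((('authorization', 'Bearer ' + self._token),), None)

    call_credentials = grpc.metadata_call_credentials(
        StaticTokenPlugin('example-token'), name='static_token_plugin')
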
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index cd59b07..5b4812b 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -374,10 +374,10 @@
     context = _Context(rpc_event, state, request_deserializer)
     try:
         return behavior(argument, context), True
-    except Exception as e:  # pylint: disable=broad-except
+    except Exception as exception:  # pylint: disable=broad-except
         with state.condition:
-            if e not in state.rpc_errors:
-                details = 'Exception calling application: {}'.format(e)
+            if exception not in state.rpc_errors:
+                details = 'Exception calling application: {}'.format(exception)
                 logging.exception(details)
                 _abort(state, rpc_event.operation_call,
                        cygrpc.StatusCode.unknown, _common.encode(details))
@@ -389,10 +389,10 @@
         return next(response_iterator), True
     except StopIteration:
         return None, True
-    except Exception as e:  # pylint: disable=broad-except
+    except Exception as exception:  # pylint: disable=broad-except
         with state.condition:
-            if e not in state.rpc_errors:
-                details = 'Exception iterating responses: {}'.format(e)
+            if exception not in state.rpc_errors:
+                details = 'Exception iterating responses: {}'.format(exception)
                 logging.exception(details)
                 _abort(state, rpc_event.operation_call,
                        cygrpc.StatusCode.unknown, _common.encode(details))
@@ -591,7 +591,13 @@
     if not rpc_event.success:
         return None, None
     if rpc_event.request_call_details.method is not None:
-        method_handler = _find_method_handler(rpc_event, generic_handlers)
+        try:
+            method_handler = _find_method_handler(rpc_event, generic_handlers)
+        except Exception as exception:  # pylint: disable=broad-except
+            details = 'Exception servicing handler: {}'.format(exception)
+            logging.exception(details)
+            return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
+                               b'Error in service handler!'), None
         if method_handler is None:
             return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
                                b'Method not found!'), None
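
With the new try/except around _find_method_handler, a generic handler that raises while looking up a method no longer aborts servicing; the RPC is rejected with StatusCode.UNKNOWN, as the test added below exercises. A self-contained sketch of the observable behavior, with a hypothetical handler and method path:

    from concurrent import futures

    import grpc

    class DefectiveHandler(grpc.GenericRpcHandler):
        """Hypothetical handler whose service() raises during method lookup."""

        def service(self, handler_call_details):
            raise RuntimeError('defective generic RPC handler')

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
    server.add_generic_rpc_handlers((DefectiveHandler(),))
    port = server.add_insecure_port('localhost:0')
    server.start()

    channel = grpc.insecure_channel('localhost:%d' % port)
    try:
        channel.unary_unary('/test/Broken')(b'request')
    except grpc.RpcError as error:
        # The server logs the exception and rejects the RPC instead of
        # failing to service it at all.
        assert error.code() == grpc.StatusCode.UNKNOWN
    server.stop(None)
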
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index e277a3e..34cbade 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -22,7 +22,7 @@
   "unit._api_test.ChannelConnectivityTest",
   "unit._api_test.ChannelTest",
   "unit._auth_context_test.AuthContextTest",
-  "unit._auth_test.AccessTokenCallCredentialsTest",
+  "unit._auth_test.AccessTokenAuthMetadataPluginTest",
   "unit._auth_test.GoogleCallCredentialsTest",
   "unit._channel_args_test.ChannelArgsTest",
   "unit._channel_connectivity_test.ChannelConnectivityTest",
diff --git a/src/python/grpcio_tests/tests/unit/_auth_test.py b/src/python/grpcio_tests/tests/unit/_auth_test.py
index f61951b..e2cb938 100644
--- a/src/python/grpcio_tests/tests/unit/_auth_test.py
+++ b/src/python/grpcio_tests/tests/unit/_auth_test.py
@@ -61,7 +61,7 @@
         self.assertTrue(callback_event.wait(1.0))
 
 
-class AccessTokenCallCredentialsTest(unittest.TestCase):
+class AccessTokenAuthMetadataPluginTest(unittest.TestCase):
 
     def test_google_call_credentials_success(self):
         callback_event = threading.Event()
@@ -71,8 +71,8 @@
             self.assertIsNone(error)
             callback_event.set()
 
-        call_creds = _auth.AccessTokenCallCredentials('token')
-        call_creds(None, mock_callback)
+        metadata_plugin = _auth.AccessTokenAuthMetadataPlugin('token')
+        metadata_plugin(None, mock_callback)
         self.assertTrue(callback_event.wait(1.0))
 
 
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 18d4a6d..da94cf8 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -28,7 +28,7 @@
 _EMPTY_FLAGS = 0
 
 
-def _metadata_plugin_callback(context, callback):
+def _metadata_plugin(context, callback):
     callback(
         cygrpc.Metadata([
             cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY,
@@ -105,17 +105,9 @@
         channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([]))
         del channel
 
-    def testCredentialsMetadataPluginUpDown(self):
-        plugin = cygrpc.CredentialsMetadataPlugin(
-            lambda ignored_a, ignored_b: None, b'')
-        del plugin
-
-    def testCallCredentialsFromPluginUpDown(self):
-        plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback,
-                                                  b'')
-        call_credentials = cygrpc.call_credentials_metadata_plugin(plugin)
-        del plugin
-        del call_credentials
+    def test_metadata_plugin_call_credentials_up_down(self):
+        cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
+                                             b'test plugin name!')
 
     def testServerStartNoExplicitShutdown(self):
         server = cygrpc.Server(cygrpc.ChannelArgs([]))
@@ -205,7 +197,7 @@
 
         return test_utilities.SimpleFuture(performer)
 
-    def testEcho(self):
+    def test_echo(self):
         DEADLINE = time.time() + 5
         DEADLINE_TOLERANCE = 0.25
         CLIENT_METADATA_ASCII_KEY = b'key'
@@ -439,8 +431,8 @@
             cygrpc.SslPemKeyCertPair(resources.private_key(),
                                      resources.certificate_chain())
         ], False)
-        client_credentials = cygrpc.channel_credentials_ssl(
-            resources.test_root_certificates(), None)
+        client_credentials = cygrpc.SSLChannelCredentials(
+            resources.test_root_certificates(), None, None)
         self.setUpMixin(server_credentials, client_credentials,
                         _SSL_HOST_OVERRIDE)
 
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index 0a1e50c..2a1a49c 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -32,6 +32,7 @@
 _UNARY_STREAM = '/test/UnaryStream'
 _STREAM_UNARY = '/test/StreamUnary'
 _STREAM_STREAM = '/test/StreamStream'
+_DEFECTIVE_GENERIC_RPC_HANDLER = '/test/DefectiveGenericRpcHandler'
 
 
 class _Callback(object):
@@ -95,6 +96,9 @@
             yield request
         self._control.control()
 
+    def defective_generic_rpc_handler(self):
+        raise test_control.Defect()
+
 
 class _MethodHandler(grpc.RpcMethodHandler):
 
@@ -132,6 +136,8 @@
         elif handler_call_details.method == _STREAM_STREAM:
             return _MethodHandler(True, True, None, None, None, None, None,
                                   self._handler.handle_stream_stream)
+        elif handler_call_details.method == _DEFECTIVE_GENERIC_RPC_HANDLER:
+            return self._handler.defective_generic_rpc_handler()
         else:
             return None
 
@@ -176,6 +182,10 @@
     return channel.stream_stream(_STREAM_STREAM)
 
 
+def _defective_handler_multi_callable(channel):
+    return channel.unary_unary(_DEFECTIVE_GENERIC_RPC_HANDLER)
+
+
 class InvocationDefectsTest(unittest.TestCase):
 
     def setUp(self):
@@ -235,6 +245,18 @@
             for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
                 next(response_iterator)
 
+    def testDefectiveGenericRpcHandlerUnaryResponse(self):
+        request = b'\x07\x08'
+        multi_callable = _defective_handler_multi_callable(self._channel)
+
+        with self.assertRaises(grpc.RpcError) as exception_context:
+            response = multi_callable(
+                request,
+                metadata=(('test', 'DefectiveGenericRpcHandlerUnary'),))
+
+        self.assertIs(grpc.StatusCode.UNKNOWN,
+                      exception_context.exception.code())
+
 
 if __name__ == '__main__':
     unittest.main(verbosity=2)
diff --git a/templates/CMakeLists.txt.template b/templates/CMakeLists.txt.template
index 0be5d14..f893710 100644
--- a/templates/CMakeLists.txt.template
+++ b/templates/CMakeLists.txt.template
@@ -280,6 +280,7 @@
   elseif("<%text>${gRPC_SSL_PROVIDER}</%text>" STREQUAL "package")
     find_package(OpenSSL REQUIRED)
     set(_gRPC_SSL_LIBRARIES <%text>${OPENSSL_LIBRARIES}</%text>)
+    include_directories(<%text>${OPENSSL_INCLUDE_DIR}</%text>)
     set(_gRPC_FIND_SSL "if(NOT OPENSSL_FOUND)\n  find_package(OpenSSL)\nendif()")
   endif()
 
diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template
index 04b8940..cb4d556 100644
--- a/templates/gRPC-Core.podspec.template
+++ b/templates/gRPC-Core.podspec.template
@@ -61,6 +61,10 @@
       :tag => "v#{version}",
     }
 
+    # gRPC podspecs depend on the fix for https://github.com/CocoaPods/CocoaPods/issues/6024,
+    # which was released in CocoaPods v1.2.0.
+    s.cocoapods_version = '>= 1.2.0'
+
     s.ios.deployment_target = '7.0'
     s.osx.deployment_target = '10.9'
     s.requires_arc = false
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index 4438867..d8bb092 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -115,9 +115,10 @@
                                   GRPC_BAD_CLIENT_REGISTERED_HOST,
                                   GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
   grpc_server_start(a.server);
-  transport = grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, 0);
+  transport =
+      grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, false);
   server_setup_transport(&a, transport);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 
   /* Bind everything into the same pollset */
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.cc b/test/core/end2end/fixtures/h2_sockpair+trace.cc
index fe4ab74..9319c40 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.cc
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.cc
@@ -97,10 +97,10 @@
   cs.client_args = client_args;
   cs.f = f;
   transport =
-      grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
+      grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
   client_setup_transport(&exec_ctx, &cs, transport);
   GPR_ASSERT(f->client);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -114,9 +114,9 @@
   grpc_server_register_completion_queue(f->server, f->cq, nullptr);
   grpc_server_start(f->server);
   transport =
-      grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
+      grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
   server_setup_transport(f, transport);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
diff --git a/test/core/end2end/fixtures/h2_sockpair.cc b/test/core/end2end/fixtures/h2_sockpair.cc
index 59a3a6d..03566aa 100644
--- a/test/core/end2end/fixtures/h2_sockpair.cc
+++ b/test/core/end2end/fixtures/h2_sockpair.cc
@@ -91,10 +91,10 @@
   cs.client_args = client_args;
   cs.f = f;
   transport =
-      grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
+      grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
   client_setup_transport(&exec_ctx, &cs, transport);
   GPR_ASSERT(f->client);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -108,9 +108,9 @@
   grpc_server_register_completion_queue(f->server, f->cq, nullptr);
   grpc_server_start(f->server);
   transport =
-      grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
+      grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
   server_setup_transport(f, transport);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.cc b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
index d935efb..9adba00 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.cc
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.cc
@@ -102,10 +102,10 @@
   cs.client_args = client_args;
   cs.f = f;
   transport =
-      grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
+      grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
   client_setup_transport(&exec_ctx, &cs, transport);
   GPR_ASSERT(f->client);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
@@ -119,9 +119,9 @@
   grpc_server_register_completion_queue(f->server, f->cq, nullptr);
   grpc_server_start(f->server);
   transport =
-      grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
+      grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
   server_setup_transport(f, transport);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index 3f78069..3904887 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -68,6 +68,9 @@
 // Connection handling
 //
 
+// The proxy_connection structure is only accessed from closures, all of which
+// are scheduled under the same combiner lock, so there is no need for a mutex
+// to protect this structure.
 typedef struct proxy_connection {
   grpc_end2end_http_proxy* proxy;
 
@@ -78,6 +81,8 @@
 
   grpc_pollset_set* pollset_set;
 
+  // NOTE: All the closures execute under the proxy->combiner lock, which
+  // means there will not be any data races between the closures.
   grpc_closure on_read_request_done;
   grpc_closure on_server_connect_done;
   grpc_closure on_write_response_done;
@@ -86,6 +91,13 @@
   grpc_closure on_server_read_done;
   grpc_closure on_server_write_done;
 
+  bool client_read_failed : 1;
+  bool client_write_failed : 1;
+  bool client_shutdown : 1;
+  bool server_read_failed : 1;
+  bool server_write_failed : 1;
+  bool server_shutdown : 1;
+
   grpc_slice_buffer client_read_buffer;
   grpc_slice_buffer client_deferred_write_buffer;
   bool client_is_writing;
@@ -129,21 +141,53 @@
   }
 }
 
-// Helper function to shut down the proxy connection.
-// Does NOT take ownership of a reference to error.
-static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
-                                    proxy_connection* conn, bool is_client,
-                                    const char* prefix, grpc_error* error) {
-  const char* msg = grpc_error_string(error);
-  gpr_log(GPR_INFO, "%s: %s", prefix, msg);
+enum failure_type {
+  SETUP_FAILED,  // To be used before we start proxying.
+  CLIENT_READ_FAILED,
+  CLIENT_WRITE_FAILED,
+  SERVER_READ_FAILED,
+  SERVER_WRITE_FAILED,
+};
 
-  grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
-                         GRPC_ERROR_REF(error));
-  if (conn->server_endpoint != nullptr) {
+// Helper function to shut down the proxy connection.
+static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
+                                    proxy_connection* conn,
+                                    failure_type failure, const char* prefix,
+                                    grpc_error* error) {
+  gpr_log(GPR_INFO, "%s: %s", prefix, grpc_error_string(error));
+  // Decide whether we should shut down the client and server.
+  bool shutdown_client = false;
+  bool shutdown_server = false;
+  if (failure == SETUP_FAILED) {
+    shutdown_client = true;
+    shutdown_server = true;
+  } else {
+    if ((failure == CLIENT_READ_FAILED && conn->client_write_failed) ||
+        (failure == CLIENT_WRITE_FAILED && conn->client_read_failed) ||
+        (failure == SERVER_READ_FAILED && !conn->client_is_writing)) {
+      shutdown_client = true;
+    }
+    if ((failure == SERVER_READ_FAILED && conn->server_write_failed) ||
+        (failure == SERVER_WRITE_FAILED && conn->server_read_failed) ||
+        (failure == CLIENT_READ_FAILED && !conn->server_is_writing)) {
+      shutdown_server = true;
+    }
+  }
+  // If we decided to shut down either one and have not yet done so, do so.
+  if (shutdown_client && !conn->client_shutdown) {
+    grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint,
+                           GRPC_ERROR_REF(error));
+    conn->client_shutdown = true;
+  }
+  if (shutdown_server && !conn->server_shutdown &&
+      (conn->server_endpoint != nullptr)) {
     grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
                            GRPC_ERROR_REF(error));
+    conn->server_shutdown = true;
   }
+  // Unref the connection.
   proxy_connection_unref(exec_ctx, conn, "conn_failed");
+  GRPC_ERROR_UNREF(error);
 }
 
 // Callback for writing proxy data to the client.
@@ -152,8 +196,8 @@
   proxy_connection* conn = (proxy_connection*)arg;
   conn->client_is_writing = false;
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy client write", error);
+    proxy_connection_failed(exec_ctx, conn, CLIENT_WRITE_FAILED,
+                            "HTTP proxy client write", GRPC_ERROR_REF(error));
     return;
   }
   // Clear write buffer (the data we just wrote).
@@ -179,8 +223,8 @@
   proxy_connection* conn = (proxy_connection*)arg;
   conn->server_is_writing = false;
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, false /* is_client */,
-                            "HTTP proxy server write", error);
+    proxy_connection_failed(exec_ctx, conn, SERVER_WRITE_FAILED,
+                            "HTTP proxy server write", GRPC_ERROR_REF(error));
     return;
   }
   // Clear write buffer (the data we just wrote).
@@ -206,8 +250,8 @@
                                 grpc_error* error) {
   proxy_connection* conn = (proxy_connection*)arg;
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy client read", error);
+    proxy_connection_failed(exec_ctx, conn, CLIENT_READ_FAILED,
+                            "HTTP proxy client read", GRPC_ERROR_REF(error));
     return;
   }
   // If there is already a pending write (i.e., server_write_buffer is
@@ -239,8 +283,8 @@
                                 grpc_error* error) {
   proxy_connection* conn = (proxy_connection*)arg;
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, false /* is_client */,
-                            "HTTP proxy server read", error);
+    proxy_connection_failed(exec_ctx, conn, SERVER_READ_FAILED,
+                            "HTTP proxy server read", GRPC_ERROR_REF(error));
     return;
   }
   // If there is already a pending write (i.e., client_write_buffer is
@@ -272,8 +316,8 @@
   proxy_connection* conn = (proxy_connection*)arg;
   conn->client_is_writing = false;
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy write response", error);
+    proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                            "HTTP proxy write response", GRPC_ERROR_REF(error));
     return;
   }
   // Clear write buffer.
@@ -301,8 +345,8 @@
     // connection failed.  However, for the purposes of this test code,
     // it's fine to pretend this is a client-side error, which will
     // cause the client connection to be dropped.
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy server connect", error);
+    proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                            "HTTP proxy server connect", GRPC_ERROR_REF(error));
     return;
   }
   // We've established a connection, so send back a 200 response code to
@@ -351,8 +395,8 @@
   gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
           grpc_error_string(error));
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy read request", error);
+    proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                            "HTTP proxy read request", GRPC_ERROR_REF(error));
     return;
   }
   // Read request and feed it to the parser.
@@ -361,8 +405,9 @@
       error = grpc_http_parser_parse(
           &conn->http_parser, conn->client_read_buffer.slices[i], nullptr);
       if (error != GRPC_ERROR_NONE) {
-        proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                                "HTTP proxy request parse", error);
+        proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                                "HTTP proxy request parse",
+                                GRPC_ERROR_REF(error));
         GRPC_ERROR_UNREF(error);
         return;
       }
@@ -382,8 +427,8 @@
                  conn->http_request.method);
     error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
     gpr_free(msg);
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy read request", error);
+    proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                            "HTTP proxy read request", GRPC_ERROR_REF(error));
     GRPC_ERROR_UNREF(error);
     return;
   }
@@ -403,8 +448,8 @@
     if (!client_authenticated) {
       const char* msg = "HTTP Connect could not verify authentication";
       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
-      proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                              "HTTP proxy read request", error);
+      proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                              "HTTP proxy read request", GRPC_ERROR_REF(error));
       GRPC_ERROR_UNREF(error);
       return;
     }
@@ -414,8 +459,8 @@
   error = grpc_blocking_resolve_address(conn->http_request.path, "80",
                                         &resolved_addresses);
   if (error != GRPC_ERROR_NONE) {
-    proxy_connection_failed(exec_ctx, conn, true /* is_client */,
-                            "HTTP proxy DNS lookup", error);
+    proxy_connection_failed(exec_ctx, conn, SETUP_FAILED,
+                            "HTTP proxy DNS lookup", GRPC_ERROR_REF(error));
     GRPC_ERROR_UNREF(error);
     return;
   }
diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc
index 0b49b89..7511721 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.cc
+++ b/test/core/end2end/fuzzers/api_fuzzer.cc
@@ -468,10 +468,10 @@
     *fc->ep = client;
 
     grpc_transport* transport =
-        grpc_create_chttp2_transport(exec_ctx, nullptr, server, 0);
+        grpc_create_chttp2_transport(exec_ctx, nullptr, server, false);
     grpc_server_setup_transport(exec_ctx, g_server, transport, nullptr,
                                 nullptr);
-    grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr);
+    grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr, nullptr);
 
     GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_NONE);
   } else {
diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc
index ebc8c27..5871f0f 100644
--- a/test/core/end2end/fuzzers/client_fuzzer.cc
+++ b/test/core/end2end/fuzzers/client_fuzzer.cc
@@ -54,8 +54,8 @@
 
   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
   grpc_transport* transport =
-      grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 1);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+      grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, true);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
 
   grpc_channel* channel = grpc_channel_create(
       &exec_ctx, "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport);
diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc
index fb6477b..67caf4e 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.cc
+++ b/test/core/end2end/fuzzers/server_fuzzer.cc
@@ -61,9 +61,9 @@
   //    grpc_server_register_method(server, "/reg", NULL, 0);
   grpc_server_start(server);
   grpc_transport* transport =
-      grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 0);
+      grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, false);
   grpc_server_setup_transport(&exec_ctx, server, transport, nullptr, nullptr);
-  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+  grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
 
   grpc_call* call1 = nullptr;
   grpc_call_details call_details1;
diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc
index 803f017..6e17be9 100644
--- a/test/core/iomgr/udp_server_test.cc
+++ b/test/core/iomgr/udp_server_test.cc
@@ -50,7 +50,7 @@
 static int g_number_of_bytes_read = 0;
 static int g_number_of_orphan_calls = 0;
 
-static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
+static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
   char read_buffer[512];
   ssize_t byte_count;
 
@@ -64,9 +64,11 @@
   GPR_ASSERT(GRPC_LOG_IF_ERROR(
       "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
   gpr_mu_unlock(g_mu);
+  return false;
 }
 
-static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
+static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data,
+                     grpc_closure* notify_on_write_closure) {
   gpr_mu_lock(g_mu);
   g_number_of_writes++;
 
@@ -79,6 +81,7 @@
                            grpc_closure* closure, void* user_data) {
   gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
           grpc_fd_wrapped_fd(emfd));
+  GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
   g_number_of_orphan_calls++;
 }
 
@@ -226,7 +229,6 @@
   int clifd, svrfd;
   grpc_udp_server* s = grpc_udp_server_create(nullptr);
   int i;
-  int number_of_reads_before;
   grpc_millis deadline;
   grpc_pollset* pollsets[1];
   LOG_TEST("test_receive");
@@ -256,14 +258,14 @@
     deadline =
         grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
 
-    number_of_reads_before = g_number_of_reads;
+    int number_of_bytes_read_before = g_number_of_bytes_read;
     /* Create a socket, send a packet to the UDP server. */
     clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
     GPR_ASSERT(clifd >= 0);
     GPR_ASSERT(connect(clifd, (struct sockaddr*)addr,
                        (socklen_t)resolved_addr.len) == 0);
     GPR_ASSERT(5 == write(clifd, "hello", 5));
-    while (g_number_of_reads == number_of_reads_before &&
+    while (g_number_of_bytes_read < (number_of_bytes_read_before + 5) &&
            deadline > grpc_exec_ctx_now(&exec_ctx)) {
       grpc_pollset_worker* worker = nullptr;
       GPR_ASSERT(GRPC_LOG_IF_ERROR(
@@ -273,7 +275,6 @@
       grpc_exec_ctx_flush(&exec_ctx);
       gpr_mu_lock(g_mu);
     }
-    GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
     close(clifd);
   }
   GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
@@ -286,9 +287,6 @@
   /* The server had a single FD, which is orphaned exactly once in *
    * grpc_udp_server_destroy. */
   GPR_ASSERT(g_number_of_orphan_calls == 1);
-
-  /* The write callback should have fired a few times. */
-  GPR_ASSERT(g_number_of_writes > 0);
 }
 
 static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc
index bbb2f60..d83ebb1 100644
--- a/test/core/security/ssl_server_fuzzer.cc
+++ b/test/core/security/ssl_server_fuzzer.cc
@@ -92,8 +92,9 @@
   grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
   grpc_server_security_connector_add_handshakers(&exec_ctx, sc, handshake_mgr);
   grpc_handshake_manager_do_handshake(
-      &exec_ctx, handshake_mgr, mock_endpoint, nullptr /* channel_args */,
-      deadline, nullptr /* acceptor */, on_handshake_done, &state);
+      &exec_ctx, handshake_mgr, nullptr /* interested_parties */, mock_endpoint,
+      nullptr /* channel_args */, deadline, nullptr /* acceptor */,
+      on_handshake_done, &state);
   grpc_exec_ctx_flush(&exec_ctx);
 
   // If the given string happens to be part of the correct client hello, the
diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD
index 1ea7d03..6eff716 100644
--- a/test/core/transport/chttp2/BUILD
+++ b/test/core/transport/chttp2/BUILD
@@ -115,6 +115,21 @@
 )
 
 grpc_cc_test(
+    name = "settings_timeout_test",
+    srcs = ["settings_timeout_test.cc"],
+    language = "C++",
+    deps = [
+        "//:gpr",
+        "//:grpc",
+        "//test/core/util:gpr_test_util",
+        "//test/core/util:grpc_test_util",
+    ],
+    external_deps = [
+        "gtest",
+    ],
+)
+
+grpc_cc_test(
     name = "varint_test",
     srcs = ["varint_test.cc"],
     language = "C++",
diff --git a/test/core/transport/chttp2/settings_timeout_test.cc b/test/core/transport/chttp2/settings_timeout_test.cc
new file mode 100644
index 0000000..670eae1
--- /dev/null
+++ b/test/core/transport/chttp2/settings_timeout_test.cc
@@ -0,0 +1,258 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include <memory>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_core {
+namespace test {
+namespace {
+
+// A gRPC server, running in its own thread.
+class ServerThread {
+ public:
+  explicit ServerThread(const char* address) : address_(address) {}
+
+  void Start() {
+    // Start server with 1-second handshake timeout.
+    grpc_arg arg;
+    arg.type = GRPC_ARG_INTEGER;
+    arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
+    arg.value.integer = 1000;
+    grpc_channel_args args = {1, &arg};
+    server_ = grpc_server_create(&args, nullptr);
+    ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
+    cq_ = grpc_completion_queue_create_for_next(nullptr);
+    grpc_server_register_completion_queue(server_, cq_, nullptr);
+    grpc_server_start(server_);
+    thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
+  }
+
+  void Shutdown() {
+    grpc_completion_queue* shutdown_cq =
+        grpc_completion_queue_create_for_pluck(nullptr);
+    grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
+    GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
+                                           grpc_timeout_seconds_to_deadline(1),
+                                           nullptr)
+                   .type == GRPC_OP_COMPLETE);
+    grpc_completion_queue_destroy(shutdown_cq);
+    grpc_server_destroy(server_);
+    grpc_completion_queue_destroy(cq_);
+    thread_->join();
+  }
+
+ private:
+  void Serve() {
+    // The completion queue should not return anything other than shutdown.
+    grpc_event ev = grpc_completion_queue_next(
+        cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
+    ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
+  }
+
+  const char* address_;  // Do not own.
+  grpc_server* server_ = nullptr;
+  grpc_completion_queue* cq_ = nullptr;
+  std::unique_ptr<std::thread> thread_;
+};
+
+// A TCP client that connects to the server, reads data until the server
+// closes, and then terminates.
+class Client {
+ public:
+  explicit Client(const char* server_address)
+      : server_address_(server_address) {}
+
+  void Connect() {
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    grpc_resolved_addresses* server_addresses = nullptr;
+    grpc_error* error =
+        grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
+    ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
+    ASSERT_GE(server_addresses->naddrs, 1UL);
+    pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+    grpc_pollset_init(pollset_, &mu_);
+    grpc_pollset_set* pollset_set = grpc_pollset_set_create();
+    grpc_pollset_set_add_pollset(&exec_ctx, pollset_set, pollset_);
+    EventState state;
+    grpc_tcp_client_connect(&exec_ctx, state.closure(), &endpoint_, pollset_set,
+                            nullptr /* channel_args */, server_addresses->addrs,
+                            1000);
+    ASSERT_TRUE(PollUntilDone(
+        &exec_ctx, &state,
+        grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
+    ASSERT_EQ(GRPC_ERROR_NONE, state.error());
+    grpc_pollset_set_destroy(&exec_ctx, pollset_set);
+    grpc_endpoint_add_to_pollset(&exec_ctx, endpoint_, pollset_);
+    grpc_resolved_addresses_destroy(server_addresses);
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+
+  // Reads until an error is returned.
+  // Returns true if an error was encountered before the deadline.
+  bool ReadUntilError() {
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    grpc_slice_buffer read_buffer;
+    grpc_slice_buffer_init(&read_buffer);
+    bool retval = true;
+    // Use a deadline of 3 seconds, which is a lot more than we should
+    // need for a 1-second timeout, but this helps avoid flakes.
+    grpc_millis deadline = grpc_exec_ctx_now(&exec_ctx) + 3000;
+    while (true) {
+      EventState state;
+      grpc_endpoint_read(&exec_ctx, endpoint_, &read_buffer, state.closure());
+      if (!PollUntilDone(&exec_ctx, &state, deadline)) {
+        retval = false;
+        break;
+      }
+      if (state.error() != GRPC_ERROR_NONE) break;
+      gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
+      grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &read_buffer);
+    }
+    grpc_endpoint_shutdown(&exec_ctx, endpoint_,
+                           GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
+    grpc_slice_buffer_destroy_internal(&exec_ctx, &read_buffer);
+    grpc_exec_ctx_finish(&exec_ctx);
+    return retval;
+  }
+
+  void Shutdown() {
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    grpc_endpoint_destroy(&exec_ctx, endpoint_);
+    grpc_pollset_shutdown(&exec_ctx, pollset_,
+                          GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
+                                              grpc_schedule_on_exec_ctx));
+    grpc_exec_ctx_finish(&exec_ctx);
+  }
+
+ private:
+  // State used to wait for an I/O event.
+  class EventState {
+   public:
+    EventState() {
+      GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
+                        grpc_schedule_on_exec_ctx);
+    }
+
+    ~EventState() { GRPC_ERROR_UNREF(error_); }
+
+    grpc_closure* closure() { return &closure_; }
+
+    bool done() const { return done_; }
+
+    // Caller does NOT take ownership of the error.
+    grpc_error* error() const { return error_; }
+
+   private:
+    static void OnEventDone(grpc_exec_ctx* exec_ctx, void* arg,
+                            grpc_error* error) {
+      gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
+      EventState* state = (EventState*)arg;
+      state->error_ = GRPC_ERROR_REF(error);
+      state->done_ = true;
+    }
+
+    grpc_closure closure_;
+    bool done_ = false;
+    grpc_error* error_ = GRPC_ERROR_NONE;
+  };
+
+  // Returns true if done, or false if deadline exceeded.
+  bool PollUntilDone(grpc_exec_ctx* exec_ctx, EventState* state,
+                     grpc_millis deadline) {
+    while (true) {
+      grpc_pollset_worker* worker = nullptr;
+      gpr_mu_lock(mu_);
+      GRPC_LOG_IF_ERROR("grpc_pollset_work",
+                        grpc_pollset_work(exec_ctx, pollset_, &worker,
+                                          grpc_exec_ctx_now(exec_ctx) + 1000));
+      gpr_mu_unlock(mu_);
+      if (state != nullptr && state->done()) return true;
+      if (grpc_exec_ctx_now(exec_ctx) >= deadline) return false;
+    }
+  }
+
+  static void PollsetDestroy(grpc_exec_ctx* exec_ctx, void* arg,
+                             grpc_error* error) {
+    grpc_pollset* pollset = (grpc_pollset*)arg;
+    grpc_pollset_destroy(exec_ctx, pollset);
+    gpr_free(pollset);
+  }
+
+  const char* server_address_;  // Do not own.
+  grpc_endpoint* endpoint_;
+  gpr_mu* mu_;
+  grpc_pollset* pollset_;
+};
+
+TEST(SettingsTimeout, Basic) {
+  // Construct server address string.
+  const int server_port = grpc_pick_unused_port_or_die();
+  char* server_address_string;
+  gpr_asprintf(&server_address_string, "localhost:%d", server_port);
+  // Start server.
+  gpr_log(GPR_INFO, "starting server on %s", server_address_string);
+  ServerThread server_thread(server_address_string);
+  server_thread.Start();
+  // Create client and connect to server.
+  gpr_log(GPR_INFO, "starting client connect");
+  Client client(server_address_string);
+  client.Connect();
+  // Client read.  Should fail due to server dropping connection.
+  gpr_log(GPR_INFO, "starting client read");
+  EXPECT_TRUE(client.ReadUntilError());
+  // Shut down client.
+  gpr_log(GPR_INFO, "shutting down client");
+  client.Shutdown();
+  // Shut down server.
+  gpr_log(GPR_INFO, "shutting down server");
+  server_thread.Shutdown();
+  // Clean up.
+  gpr_free(server_address_string);
+}
+
+}  // namespace
+}  // namespace test
+}  // namespace grpc_core
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  grpc_test_init(argc, argv);
+  grpc_init();
+  int result = RUN_ALL_TESTS();
+  grpc_shutdown();
+  return result;
+}
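Editorial note on the test above (not part of the patch): each asynchronous iomgr operation is handed an EventState closure that records completion, and the test thread then spins in PollUntilDone(), pumping the pollset until the closure has run or a deadline passes. A minimal sketch of one such read, reusing the member names from the Client class above:

    // Sketch only: drive a single asynchronous endpoint read to completion.
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    grpc_slice_buffer read_buffer;
    grpc_slice_buffer_init(&read_buffer);
    EventState state;
    // The closure runs EventState::OnEventDone(), which stores the error and
    // marks the state as done.
    grpc_endpoint_read(&exec_ctx, endpoint_, &read_buffer, state.closure());
    grpc_exec_ctx_flush(&exec_ctx);
    // Pump the pollset until the closure has run, or give up at the deadline.
    if (!PollUntilDone(&exec_ctx, &state,
                       grpc_exec_ctx_now(&exec_ctx) + 3000 /* ms */)) {
      gpr_log(GPR_ERROR, "read timed out");
    } else if (state.error() != GRPC_ERROR_NONE) {
      gpr_log(GPR_INFO, "read finished: %s", grpc_error_string(state.error()));
    }
    grpc_slice_buffer_destroy_internal(&exec_ctx, &read_buffer);
    grpc_exec_ctx_finish(&exec_ctx);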
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index c15ab88..bbf3da4 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -353,11 +353,6 @@
           "balancer", server_host_, balancers_.back().get()));
     }
     ResetStub();
-    std::vector<AddressData> addresses;
-    for (size_t i = 0; i < balancer_servers_.size(); ++i) {
-      addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
-    }
-    SetNextResolution(addresses);
   }
 
   void TearDown() override {
@@ -370,6 +365,14 @@
     grpc_fake_resolver_response_generator_unref(response_generator_);
   }
 
+  void SetNextResolutionAllBalancers() {
+    std::vector<AddressData> addresses;
+    for (size_t i = 0; i < balancer_servers_.size(); ++i) {
+      addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
+    }
+    SetNextResolution(addresses);
+  }
+
   void ResetStub(int fallback_timeout = 0) {
     ChannelArguments args;
     args.SetGrpclbFallbackTimeout(fallback_timeout);
@@ -581,6 +584,7 @@
 };
 
 TEST_F(SingleBalancerTest, Vanilla) {
+  SetNextResolutionAllBalancers();
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -608,6 +612,7 @@
 }
 
 TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
+  SetNextResolutionAllBalancers();
   const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
   const int kCallDeadlineMs = 1000 * grpc_test_slowdown_factor();
 
@@ -645,6 +650,7 @@
 }
 
 TEST_F(SingleBalancerTest, Fallback) {
+  SetNextResolutionAllBalancers();
   const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
   const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
   const size_t kNumBackendInResolution = backends_.size() / 2;
@@ -711,6 +717,7 @@
 }
 
 TEST_F(SingleBalancerTest, FallbackUpdate) {
+  SetNextResolutionAllBalancers();
   const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
   const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
   const size_t kNumBackendInResolution = backends_.size() / 3;
@@ -818,6 +825,7 @@
 }
 
 TEST_F(SingleBalancerTest, BackendsRestart) {
+  SetNextResolutionAllBalancers();
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -857,6 +865,7 @@
 };
 
 TEST_F(UpdatesTest, UpdateBalancers) {
+  SetNextResolutionAllBalancers();
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[1]};
   ScheduleResponseForBalancer(
@@ -919,6 +928,7 @@
 // verify that the LB channel inside grpclb keeps the initial connection (which
 // by definition is also present in the update).
 TEST_F(UpdatesTest, UpdateBalancersRepeated) {
+  SetNextResolutionAllBalancers();
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[0]};
 
@@ -989,6 +999,9 @@
 }
 
 TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
+  std::vector<AddressData> addresses;
+  addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+  SetNextResolution(addresses);
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[1]};
 
@@ -1030,7 +1043,7 @@
   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
 
-  std::vector<AddressData> addresses;
+  addresses.clear();
   addresses.emplace_back(AddressData{balancer_servers_[1].port_, true, ""});
   gpr_log(GPR_INFO, "========= ABOUT TO UPDATE 1 ==========");
   SetNextResolution(addresses);
@@ -1055,8 +1068,14 @@
   balancers_[2]->NotifyDoneWithServerlists();
   EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
   EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
-  EXPECT_EQ(1U, balancer_servers_[1].service_->request_count());
-  EXPECT_EQ(1U, balancer_servers_[1].service_->response_count());
+  // The second balancer, published as part of the first update, may end up
+  // getting two requests (that is, 1 <= #req <= 2) if the LB call retry timer
+  // firing races with the arrival of the update containing the second
+  // balancer.
+  EXPECT_GE(balancer_servers_[1].service_->request_count(), 1U);
+  EXPECT_GE(balancer_servers_[1].service_->response_count(), 1U);
+  EXPECT_LE(balancer_servers_[1].service_->request_count(), 2U);
+  EXPECT_LE(balancer_servers_[1].service_->response_count(), 2U);
   EXPECT_EQ(0U, balancer_servers_[2].service_->request_count());
   EXPECT_EQ(0U, balancer_servers_[2].service_->response_count());
   // Check LB policy name for the channel.
@@ -1064,6 +1083,7 @@
 }
 
 TEST_F(SingleBalancerTest, Drop) {
+  SetNextResolutionAllBalancers();
   const size_t kNumRpcsPerAddress = 100;
   const int num_of_drop_by_rate_limiting_addresses = 1;
   const int num_of_drop_by_load_balancing_addresses = 2;
@@ -1107,6 +1127,7 @@
 }
 
 TEST_F(SingleBalancerTest, DropAllFirst) {
+  SetNextResolutionAllBalancers();
   // All registered addresses are marked as "drop".
   const int num_of_drop_by_rate_limiting_addresses = 1;
   const int num_of_drop_by_load_balancing_addresses = 1;
@@ -1122,6 +1143,7 @@
 }
 
 TEST_F(SingleBalancerTest, DropAll) {
+  SetNextResolutionAllBalancers();
   ScheduleResponseForBalancer(
       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
@@ -1152,6 +1174,7 @@
 };
 
 TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
+  SetNextResolutionAllBalancers();
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
       0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
@@ -1186,6 +1209,7 @@
 }
 
 TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
+  SetNextResolutionAllBalancers();
   const size_t kNumRpcsPerAddress = 3;
   const int num_of_drop_by_rate_limiting_addresses = 2;
   const int num_of_drop_by_load_balancing_addresses = 1;
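Aside (illustrative only, not part of the patch): the relaxed expectations in UpdateBalancersDeadUpdate above accept either one or two requests because the LB call retry timer can race with the arrival of the update. If gmock matchers are available in this test, the same range check can be written in one statement per counter:

    using ::testing::AllOf;
    using ::testing::Ge;
    using ::testing::Le;
    // 1 <= count <= 2: the retry timer may or may not fire before the update lands.
    EXPECT_THAT(balancer_servers_[1].service_->request_count(),
                AllOf(Ge(1U), Le(2U)));
    EXPECT_THAT(balancer_servers_[1].service_->response_count(),
                AllOf(Ge(1U), Le(2U)));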
diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc
index a24cdc7..30bd8bf 100644
--- a/test/cpp/interop/interop_server.cc
+++ b/test/cpp/interop/interop_server.cc
@@ -317,9 +317,15 @@
 
 void grpc::testing::interop::RunServer(
     std::shared_ptr<ServerCredentials> creds) {
-  GPR_ASSERT(FLAGS_port != 0);
+  RunServer(creds, FLAGS_port, nullptr);
+}
+
+void grpc::testing::interop::RunServer(
+    std::shared_ptr<ServerCredentials> creds, const int port,
+    ServerStartedCondition* server_started_condition) {
+  GPR_ASSERT(port != 0);
   std::ostringstream server_address;
-  server_address << "0.0.0.0:" << FLAGS_port;
+  server_address << "0.0.0.0:" << port;
   TestServiceImpl service;
 
   SimpleRequest request;
@@ -333,6 +339,14 @@
   }
   std::unique_ptr<Server> server(builder.BuildAndStart());
   gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
+
+  // Signal that the server has started.
+  if (server_started_condition) {
+    std::unique_lock<std::mutex> lock(server_started_condition->mutex);
+    server_started_condition->server_started = true;
+    server_started_condition->condition.notify_all();
+  }
+
   while (!gpr_atm_no_barrier_load(&g_got_sigint)) {
     gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                  gpr_time_from_seconds(5, GPR_TIMESPAN)));
diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h
index 6af003f..1bf7db1 100644
--- a/test/cpp/interop/server_helper.h
+++ b/test/cpp/interop/server_helper.h
@@ -19,6 +19,7 @@
 #ifndef GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H
 #define GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H
 
+#include <condition_variable>
 #include <memory>
 
 #include <grpc/compression.h>
@@ -50,8 +51,27 @@
 namespace interop {
 
 extern gpr_atm g_got_sigint;
+
+struct ServerStartedCondition {
+  std::mutex mutex;
+  std::condition_variable condition;
+  bool server_started = false;
+};
+
+/// Run gRPC interop server using port FLAGS_port.
+///
+/// \param creds The credentials associated with the server.
 void RunServer(std::shared_ptr<ServerCredentials> creds);
 
+/// Run gRPC interop server.
+///
+/// \param creds The credentials associated with the server.
+/// \param port Port to use for the server.
+/// \param server_started_condition (optional) Struct holding mutex, condition
+///     variable, and condition used to notify when the server has started.
+void RunServer(std::shared_ptr<ServerCredentials> creds, int port,
+               ServerStartedCondition* server_started_condition);
+
 }  // namespace interop
 }  // namespace testing
 }  // namespace grpc
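The notify half of ServerStartedCondition is in interop_server.cc above; the intended consumer blocks on the condition variable until server_started flips. A minimal sketch of that wait (the port value and the use of insecure credentials are assumptions for illustration, not part of the patch; it also needs <thread> and <mutex>):

    // Run the interop server on a background thread and block until it is
    // actually listening before exercising it from a client.
    grpc::testing::interop::ServerStartedCondition server_started;
    std::thread server_thread([&server_started]() {
      grpc::testing::interop::RunServer(grpc::InsecureServerCredentials(),
                                        /*port=*/50051, &server_started);
    });
    {
      std::unique_lock<std::mutex> lock(server_started.mutex);
      server_started.condition.wait(
          lock, [&server_started] { return server_started.server_started; });
    }
    // ... run client code against localhost:50051, then trigger shutdown and
    // join server_thread ...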
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index b95ee0c..be4da4d 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -137,7 +137,7 @@
     grpc_channel_args c_args = args.c_channel_args();
     ep_ = new DummyEndpoint;
     t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client);
-    grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr);
+    grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr, nullptr);
     FlushExecCtx();
   }
 
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index 7f1aa48..7e20843 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -174,7 +174,7 @@
       const grpc_channel_args* server_args =
           grpc_server_get_channel_args(server_->c_server());
       server_transport_ = grpc_create_chttp2_transport(
-          &exec_ctx, server_args, endpoints.server, 0 /* is_client */);
+          &exec_ctx, server_args, endpoints.server, false /* is_client */);
 
       grpc_pollset** pollsets;
       size_t num_pollsets = 0;
@@ -186,7 +186,7 @@
 
       grpc_server_setup_transport(&exec_ctx, server_->c_server(),
                                   server_transport_, nullptr, server_args);
-      grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_,
+      grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, nullptr,
                                           nullptr);
     }
 
@@ -197,13 +197,13 @@
       fixture_configuration.ApplyCommonChannelArguments(&args);
 
       grpc_channel_args c_args = args.c_channel_args();
-      client_transport_ =
-          grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
+      client_transport_ = grpc_create_chttp2_transport(&exec_ctx, &c_args,
+                                                       endpoints.client, true);
       GPR_ASSERT(client_transport_);
       grpc_channel* channel =
           grpc_channel_create(&exec_ctx, "target", &c_args,
                               GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
-      grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_,
+      grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, nullptr,
                                           nullptr);
 
       channel_ = CreateChannelInternal("", channel);
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
index 23fff2e..1c6f44d 100644
--- a/test/cpp/performance/writes_per_rpc_test.cc
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -89,7 +89,7 @@
       const grpc_channel_args* server_args =
           grpc_server_get_channel_args(server_->c_server());
       grpc_transport* transport = grpc_create_chttp2_transport(
-          &exec_ctx, server_args, endpoints.server, 0 /* is_client */);
+          &exec_ctx, server_args, endpoints.server, false /* is_client */);
 
       grpc_pollset** pollsets;
       size_t num_pollsets = 0;
@@ -101,7 +101,8 @@
 
       grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport,
                                   nullptr, server_args);
-      grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+      grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr,
+                                          nullptr);
     }
 
     /* create channel */
@@ -111,12 +112,13 @@
       ApplyCommonChannelArguments(&args);
 
       grpc_channel_args c_args = args.c_channel_args();
-      grpc_transport* transport =
-          grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
+      grpc_transport* transport = grpc_create_chttp2_transport(
+          &exec_ctx, &c_args, endpoints.client, true);
       GPR_ASSERT(transport);
       grpc_channel* channel = grpc_channel_create(
           &exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
-      grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
+      grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr,
+                                          nullptr);
 
       channel_ = CreateChannelInternal("", channel);
     }
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 9f20b14..82a3f00 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -60,21 +60,20 @@
     SetupLoadTest(config, num_threads_);
   }
 
-  virtual ~SynchronousClient(){};
+  virtual ~SynchronousClient() {}
 
-  virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+  virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
   virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
 
   void ThreadFunc(size_t thread_idx, Thread* t) override {
-    InitThreadFuncImpl(thread_idx);
+    if (!InitThreadFuncImpl(thread_idx)) {
+      return;
+    }
     for (;;) {
       // run the loop body
       HistogramEntry entry;
       const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
       t->UpdateHistogram(&entry);
-      if (!thread_still_ok) {
-        gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
-      }
       if (!thread_still_ok || ThreadCompleted()) {
         return;
       }
@@ -109,9 +108,6 @@
 
   size_t num_threads_;
   std::vector<SimpleResponse> responses_;
-
- private:
-  void DestroyMultithreading() override final { EndThreads(); }
 };
 
 class SynchronousUnaryClient final : public SynchronousClient {
@@ -122,7 +118,7 @@
   }
   ~SynchronousUnaryClient() {}
 
-  void InitThreadFuncImpl(size_t thread_idx) override {}
+  bool InitThreadFuncImpl(size_t thread_idx) override { return true; }
 
   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     if (!WaitToIssue(thread_idx)) {
@@ -140,6 +136,9 @@
     entry->set_status(s.error_code());
     return true;
   }
+
+ private:
+  void DestroyMultithreading() override final { EndThreads(); }
 };
 
 template <class StreamType>
@@ -149,31 +148,30 @@
       : SynchronousClient(config),
         context_(num_threads_),
         stream_(num_threads_),
+        stream_mu_(num_threads_),
+        shutdown_(num_threads_),
         messages_per_stream_(config.messages_per_stream()),
         messages_issued_(num_threads_) {
     StartThreads(num_threads_);
   }
   virtual ~SynchronousStreamingClient() {
-    std::vector<std::thread> cleanup_threads;
-    for (size_t i = 0; i < num_threads_; i++) {
-      cleanup_threads.emplace_back([this, i]() {
-        auto stream = &stream_[i];
-        if (*stream) {
-          // forcibly cancel the streams, then finish
-          context_[i].TryCancel();
-          (*stream)->Finish().IgnoreError();
-          // don't log any error message on !ok since this was canceled
-        }
-      });
-    }
-    for (auto& th : cleanup_threads) {
-      th.join();
-    }
+    CleanupAllStreams([this](size_t thread_idx) {
+      // Don't log any kind of error since we may have canceled this
+      stream_[thread_idx]->Finish().IgnoreError();
+    });
   }
 
  protected:
   std::vector<grpc::ClientContext> context_;
   std::vector<std::unique_ptr<StreamType>> stream_;
+  // stream_mu_ is only needed when changing an element of stream_ or context_
+  std::vector<std::mutex> stream_mu_;
+  // use struct Bool rather than bool because vector<bool> is not safe for
+  // concurrent writes to different elements
+  struct Bool {
+    bool val;
+    Bool() : val(false) {}
+  };
+  std::vector<Bool> shutdown_;
   const int messages_per_stream_;
   std::vector<int> messages_issued_;
 
@@ -182,12 +180,40 @@
     // don't set the value since the stream is failed and shouldn't be timed
     entry->set_status(s.error_code());
     if (!s.ok()) {
-      gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
-              s.error_message().c_str());
+      std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+      if (!shutdown_[thread_idx].val) {
+        gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
+                thread_idx, s.error_message().c_str());
+      }
     }
+    // Lock the stream_mu_ now because the client context could change
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
     context_[thread_idx].~ClientContext();
     new (&context_[thread_idx]) ClientContext();
   }
+
+  void CleanupAllStreams(std::function<void(size_t)> cleaner) {
+    std::vector<std::thread> cleanup_threads;
+    for (size_t i = 0; i < num_threads_; i++) {
+      cleanup_threads.emplace_back([this, i, cleaner] {
+        std::lock_guard<std::mutex> l(stream_mu_[i]);
+        shutdown_[i].val = true;
+        if (stream_[i]) {
+          cleaner(i);
+        }
+      });
+    }
+    for (auto& th : cleanup_threads) {
+      th.join();
+    }
+  }
+
+ private:
+  void DestroyMultithreading() override final {
+    CleanupAllStreams(
+        [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
+    EndThreads();
+  }
 };
 
 class SynchronousStreamingPingPongClient final
@@ -197,24 +223,21 @@
   SynchronousStreamingPingPongClient(const ClientConfig& config)
       : SynchronousStreamingClient(config) {}
   ~SynchronousStreamingPingPongClient() {
-    std::vector<std::thread> cleanup_threads;
-    for (size_t i = 0; i < num_threads_; i++) {
-      cleanup_threads.emplace_back([this, i]() {
-        auto stream = &stream_[i];
-        if (*stream) {
-          (*stream)->WritesDone();
-        }
-      });
-    }
-    for (auto& th : cleanup_threads) {
-      th.join();
-    }
+    CleanupAllStreams(
+        [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
   }
 
-  void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+  bool InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+    } else {
+      return false;
+    }
     messages_issued_[thread_idx] = 0;
+    return true;
   }
 
   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -239,7 +262,13 @@
     stream_[thread_idx]->WritesDone();
     FinishStream(entry, thread_idx);
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+    } else {
+      stream_[thread_idx].reset();
+      return false;
+    }
     messages_issued_[thread_idx] = 0;
     return true;
   }
@@ -251,25 +280,24 @@
   SynchronousStreamingFromClientClient(const ClientConfig& config)
       : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
   ~SynchronousStreamingFromClientClient() {
-    std::vector<std::thread> cleanup_threads;
-    for (size_t i = 0; i < num_threads_; i++) {
-      cleanup_threads.emplace_back([this, i]() {
-        auto stream = &stream_[i];
-        if (*stream) {
-          (*stream)->WritesDone();
-        }
-      });
-    }
-    for (auto& th : cleanup_threads) {
-      th.join();
-    }
+    CleanupAllStreams(
+        [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
   }
 
-  void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+  std::vector<double> last_issue_;
+
+  bool InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
-                                                    &responses_[thread_idx]);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+                                                      &responses_[thread_idx]);
+    } else {
+      return false;
+    }
     last_issue_[thread_idx] = UsageTimer::Now();
+    return true;
   }
 
   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
@@ -287,13 +315,16 @@
     stream_[thread_idx]->WritesDone();
     FinishStream(entry, thread_idx);
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
-                                                    &responses_[thread_idx]);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+                                                      &responses_[thread_idx]);
+    } else {
+      stream_[thread_idx].reset();
+      return false;
+    }
     return true;
   }
-
- private:
-  std::vector<double> last_issue_;
 };
 
 class SynchronousStreamingFromServerClient final
@@ -301,12 +332,24 @@
  public:
   SynchronousStreamingFromServerClient(const ClientConfig& config)
       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
-  void InitThreadFuncImpl(size_t thread_idx) override {
+  ~SynchronousStreamingFromServerClient() {}
+
+ private:
+  std::vector<double> last_recv_;
+
+  bool InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] =
-        stub->StreamingFromServer(&context_[thread_idx], request_);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] =
+          stub->StreamingFromServer(&context_[thread_idx], request_);
+    } else {
+      return false;
+    }
     last_recv_[thread_idx] = UsageTimer::Now();
+    return true;
   }
+
   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
@@ -317,13 +360,16 @@
     }
     FinishStream(entry, thread_idx);
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] =
-        stub->StreamingFromServer(&context_[thread_idx], request_);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] =
+          stub->StreamingFromServer(&context_[thread_idx], request_);
+    } else {
+      stream_[thread_idx].reset();
+      return false;
+    }
     return true;
   }
-
- private:
-  std::vector<double> last_recv_;
 };
 
 class SynchronousStreamingBothWaysClient final
@@ -333,24 +379,22 @@
   SynchronousStreamingBothWaysClient(const ClientConfig& config)
       : SynchronousStreamingClient(config) {}
   ~SynchronousStreamingBothWaysClient() {
-    std::vector<std::thread> cleanup_threads;
-    for (size_t i = 0; i < num_threads_; i++) {
-      cleanup_threads.emplace_back([this, i]() {
-        auto stream = &stream_[i];
-        if (*stream) {
-          (*stream)->WritesDone();
-        }
-      });
-    }
-    for (auto& th : cleanup_threads) {
-      th.join();
-    }
+    CleanupAllStreams(
+        [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
   }
 
-  void InitThreadFuncImpl(size_t thread_idx) override {
+ private:
+  bool InitThreadFuncImpl(size_t thread_idx) override {
     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
-    stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+    std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
+    if (!shutdown_[thread_idx].val) {
+      stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+    } else {
+      return false;
+    }
+    return true;
   }
+
   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
     // TODO (vjpai): Do this
     return true;
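Why the Bool wrapper above: std::vector<bool> is a packed specialization, so neighboring elements can share a machine word and unsynchronized writes to different indices are a data race; a one-field struct gives each thread its own addressable flag. A standalone sketch of that idea plus the guarded re-create pattern used in the InitThreadFuncImpl()/ThreadFuncImpl() changes (illustrative only, not part of the patch):

    #include <cstddef>
    #include <mutex>
    #include <vector>

    // vector<bool> packs bits, so flags for different threads could share a
    // word; wrapping the flag in a struct restores one byte per element.
    struct Bool {
      bool val = false;
    };

    struct PerThreadStreams {
      std::vector<std::mutex> mu;   // guards stream/context replacement per thread
      std::vector<Bool> shutdown;   // one independent shutdown flag per thread
      explicit PerThreadStreams(std::size_t n) : mu(n), shutdown(n) {}

      // Re-create a stream only if this thread has not been told to shut down;
      // returning false tells the worker thread to exit instead of retrying.
      template <typename MakeStream>
      bool MaybeRecreate(std::size_t idx, MakeStream make_stream) {
        std::lock_guard<std::mutex> l(mu[idx]);
        if (shutdown[idx].val) return false;
        make_stream(idx);
        return true;
      }
    };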
diff --git a/test/distrib/cpp/run_distrib_test_cmake.bat b/test/distrib/cpp/run_distrib_test_cmake.bat
new file mode 100644
index 0000000..047846b
--- /dev/null
+++ b/test/distrib/cpp/run_distrib_test_cmake.bat
@@ -0,0 +1,75 @@
+@rem Copyright 2016 gRPC authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem     http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+@rem enter the repo root (three directories up from this script)
+cd /d %~dp0\..\..\..
+
+@rem TODO(jtattermusch): Kokoro has pre-installed protoc.exe in C:\Program Files\ProtoC and that directory
+@rem is on PATH. To avoid picking up that older protoc.exe, we rewrite that PATH entry to a non-existent directory.
+set PATH=%PATH:ProtoC=DontPickupProtoC%
+
+@rem Install into ./testinstall, but use absolute path and forward slashes
+set INSTALL_DIR=%cd:\=/%/testinstall
+
+@rem Download OpenSSL-Win32, originally obtained from https://slproweb.com/products/Win32OpenSSL.html
+powershell -Command "(New-Object Net.WebClient).DownloadFile('https://storage.googleapis.com/grpc-testing.appspot.com/OpenSSL-Win32-1_1_0g.zip', 'OpenSSL-Win32.zip')"
+powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('OpenSSL-Win32.zip', '.');"
+
+@rem set absolute path to OpenSSL with forward slashes
+set OPENSSL_DIR=%cd:\=/%/OpenSSL-Win32
+
+cd third_party/zlib
+mkdir cmake
+cd cmake
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ..
+cmake --build . --config Release --target install || goto :error
+cd ../../..
+
+cd third_party/protobuf/cmake
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -Dprotobuf_MSVC_STATIC_RUNTIME=OFF -Dprotobuf_BUILD_TESTS=OFF ..
+cmake --build . --config Release --target install || goto :error
+cd ../../../..
+
+cd third_party/cares/cares
+mkdir cmake
+cd cmake
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ..
+cmake --build . --config Release --target install || goto :error
+cd ../../../..
+
+@rem OpenSSL-Win32 and OpenSSL-Win64 can be downloaded from https://slproweb.com/products/Win32OpenSSL.html
+cd cmake
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -DOPENSSL_ROOT_DIR=%OPENSSL_DIR% -DOPENSSL_INCLUDE_DIR=%OPENSSL_DIR%/include -DZLIB_LIBRARY=%INSTALL_DIR%/lib/zlibstatic.lib -DZLIB_INCLUDE_DIR=%INSTALL_DIR%/include -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_SSL_PROVIDER=package -DCMAKE_BUILD_TYPE=Release ../.. || goto :error
+cmake --build . --config Release --target install || goto :error
+cd ../..
+
+@rem Build helloworld example using cmake
+cd examples/cpp/helloworld
+mkdir cmake
+cd cmake
+mkdir build
+cd build
+cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ../.. || goto :error
+cmake --build . --config Release || goto :error
+cd ../../../../..
+
+goto :EOF
+
+:error
+echo Failed!
+exit /b %errorlevel%
diff --git a/test/distrib/cpp/run_distrib_test_cmake.sh b/test/distrib/cpp/run_distrib_test_cmake.sh
index ead8cc1..a9c081c 100755
--- a/test/distrib/cpp/run_distrib_test_cmake.sh
+++ b/test/distrib/cpp/run_distrib_test_cmake.sh
@@ -19,7 +19,6 @@
 
 echo "deb http://ftp.debian.org/debian jessie-backports main" | tee /etc/apt/sources.list.d/jessie-backports.list
 apt-get update
-#apt-get install -t jessie-backports -y libc-ares-dev  # we need specifically version 1.12
 apt-get install -t jessie-backports -y libssl-dev
 
 # Install c-ares
diff --git a/tools/run_tests/artifacts/distribtest_targets.py b/tools/run_tests/artifacts/distribtest_targets.py
index 9959651..7ba0e0e 100644
--- a/tools/run_tests/artifacts/distribtest_targets.py
+++ b/tools/run_tests/artifacts/distribtest_targets.py
@@ -49,7 +49,8 @@
 
 def create_jobspec(name, cmdline, environ=None, shell=False,
                    flake_retries=0, timeout_retries=0,
-                   use_workspace=False):
+                   use_workspace=False,
+                   timeout_seconds=10*60):
   """Creates jobspec."""
   environ = environ.copy()
   if use_workspace:
@@ -60,7 +61,7 @@
           cmdline=cmdline,
           environ=environ,
           shortname='distribtest.%s' % (name),
-          timeout_seconds=10*60,
+          timeout_seconds=timeout_seconds,
           flake_retries=flake_retries,
           timeout_retries=timeout_retries,
           shell=shell)
@@ -214,7 +215,10 @@
   """Tests Cpp make intall by building examples."""
 
   def __init__(self, platform, arch, docker_suffix=None, testcase=None):
-    self.name = 'cpp_%s_%s_%s_%s' % (platform, arch, docker_suffix, testcase)
+    if platform == 'linux':
+      self.name = 'cpp_%s_%s_%s_%s' % (platform, arch, docker_suffix, testcase)
+    else:
+      self.name = 'cpp_%s_%s_%s' % (platform, arch, testcase)
     self.platform = platform
     self.arch = arch
     self.docker_suffix = docker_suffix
@@ -231,6 +235,12 @@
                                        self.docker_suffix,
                                        self.arch),
                                    'test/distrib/cpp/run_distrib_test_%s.sh' % self.testcase)
+    elif self.platform == 'windows':
+      return create_jobspec(self.name,
+                            ['test\\distrib\\cpp\\run_distrib_test_%s.bat' % self.testcase],
+                            environ={},
+                            timeout_seconds=30*60,
+                            use_workspace=True)
     else:
       raise Exception("Not supported yet.")
 
@@ -242,6 +252,7 @@
   """Gets list of supported targets"""
   return [CppDistribTest('linux', 'x64', 'jessie', 'routeguide'),
           CppDistribTest('linux', 'x64', 'jessie', 'cmake'),
+          CppDistribTest('windows', 'x86', testcase='cmake'),
           CSharpDistribTest('linux', 'x64', 'wheezy'),
           CSharpDistribTest('linux', 'x64', 'jessie'),
           CSharpDistribTest('linux', 'x86', 'jessie'),
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index ae7ddb2..3958d8e 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -2896,6 +2896,23 @@
       "gpr", 
       "gpr_test_util", 
       "grpc", 
+      "grpc_test_util"
+    ], 
+    "headers": [], 
+    "is_filegroup": false, 
+    "language": "c++", 
+    "name": "chttp2_settings_timeout_test", 
+    "src": [
+      "test/core/transport/chttp2/settings_timeout_test.cc"
+    ], 
+    "third_party": false, 
+    "type": "target"
+  }, 
+  {
+    "deps": [
+      "gpr", 
+      "gpr_test_util", 
+      "grpc", 
       "grpc++", 
       "grpc++_test_util", 
       "grpc_cli_libs", 
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index ce698cb..6ffc7f4 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -3376,6 +3376,30 @@
     "flaky": false, 
     "gtest": true, 
     "language": "c++", 
+    "name": "chttp2_settings_timeout_test", 
+    "platforms": [
+      "linux", 
+      "mac", 
+      "posix", 
+      "windows"
+    ], 
+    "uses_polling": true
+  }, 
+  {
+    "args": [], 
+    "benchmark": false, 
+    "ci_platforms": [
+      "linux", 
+      "mac", 
+      "posix", 
+      "windows"
+    ], 
+    "cpu_cost": 1.0, 
+    "exclude_configs": [], 
+    "exclude_iomgrs": [], 
+    "flaky": false, 
+    "gtest": true, 
+    "language": "c++", 
     "name": "cli_call_test", 
     "platforms": [
       "linux", 
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 766c1c0..471f5d9 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -1231,7 +1231,7 @@
       if test.flaky: flaky_tests.add(test.name)
       if test.cpu > 0: shortname_to_cpu[test.name] = test.cpu
   except:
-    print("Unexpected error getting flaky tests:", sys.exc_info()[0])
+    print("Unexpected error getting flaky tests: %s" % traceback.format_exc())
 
 if args.force_default_poller:
   _POLLING_STRATEGIES = {}