Merge pull request #15994 from yashykt/pingdoc

GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS only makes sens…
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 520431e..04f7a2c 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -126,9 +126,9 @@
   /* the following properties are guarded by a mutex since APIs require them
      to be instantaneously available */
   gpr_mu info_mu;
-  char* info_lb_policy_name;
+  grpc_core::UniquePtr<char> info_lb_policy_name;
   /** service config in JSON form */
-  char* info_service_config_json;
+  grpc_core::UniquePtr<char> info_service_config_json;
 } channel_data;
 
 typedef struct {
@@ -284,6 +284,78 @@
   }
 }
 
+// Invoked from the resolver NextLocked() callback when the resolver
+// is shutting down.
+static void on_resolver_shutdown_locked(channel_data* chand,
+                                        grpc_error* error) {
+  if (grpc_client_channel_trace.enabled()) {
+    gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
+  }
+  if (chand->lb_policy != nullptr) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
+              chand->lb_policy.get());
+    }
+    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
+                                     chand->interested_parties);
+    chand->lb_policy.reset();
+  }
+  if (chand->resolver != nullptr) {
+    // This should never happen; it can only be triggered by a resolver
+    // implementation spontaneously deciding to report shutdown without
+    // being orphaned.  This code is included just to be defensive.
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
+              chand, chand->resolver.get());
+    }
+    chand->resolver.reset();
+    set_channel_connectivity_state_locked(
+        chand, GRPC_CHANNEL_SHUTDOWN,
+        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+            "Resolver spontaneous shutdown", &error, 1),
+        "resolver_spontaneous_shutdown");
+  }
+  grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
+                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                                 "Channel disconnected", &error, 1));
+  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
+  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
+  grpc_channel_args_destroy(chand->resolver_result);
+  chand->resolver_result = nullptr;
+  GRPC_ERROR_UNREF(error);
+}
+
+// Returns the LB policy name from the resolver result.
+static grpc_core::UniquePtr<char>
+get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
+  // Find LB policy name in channel args.
+  const grpc_arg* channel_arg =
+      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
+  const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
+  // Special case: If at least one balancer address is present, we use
+  // the grpclb policy, regardless of what the resolver actually specified.
+  channel_arg =
+      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
+  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
+    grpc_lb_addresses* addresses =
+        static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
+    if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
+      if (lb_policy_name != nullptr &&
+          gpr_stricmp(lb_policy_name, "grpclb") != 0) {
+        gpr_log(GPR_INFO,
+                "resolver requested LB policy %s but provided at least one "
+                "balancer address -- forcing use of grpclb LB policy",
+                lb_policy_name);
+      }
+      lb_policy_name = "grpclb";
+    }
+  }
+  // Use pick_first if nothing was specified and we didn't select grpclb
+  // above.
+  if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
+  return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
+}
+
 static void request_reresolution_locked(void* arg, grpc_error* error) {
   reresolution_request_args* args =
       static_cast<reresolution_request_args*>(arg);
@@ -304,234 +376,183 @@
   chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
 }
 
-// TODO(roth): The logic in this function is very hard to follow.  We
-// should refactor this so that it's easier to understand, perhaps as
-// part of changing the resolver API to more clearly differentiate
-// between transient failures and shutdown.
-static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
-  channel_data* chand = static_cast<channel_data*>(arg);
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
-            chand->resolver_result, grpc_error_string(error));
-  }
-  // Extract the following fields from the resolver result, if non-nullptr.
-  bool lb_policy_updated = false;
-  bool lb_policy_created = false;
-  char* lb_policy_name_dup = nullptr;
-  bool lb_policy_name_changed = false;
-  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
-  char* service_config_json = nullptr;
-  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
-  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
-  if (chand->resolver_result != nullptr) {
-    if (chand->resolver != nullptr) {
-      // Find LB policy name.
-      const grpc_arg* channel_arg = grpc_channel_args_find(
-          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
-      const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
-      // Special case: If at least one balancer address is present, we use
-      // the grpclb policy, regardless of what the resolver actually specified.
-      channel_arg =
-          grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
-      if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
-        grpc_lb_addresses* addresses =
-            static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
-        bool found_balancer_address = false;
-        for (size_t i = 0; i < addresses->num_addresses; ++i) {
-          if (addresses->addresses[i].is_balancer) {
-            found_balancer_address = true;
-            break;
-          }
-        }
-        if (found_balancer_address) {
-          if (lb_policy_name != nullptr &&
-              strcmp(lb_policy_name, "grpclb") != 0) {
-            gpr_log(GPR_INFO,
-                    "resolver requested LB policy %s but provided at least one "
-                    "balancer address -- forcing use of grpclb LB policy",
-                    lb_policy_name);
-          }
-          lb_policy_name = "grpclb";
-        }
-      }
-      // Use pick_first if nothing was specified and we didn't select grpclb
-      // above.
-      if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
-      // Check to see if we're already using the right LB policy.
-      // Note: It's safe to use chand->info_lb_policy_name here without
-      // taking a lock on chand->info_mu, because this function is the
-      // only thing that modifies its value, and it can only be invoked
-      // once at any given time.
-      lb_policy_name_changed =
-          chand->info_lb_policy_name == nullptr ||
-          gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
-      if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
-        // Continue using the same LB policy.  Update with new addresses.
-        lb_policy_updated = true;
-        chand->lb_policy->UpdateLocked(*chand->resolver_result);
-      } else {
-        // Instantiate new LB policy.
-        grpc_core::LoadBalancingPolicy::Args lb_policy_args;
-        lb_policy_args.combiner = chand->combiner;
-        lb_policy_args.client_channel_factory = chand->client_channel_factory;
-        lb_policy_args.args = chand->resolver_result;
-        new_lb_policy =
-            grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
-                lb_policy_name, lb_policy_args);
-        if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
-          gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
-                  lb_policy_name);
-        } else {
-          lb_policy_created = true;
-          reresolution_request_args* args =
-              static_cast<reresolution_request_args*>(
-                  gpr_zalloc(sizeof(*args)));
-          args->chand = chand;
-          args->lb_policy = new_lb_policy.get();
-          GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
-                            grpc_combiner_scheduler(chand->combiner));
-          GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
-          new_lb_policy->SetReresolutionClosureLocked(&args->closure);
-        }
-      }
-      // Before we clean up, save a copy of lb_policy_name, since it might
-      // be pointing to data inside chand->resolver_result.
-      // The copy will be saved in chand->lb_policy_name below.
-      lb_policy_name_dup = gpr_strdup(lb_policy_name);
-      // Find service config.
-      channel_arg = grpc_channel_args_find(chand->resolver_result,
-                                           GRPC_ARG_SERVICE_CONFIG);
-      service_config_json =
-          gpr_strdup(grpc_channel_arg_get_string(channel_arg));
-      if (service_config_json != nullptr) {
-        grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
-            grpc_core::ServiceConfig::Create(service_config_json);
-        if (service_config != nullptr) {
-          if (chand->enable_retries) {
-            channel_arg = grpc_channel_args_find(chand->resolver_result,
-                                                 GRPC_ARG_SERVER_URI);
-            const char* server_uri = grpc_channel_arg_get_string(channel_arg);
-            GPR_ASSERT(server_uri != nullptr);
-            grpc_uri* uri = grpc_uri_parse(server_uri, true);
-            GPR_ASSERT(uri->path[0] != '\0');
-            service_config_parsing_state parsing_state;
-            memset(&parsing_state, 0, sizeof(parsing_state));
-            parsing_state.server_name =
-                uri->path[0] == '/' ? uri->path + 1 : uri->path;
-            service_config->ParseGlobalParams(parse_retry_throttle_params,
-                                              &parsing_state);
-            grpc_uri_destroy(uri);
-            retry_throttle_data = std::move(parsing_state.retry_throttle_data);
-          }
-          method_params_table = service_config->CreateMethodConfigTable(
-              ClientChannelMethodParams::CreateFromJson);
-        }
-      }
+// Creates a new LB policy, replacing any previous one.
+// If the new policy is created successfully, sets *connectivity_state and
+// *connectivity_error to its initial connectivity state; otherwise,
+// leaves them unchanged.
+static void create_new_lb_policy_locked(
+    channel_data* chand, char* lb_policy_name,
+    grpc_connectivity_state* connectivity_state,
+    grpc_error** connectivity_error) {
+  grpc_core::LoadBalancingPolicy::Args lb_policy_args;
+  lb_policy_args.combiner = chand->combiner;
+  lb_policy_args.client_channel_factory = chand->client_channel_factory;
+  lb_policy_args.args = chand->resolver_result;
+  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
+      grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+          lb_policy_name, lb_policy_args);
+  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
+    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
+  } else {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
+              lb_policy_name, new_lb_policy.get());
     }
-  }
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
-            "service_config=\"%s\"",
-            chand, lb_policy_name_dup,
-            lb_policy_name_changed ? " (changed)" : "", service_config_json);
-  }
-  // Now swap out fields in chand.  Note that the new values may still
-  // be nullptr if (e.g.) the resolver failed to return results or the
-  // results did not contain the necessary data.
-  //
-  // First, swap out the data used by cc_get_channel_info().
-  gpr_mu_lock(&chand->info_mu);
-  if (lb_policy_name_dup != nullptr) {
-    gpr_free(chand->info_lb_policy_name);
-    chand->info_lb_policy_name = lb_policy_name_dup;
-  }
-  if (service_config_json != nullptr) {
-    gpr_free(chand->info_service_config_json);
-    chand->info_service_config_json = service_config_json;
-  }
-  gpr_mu_unlock(&chand->info_mu);
-  // Swap out the retry throttle data.
-  chand->retry_throttle_data = std::move(retry_throttle_data);
-  // Swap out the method params table.
-  chand->method_params_table = std::move(method_params_table);
-  // If we have a new LB policy or are shutting down (in which case
-  // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
-  // old one and removing its fds from chand->interested_parties.
-  // Note that we do NOT do this if either (a) we updated the existing
-  // LB policy above or (b) we failed to create the new LB policy (in
-  // which case we want to continue using the most recent one we had).
-  if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
-      chand->resolver == nullptr) {
+    // Swap out the LB policy and update the fds in
+    // chand->interested_parties.
     if (chand->lb_policy != nullptr) {
       if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand,
+        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
                 chand->lb_policy.get());
       }
       grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                        chand->interested_parties);
       chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
-      chand->lb_policy.reset();
     }
     chand->lb_policy = std::move(new_lb_policy);
+    grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
+                                     chand->interested_parties);
+    // Set up re-resolution callback.
+    reresolution_request_args* args =
+        static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
+    args->chand = chand;
+    args->lb_policy = chand->lb_policy.get();
+    GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
+                      grpc_combiner_scheduler(chand->combiner));
+    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
+    chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
+    // Get the new LB policy's initial connectivity state and start a
+    // connectivity watch.
+    GRPC_ERROR_UNREF(*connectivity_error);
+    *connectivity_state =
+        chand->lb_policy->CheckConnectivityLocked(connectivity_error);
+    if (chand->exit_idle_when_lb_policy_arrives) {
+      chand->lb_policy->ExitIdleLocked();
+      chand->exit_idle_when_lb_policy_arrives = false;
+    }
+    watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
   }
-  // Now that we've swapped out the relevant fields of chand, check for
-  // error or shutdown.
-  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
+}
+
+// Returns the service config (as a JSON string) from the resolver result.
+// Also updates state in chand.
+static grpc_core::UniquePtr<char>
+get_service_config_from_resolver_result_locked(channel_data* chand) {
+  const grpc_arg* channel_arg =
+      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
+  const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
+  if (service_config_json != nullptr) {
     if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
+      gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
+              chand, service_config_json);
     }
-    if (chand->resolver != nullptr) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand);
+    grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
+        grpc_core::ServiceConfig::Create(service_config_json);
+    if (service_config != nullptr) {
+      if (chand->enable_retries) {
+        channel_arg =
+            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
+        const char* server_uri = grpc_channel_arg_get_string(channel_arg);
+        GPR_ASSERT(server_uri != nullptr);
+        grpc_uri* uri = grpc_uri_parse(server_uri, true);
+        GPR_ASSERT(uri->path[0] != '\0');
+        service_config_parsing_state parsing_state;
+        memset(&parsing_state, 0, sizeof(parsing_state));
+        parsing_state.server_name =
+            uri->path[0] == '/' ? uri->path + 1 : uri->path;
+        service_config->ParseGlobalParams(parse_retry_throttle_params,
+                                          &parsing_state);
+        grpc_uri_destroy(uri);
+        chand->retry_throttle_data =
+            std::move(parsing_state.retry_throttle_data);
       }
-      chand->resolver.reset();
+      chand->method_params_table = service_config->CreateMethodConfigTable(
+          ClientChannelMethodParams::CreateFromJson);
     }
-    set_channel_connectivity_state_locked(
-        chand, GRPC_CHANNEL_SHUTDOWN,
-        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-            "Got resolver result after disconnection", &error, 1),
-        "resolver_gone");
-    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
-                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                                   "Channel disconnected", &error, 1));
-    GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
-    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
-    grpc_channel_args_destroy(chand->resolver_result);
-    chand->resolver_result = nullptr;
-  } else {  // Not shutting down.
-    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
-    grpc_error* state_error =
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
-    if (lb_policy_created) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand);
-      }
-      GRPC_ERROR_UNREF(state_error);
-      state = chand->lb_policy->CheckConnectivityLocked(&state_error);
-      grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
-                                       chand->interested_parties);
-      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
-      if (chand->exit_idle_when_lb_policy_arrives) {
-        chand->lb_policy->ExitIdleLocked();
-        chand->exit_idle_when_lb_policy_arrives = false;
-      }
-      watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
-    } else if (chand->resolver_result == nullptr) {
-      // Transient failure.
-      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
-    }
-    if (!lb_policy_updated) {
-      set_channel_connectivity_state_locked(
-          chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
-    }
-    grpc_channel_args_destroy(chand->resolver_result);
-    chand->resolver_result = nullptr;
-    chand->resolver->NextLocked(&chand->resolver_result,
-                                &chand->on_resolver_result_changed);
-    GRPC_ERROR_UNREF(state_error);
   }
+  return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
+}
+
+// Callback invoked when a resolver result is available.
+static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
+  channel_data* chand = static_cast<channel_data*>(arg);
+  if (grpc_client_channel_trace.enabled()) {
+    const char* disposition =
+        chand->resolver_result != nullptr
+            ? ""
+            : (error == GRPC_ERROR_NONE ? " (transient error)"
+                                        : " (resolver shutdown)");
+    gpr_log(GPR_INFO,
+            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
+            chand, chand->resolver_result, grpc_error_string(error),
+            disposition);
+  }
+  // Handle shutdown.
+  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
+    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
+    return;
+  }
+  // Data used to set the channel's connectivity state.
+  bool set_connectivity_state = true;
+  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+  grpc_error* connectivity_error =
+      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+  // chand->resolver_result will be null in the case of a transient
+  // resolution error.  In that case, we don't have any new result to
+  // process, which means that we keep using the previous result (if any).
+  if (chand->resolver_result == nullptr) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
+    }
+  } else {
+    grpc_core::UniquePtr<char> lb_policy_name =
+        get_lb_policy_name_from_resolver_result_locked(chand);
+    // Check to see if we're already using the right LB policy.
+    // Note: It's safe to use chand->info_lb_policy_name here without
+    // taking a lock on chand->info_mu, because this function is the
+    // only thing that modifies its value, and it can only be invoked
+    // once at any given time.
+    bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr ||
+                                  gpr_stricmp(chand->info_lb_policy_name.get(),
+                                              lb_policy_name.get()) != 0;
+    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
+      // Continue using the same LB policy.  Update with new addresses.
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
+                chand, lb_policy_name.get(), chand->lb_policy.get());
+      }
+      chand->lb_policy->UpdateLocked(*chand->resolver_result);
+      // No need to set the channel's connectivity state; the existing
+      // watch on the LB policy will take care of that.
+      set_connectivity_state = false;
+    } else {
+      // Instantiate new LB policy.
+      create_new_lb_policy_locked(chand, lb_policy_name.get(),
+                                  &connectivity_state, &connectivity_error);
+    }
+    // Find service config.
+    grpc_core::UniquePtr<char> service_config_json =
+        get_service_config_from_resolver_result_locked(chand);
+    // Swap out the data used by cc_get_channel_info().
+    gpr_mu_lock(&chand->info_mu);
+    chand->info_lb_policy_name = std::move(lb_policy_name);
+    chand->info_service_config_json = std::move(service_config_json);
+    gpr_mu_unlock(&chand->info_mu);
+    // Clean up.
+    grpc_channel_args_destroy(chand->resolver_result);
+    chand->resolver_result = nullptr;
+  }
+  // Set the channel's connectivity state if needed.
+  if (set_connectivity_state) {
+    set_channel_connectivity_state_locked(
+        chand, connectivity_state, connectivity_error, "resolver_result");
+  } else {
+    GRPC_ERROR_UNREF(connectivity_error);
+  }
+  // Invoke closures that were waiting for results and renew the watch.
+  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
+  chand->resolver->NextLocked(&chand->resolver_result,
+                              &chand->on_resolver_result_changed);
 }
 
 static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
@@ -611,15 +632,11 @@
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   gpr_mu_lock(&chand->info_mu);
   if (info->lb_policy_name != nullptr) {
-    *info->lb_policy_name = chand->info_lb_policy_name == nullptr
-                                ? nullptr
-                                : gpr_strdup(chand->info_lb_policy_name);
+    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
   }
   if (info->service_config_json != nullptr) {
     *info->service_config_json =
-        chand->info_service_config_json == nullptr
-            ? nullptr
-            : gpr_strdup(chand->info_service_config_json);
+        gpr_strdup(chand->info_service_config_json.get());
   }
   gpr_mu_unlock(&chand->info_mu);
 }
@@ -699,19 +716,15 @@
   return GRPC_ERROR_NONE;
 }
 
-static void shutdown_resolver_locked(void* arg, grpc_error* error) {
-  grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
-  resolver->Orphan();
-}
-
 /* Destructor for channel_data */
 static void cc_destroy_channel_elem(grpc_channel_element* elem) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   if (chand->resolver != nullptr) {
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
-                            grpc_combiner_scheduler(chand->combiner)),
-        GRPC_ERROR_NONE);
+    // The only way we can get here is if we never started resolving,
+    // because we take a ref to the channel stack when we start
+    // resolving and do not release it until the resolver callback is
+    // invoked after the resolver shuts down.
+    chand->resolver.reset();
   }
   if (chand->client_channel_factory != nullptr) {
     grpc_client_channel_factory_unref(chand->client_channel_factory);
@@ -721,8 +734,10 @@
                                      chand->interested_parties);
     chand->lb_policy.reset();
   }
-  gpr_free(chand->info_lb_policy_name);
-  gpr_free(chand->info_service_config_json);
+  // TODO(roth): Once we convert the filter API to C++, there will no
+  // longer be any need to explicitly reset these smart pointer data members.
+  chand->info_lb_policy_name.reset();
+  chand->info_service_config_json.reset();
   chand->retry_throttle_data.reset();
   chand->method_params_table.reset();
   grpc_client_channel_stop_backup_polling(chand->interested_parties);
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index b177385..42e8e88 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -354,11 +354,11 @@
     if (DoPickLocked(pick)) return true;
   }
   /* no pick currently available. Save for later in list of pending picks */
+  pick->next = pending_picks_;
+  pending_picks_ = pick;
   if (!started_picking_) {
     StartPickingLocked();
   }
-  pick->next = pending_picks_;
-  pending_picks_ = pick;
   return false;
 }
 
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.cc b/src/core/ext/filters/client_channel/lb_policy_factory.cc
index 7c8cba5..5c6363d 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.cc
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.cc
@@ -153,3 +153,11 @@
     return nullptr;
   return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p);
 }
+
+bool grpc_lb_addresses_contains_balancer_address(
+    const grpc_lb_addresses& addresses) {
+  for (size_t i = 0; i < addresses.num_addresses; ++i) {
+    if (addresses.addresses[i].is_balancer) return true;
+  }
+  return false;
+}
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 6440258..c07792d 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -101,6 +101,10 @@
 grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
     const grpc_channel_args* channel_args);
 
+// Returns true if addresses contains at least one balancer address.
+bool grpc_lb_addresses_contains_balancer_address(
+    const grpc_lb_addresses& addresses);
+
 //
 // LB policy factory
 //
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 0d6b72c..bc6fa0d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -2663,6 +2663,9 @@
 
 static void start_keepalive_ping_locked(void* arg, grpc_error* error) {
   grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg);
+  if (error != GRPC_ERROR_NONE) {
+    return;
+  }
   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog");
   grpc_timer_init(&t->keepalive_watchdog_timer,
                   grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout,
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index 31d66e8..023ede5 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -26,27 +26,14 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
+#include <cstring>
+
 namespace grpc_core {
 namespace {
 
 // singleton instance of the registry.
 ChannelzRegistry* g_channelz_registry = nullptr;
 
-// avl vtable for uuid (intptr_t) -> channelz_obj (void*)
-// this table is only looking, it does not own anything.
-void destroy_intptr(void* not_used, void* user_data) {}
-void* copy_intptr(void* key, void* user_data) { return key; }
-long compare_intptr(void* key1, void* key2, void* user_data) {
-  return GPR_ICMP(key1, key2);
-}
-
-void destroy_channelz_obj(void* channelz_obj, void* user_data) {}
-void* copy_channelz_obj(void* channelz_obj, void* user_data) {
-  return channelz_obj;
-}
-const grpc_avl_vtable avl_vtable = {destroy_intptr, copy_intptr, compare_intptr,
-                                    destroy_channelz_obj, copy_channelz_obj};
-
 }  // anonymous namespace
 
 void ChannelzRegistry::Init() { g_channelz_registry = New<ChannelzRegistry>(); }
@@ -58,19 +45,15 @@
   return g_channelz_registry;
 }
 
-ChannelzRegistry::ChannelzRegistry() : uuid_(1) {
-  gpr_mu_init(&mu_);
-  avl_ = grpc_avl_create(&avl_vtable);
-}
+ChannelzRegistry::ChannelzRegistry() { gpr_mu_init(&mu_); }
 
-ChannelzRegistry::~ChannelzRegistry() {
-  grpc_avl_unref(avl_, nullptr);
-  gpr_mu_destroy(&mu_);
-}
+ChannelzRegistry::~ChannelzRegistry() { gpr_mu_destroy(&mu_); }
 
 void ChannelzRegistry::InternalUnregister(intptr_t uuid) {
+  GPR_ASSERT(uuid >= 1);
   gpr_mu_lock(&mu_);
-  avl_ = grpc_avl_remove(avl_, (void*)uuid, nullptr);
+  GPR_ASSERT(static_cast<size_t>(uuid) <= entities_.size());
+  entities_[uuid - 1] = nullptr;
   gpr_mu_unlock(&mu_);
 }
 
diff --git a/src/core/lib/channel/channelz_registry.h b/src/core/lib/channel/channelz_registry.h
index 4de7d47..a5a187a 100644
--- a/src/core/lib/channel/channelz_registry.h
+++ b/src/core/lib/channel/channelz_registry.h
@@ -21,8 +21,8 @@
 
 #include <grpc/impl/codegen/port_platform.h>
 
-#include "src/core/lib/avl/avl.h"
 #include "src/core/lib/channel/channel_trace.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
 
 #include <stdint.h>
 
@@ -67,11 +67,11 @@
   // globally registers a channelz Object. Returns its unique uuid
   template <typename Object>
   intptr_t InternalRegister(Object* object) {
-    intptr_t prior = gpr_atm_no_barrier_fetch_add(&uuid_, 1);
     gpr_mu_lock(&mu_);
-    avl_ = grpc_avl_add(avl_, (void*)prior, object, nullptr);
+    entities_.push_back(static_cast<void*>(object));
+    intptr_t uuid = entities_.size();
     gpr_mu_unlock(&mu_);
-    return prior;
+    return uuid;
   }
 
   // globally unregisters the object that is associated to uuid.
@@ -82,16 +82,20 @@
   template <typename Object>
   Object* InternalGet(intptr_t uuid) {
     gpr_mu_lock(&mu_);
-    Object* ret =
-        static_cast<Object*>(grpc_avl_get(avl_, (void*)uuid, nullptr));
+    if (uuid < 1 || uuid > static_cast<intptr_t>(entities_.size())) {
+      gpr_mu_unlock(&mu_);
+      return nullptr;
+    }
+    Object* ret = static_cast<Object*>(entities_[uuid - 1]);
     gpr_mu_unlock(&mu_);
     return ret;
   }
 
   // private members
+
+  // protects entities_
   gpr_mu mu_;
-  grpc_avl avl_;
-  gpr_atm uuid_;
+  InlinedVector<void*, 20> entities_;
 };
 
 }  // namespace grpc_core
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 3e4e434..8f3ad6c 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -148,6 +148,8 @@
       channel_tracer_max_nodes =
           (size_t)grpc_channel_arg_get_integer(&args->args[i], options);
     } else if (0 == strcmp(args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) {
+      // channelz will not be enabled by default until all concerns in
+      // https://github.com/grpc/grpc/issues/15986 are addressed.
       channelz_enabled = grpc_channel_arg_get_bool(&args->args[i], false);
     } else if (0 == strcmp(args->args[i].key,
                            GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC)) {
diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_integrity_only_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_integrity_only_record_protocol.cc
index 54e59a6..352561d 100644
--- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_integrity_only_record_protocol.cc
+++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_integrity_only_record_protocol.cc
@@ -42,34 +42,34 @@
 static tsi_result alts_grpc_integrity_only_extra_copy_protect(
     alts_grpc_record_protocol* rp, grpc_slice_buffer* unprotected_slices,
     grpc_slice_buffer* protected_slices) {
-  /* Allocates memory for protected frame.  */
+  /* Allocates memory for protected frame and copies data.  */
+  size_t data_length = unprotected_slices->length;
   size_t protected_frame_size =
       unprotected_slices->length + rp->header_length + rp->tag_length;
   grpc_slice protected_slice = GRPC_SLICE_MALLOC(protected_frame_size);
-  /* Calls alts_iovec_record_protocol protect.  */
-  char* error_details = nullptr;
-  iovec_t header_iovec = {GRPC_SLICE_START_PTR(protected_slice),
-                          rp->header_length};
-  iovec_t tag_iovec = {GRPC_SLICE_START_PTR(protected_slice) +
-                           rp->header_length + unprotected_slices->length,
-                       rp->tag_length};
-  alts_grpc_record_protocol_convert_slice_buffer_to_iovec(rp,
-                                                          unprotected_slices);
-  grpc_status_code status = alts_iovec_record_protocol_integrity_only_protect(
-      rp->iovec_rp, rp->iovec_buf, unprotected_slices->count, header_iovec,
-      tag_iovec, &error_details);
-  if (status != GRPC_STATUS_OK) {
-    gpr_log(GPR_ERROR, "Failed to protect, %s", error_details);
-    gpr_free(error_details);
-    return TSI_INTERNAL_ERROR;
-  }
-  /* Copies data from unprotected_slices to protected_slice.  */
   uint8_t* data = GRPC_SLICE_START_PTR(protected_slice) + rp->header_length;
   for (size_t i = 0; i < unprotected_slices->count; i++) {
     memcpy(data, GRPC_SLICE_START_PTR(unprotected_slices->slices[i]),
            GRPC_SLICE_LENGTH(unprotected_slices->slices[i]));
     data += GRPC_SLICE_LENGTH(unprotected_slices->slices[i]);
   }
+  /* Calls alts_iovec_record_protocol protect.  */
+  char* error_details = nullptr;
+  iovec_t header_iovec = {GRPC_SLICE_START_PTR(protected_slice),
+                          rp->header_length};
+  iovec_t tag_iovec = {
+      GRPC_SLICE_START_PTR(protected_slice) + rp->header_length + data_length,
+      rp->tag_length};
+  rp->iovec_buf[0].iov_base =
+      GRPC_SLICE_START_PTR(protected_slice) + rp->header_length;
+  rp->iovec_buf[0].iov_len = data_length;
+  grpc_status_code status = alts_iovec_record_protocol_integrity_only_protect(
+      rp->iovec_rp, rp->iovec_buf, 1, header_iovec, tag_iovec, &error_details);
+  if (status != GRPC_STATUS_OK) {
+    gpr_log(GPR_ERROR, "Failed to protect, %s", error_details);
+    gpr_free(error_details);
+    return TSI_INTERNAL_ERROR;
+  }
   grpc_slice_buffer_add(protected_slices, protected_slice);
   grpc_slice_buffer_reset_and_unref_internal(unprotected_slices);
   return TSI_OK;
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 6bb2f6c..a6a1d8a 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -422,9 +422,32 @@
                 {
                     if (!hooksRegistered)
                     {
+                        // Under normal circumstances, the user is expected to shutdown all
+                        // the gRPC channels and servers before the application exits. The following
+                        // hooks provide some extra handling for cases when this is not the case,
+                        // in the effort to achieve a reasonable behavior on shutdown.
 #if NETSTANDARD1_5
-                        System.Runtime.Loader.AssemblyLoadContext.Default.Unloading += (assemblyLoadContext) => { HandleShutdown(); };
+                        // No action required at shutdown on .NET Core
+                        // - In-progress P/Invoke calls (such as grpc_completion_queue_next) don't seem
+                        //   to prevent a .NET core application from terminating, so no special handling
+                        //   is needed.
+                        // - .NET core doesn't run finalizers on shutdown, so there's no risk of getting
+                        //   a crash because of grpc_*_destroy methods for native objects being invoked
+                        //   in the wrong order.
+                        // TODO(jtattermusch): Verify that the shutdown hooks are still not needed
+                        // once we add support for new platforms using netstandard (e.g. Xamarin).
 #else
+                        // On desktop .NET framework and Mono, we need to register for a shutdown
+                        // event to explicitly shutdown the GrpcEnvironment.
+                        // - On Desktop .NET framework, we need to do a proper shutdown to prevent a crash
+                        //   when the framework attempts to run the finalizers for SafeHandle objects representing the native
+                        //   grpc objects. The finalizers call the native grpc_*_destroy methods (e.g. grpc_server_destroy)
+                        //   in a random order, which is not supported by gRPC.
+                        // - On Mono, the process would hang as the GrpcThreadPool threads are sleeping
+                        //   in the grpc_completion_queue_next P/Invoke invocation and Mono won't let the
+                        //   process shut down until the P/Invoke calls return. We achieve that by shutting down
+                        //   the completion queue(s) associated with the GrpcThreadPool, which will
+                        //   cause the grpc_completion_queue_next calls to return immediately.
                         AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { HandleShutdown(); };
                         AppDomain.CurrentDomain.DomainUnload += (sender, eventArgs) => { HandleShutdown(); };
 #endif
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index 5ae6931..ecb419a 100644
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -28,6 +28,7 @@
     private $hostname;
     private $hostname_override;
     private $channel;
+    private $call_invoker;
 
     // a callback function
     private $update_metadata;
@@ -58,6 +59,15 @@
         if (!empty($opts['grpc.ssl_target_name_override'])) {
             $this->hostname_override = $opts['grpc.ssl_target_name_override'];
         }
+        if (isset($opts['grpc_call_invoker'])) {
+            $this->call_invoker = $opts['grpc_call_invoker'];
+            unset($opts['grpc_call_invoker']);
+            $channel_opts = $this->updateOpts($opts);
+            // If the grpc_call_invoker is defined, use the channel created by the call invoker.
+            $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts);
+            return;
+        }
+        $this->call_invoker = new DefaultCallInvoker();
         if ($channel) {
             if (!is_a($channel, 'Grpc\Channel') &&
                 !is_a($channel, 'Grpc\Internal\InterceptorChannel')) {
@@ -72,15 +82,7 @@
         $this->channel = static::getDefaultChannel($hostname, $opts);
     }
 
-    /**
-     * Creates and returns the default Channel
-     *
-     * @param array $opts Channel constructor options
-     *
-     * @return Channel The channel
-     */
-    public static function getDefaultChannel($hostname, array $opts)
-    {
+    private static function updateOpts($opts) {
         $package_config = json_decode(
             file_get_contents(dirname(__FILE__).'/../../composer.json'),
             true
@@ -97,6 +99,19 @@
                 'required. Please see one of the '.
                 'ChannelCredentials::create methods');
         }
+        return $opts;
+    }
+
+    /**
+     * Creates and returns the default Channel
+     *
+     * @param string $hostname Target passed to the Channel constructor
+     * @param array  $opts     Channel constructor options
+     * @return Channel The channel
+     */
+    public static function getDefaultChannel($hostname, array $opts)
+    {
+        $opts = self::updateOpts($opts); // build the Channel from the options updateOpts() returns
         return new Channel($hostname, $opts);
     }
 
@@ -239,7 +254,7 @@
                          $deserialize,
                          array $metadata = [],
                          array $options = []) use ($channel) {
-            $call = new UnaryCall(
+            $call = $this->call_invoker->UnaryCall(
                 $channel,
                 $method,
                 $deserialize,
@@ -275,7 +290,7 @@
                          $deserialize,
                          array $metadata = [],
                          array $options = []) use ($channel) {
-            $call = new ClientStreamingCall(
+            $call = $this->call_invoker->ClientStreamingCall(
                 $channel,
                 $method,
                 $deserialize,
@@ -312,7 +327,7 @@
                          $deserialize,
                          array $metadata = [],
                          array $options = []) use ($channel) {
-            $call = new ServerStreamingCall(
+            $call = $this->call_invoker->ServerStreamingCall(
                 $channel,
                 $method,
                 $deserialize,
@@ -348,7 +363,7 @@
                          $deserialize,
                          array $metadata = [],
                          array $options = []) use ($channel) {
-            $call = new BidiStreamingCall(
+            $call = $this->call_invoker->BidiStreamingCall(
                 $channel,
                 $method,
                 $deserialize,
diff --git a/src/php/lib/Grpc/CallInvoker.php b/src/php/lib/Grpc/CallInvoker.php
new file mode 100644
index 0000000..a1b4553
--- /dev/null
+++ b/src/php/lib/Grpc/CallInvoker.php
@@ -0,0 +1,33 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+namespace Grpc;
+
+/**
+ * CallInvoker is used to pass a user-defined channel into the stub
+ * and to intercept each RPC while keeping that channel accessible.
+ * THIS IS AN EXPERIMENTAL API.
+ */
+interface CallInvoker
+{
+    public function createChannelFactory($hostname, $opts);
+    public function UnaryCall($channel, $method, $deserialize, $options);
+    public function ClientStreamingCall($channel, $method, $deserialize, $options);
+    public function ServerStreamingCall($channel, $method, $deserialize, $options);
+    public function BidiStreamingCall($channel, $method, $deserialize, $options);
+}
diff --git a/src/php/lib/Grpc/DefaultCallInvoker.php b/src/php/lib/Grpc/DefaultCallInvoker.php
new file mode 100644
index 0000000..e5b1e13
--- /dev/null
+++ b/src/php/lib/Grpc/DefaultCallInvoker.php
@@ -0,0 +1,47 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+namespace Grpc;
+
+/**
+ * Default call invoker in the gRPC stub.
+ * THIS IS AN EXPERIMENTAL API.
+ */
+class DefaultCallInvoker implements CallInvoker
+{
+    public function createChannelFactory($hostname, $opts) {
+        return new Channel($hostname, $opts);
+    }
+
+    public function UnaryCall($channel, $method, $deserialize, $options) {
+        return new UnaryCall($channel, $method, $deserialize, $options);
+    }
+
+    public function ClientStreamingCall($channel, $method, $deserialize, $options) {
+        return new ClientStreamingCall($channel, $method, $deserialize, $options);
+    }
+
+    public function ServerStreamingCall($channel, $method, $deserialize, $options) {
+        return new ServerStreamingCall($channel, $method, $deserialize, $options);
+    }
+
+    public function BidiStreamingCall($channel, $method, $deserialize, $options) {
+        return new BidiStreamingCall($channel, $method, $deserialize, $options);
+    }
+}
+
diff --git a/src/php/tests/unit_tests/CallInvokerTest.php b/src/php/tests/unit_tests/CallInvokerTest.php
new file mode 100644
index 0000000..00b5b99
--- /dev/null
+++ b/src/php/tests/unit_tests/CallInvokerTest.php
@@ -0,0 +1,227 @@
+<?php
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Interface exported by the server.
+ */
+require_once(dirname(__FILE__).'/../../lib/Grpc/BaseStub.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/CallInvoker.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/DefaultCallInvoker.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php');
+
+class CallInvokerSimpleRequest
+{
+    private $data;
+    public function __construct($data)
+    {
+        $this->data = $data;
+    }
+    public function setData($data)
+    {
+        $this->data = $data;
+    }
+    public function serializeToString()
+    {
+        return $this->data;
+    }
+}
+
+class CallInvokerClient extends Grpc\BaseStub
+{
+
+  /**
+   * @param string $hostname hostname
+   * @param array $opts channel options
+   * @param Channel|InterceptorChannel $channel (optional) re-use channel object
+   */
+  public function __construct($hostname, $opts, $channel = null)
+  {
+    parent::__construct($hostname, $opts, $channel);
+  }
+
+  /**
+   * A simple RPC.
+   * @param CallInvokerSimpleRequest $argument input argument
+   * @param array $metadata metadata
+   * @param array $options call options
+   */
+  public function UnaryCall(
+    CallInvokerSimpleRequest $argument,
+    $metadata = [],
+    $options = []
+  ) {
+    return $this->_simpleRequest(
+      '/dummy_method',
+      $argument,
+      [],
+      $metadata,
+      $options
+    );
+  }
+}
+
+class CallInvokerUpdateChannel implements \Grpc\CallInvoker
+{
+    private $channel; // channel created by createChannelFactory()
+
+    public function getChannel() {
+        return $this->channel;
+    }
+
+    public function createChannelFactory($hostname, $opts) {
+        $this->channel = new \Grpc\Channel('localhost:50050', $opts); // $hostname is not used; target is fixed
+        return $this->channel;
+    }
+
+    public function UnaryCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\UnaryCall($channel, $method, $deserialize, $options); // fully qualified: this file has no namespace
+    }
+
+    public function ClientStreamingCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\ClientStreamingCall($channel, $method, $deserialize, $options);
+    }
+
+    public function ServerStreamingCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\ServerStreamingCall($channel, $method, $deserialize, $options);
+    }
+
+    public function BidiStreamingCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\BidiStreamingCall($channel, $method, $deserialize, $options);
+    }
+}
+
+
+class CallInvokerChangeRequest implements \Grpc\CallInvoker
+{
+    private $channel; // channel created by createChannelFactory()
+
+    public function getChannel() {
+        return $this->channel;
+    }
+    public function createChannelFactory($hostname, $opts) {
+        $this->channel = new \Grpc\Channel($hostname, $opts);
+        return $this->channel;
+    }
+
+    public function UnaryCall($channel, $method, $deserialize, $options) {
+        return new CallInvokerChangeRequestCall($channel, $method, $deserialize, $options); // wrapper rewrites the request in start()
+    }
+
+    public function ClientStreamingCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\ClientStreamingCall($channel, $method, $deserialize, $options); // fully qualified: this file has no namespace
+    }
+
+    public function ServerStreamingCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\ServerStreamingCall($channel, $method, $deserialize, $options);
+    }
+
+    public function BidiStreamingCall($channel, $method, $deserialize, $options) {
+        return new \Grpc\BidiStreamingCall($channel, $method, $deserialize, $options);
+    }
+}
+
+class CallInvokerChangeRequestCall
+{
+    private $call;
+
+    public function __construct($channel, $method, $deserialize, $options)
+    {
+        $this->call = new \Grpc\UnaryCall($channel, $method, $deserialize, $options);
+    }
+
+    public function start($argument, $metadata, $options) {
+        $argument->setData('intercepted_unary_request');
+        $this->call->start($argument, $metadata, $options);
+    }
+
+    public function wait()
+    {
+        return $this->call->wait();
+    }
+}
+
+class CallInvokerTest extends PHPUnit_Framework_TestCase
+{
+    public function setUp()
+    {
+        $this->server = new Grpc\Server([]);
+        $this->port = $this->server->addHttp2Port('0.0.0.0:0');
+        $this->server->start();
+    }
+
+    public function tearDown()
+    {
+        unset($this->server);
+    }
+
+    public function testCreateDefaultCallInvoker()
+    {
+        $call_invoker = new \Grpc\DefaultCallInvoker();
+    }
+
+    public function testCreateCallInvoker()
+    {
+        $call_invoker = new CallInvokerUpdateChannel();
+    }
+
+    public function testCallInvokerAccessChannel()
+    {
+        $call_invoker = new CallInvokerUpdateChannel();
+        $stub = new \Grpc\BaseStub('localhost:50051',
+          ['credentials' => \Grpc\ChannelCredentials::createInsecure(),
+            'grpc_call_invoker' => $call_invoker]);
+        $this->assertEquals($call_invoker->getChannel()->getTarget(), 'localhost:50050');
+        $call_invoker->getChannel()->close();
+    }
+
+    public function testClientChangeRequestCallInvoker()
+    {
+        $req_text = 'client_request';
+        $call_invoker = new CallInvokerChangeRequest();
+        $client = new CallInvokerClient('localhost:'.$this->port, [
+            'force_new' => true,
+            'credentials' => Grpc\ChannelCredentials::createInsecure(),
+            'grpc_call_invoker' => $call_invoker,
+        ]);
+
+        $req = new CallInvokerSimpleRequest($req_text);
+        $unary_call = $client->UnaryCall($req);
+
+        $event = $this->server->requestCall();
+        $this->assertSame('/dummy_method', $event->method);
+        $server_call = $event->call;
+        $event = $server_call->startBatch([
+            Grpc\OP_SEND_INITIAL_METADATA => [],
+            Grpc\OP_SEND_STATUS_FROM_SERVER => [
+                'metadata' => [],
+                'code' => Grpc\STATUS_OK,
+                'details' => '',
+            ],
+            Grpc\OP_RECV_MESSAGE => true,
+            Grpc\OP_RECV_CLOSE_ON_SERVER => true,
+        ]);
+        $this->assertSame('intercepted_unary_request', $event->message);
+        $call_invoker->getChannel()->close();
+        unset($unary_call);
+        unset($server_call);
+    }
+}
diff --git a/src/php/tests/unit_tests/InterceptorTest.php b/src/php/tests/unit_tests/InterceptorTest.php
index d759ceb..0ad49fc 100644
--- a/src/php/tests/unit_tests/InterceptorTest.php
+++ b/src/php/tests/unit_tests/InterceptorTest.php
@@ -24,6 +24,7 @@
 require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php');
 require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php');
 require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php');
+require_once(dirname(__FILE__).'/../../lib/Grpc/CallInvoker.php');
 require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php');
 
 class SimpleRequest
diff --git a/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template b/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template
index 8b71716..4fb7b46 100644
--- a/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template
+++ b/templates/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile.template
@@ -16,5 +16,8 @@
 
   FROM google/dart:latest
 
+  # Upgrade Dart to version 2.
+  RUN apt-get update && apt-get upgrade -y dart
+
   # Define the default command.
   CMD ["bash"]
diff --git a/test/core/channel/channelz_registry_test.cc b/test/core/channel/channelz_registry_test.cc
index eb6305e..24e5093 100644
--- a/test/core/channel/channelz_registry_test.cc
+++ b/test/core/channel/channelz_registry_test.cc
@@ -82,6 +82,15 @@
   EXPECT_EQ(&str_to_register, retrieved_str);
 }
 
+TEST(ChannelzRegistryTest, RegisterManyItems) {
+  int object_to_register = 42;
+  for (int i = 0; i < 100; i++) {
+    intptr_t uuid = ChannelzRegistry::Register(&object_to_register);
+    int* retrieved = ChannelzRegistry::Get<int>(uuid);
+    EXPECT_EQ(&object_to_register, retrieved);
+  }
+}
+
 namespace {
 class Foo {
  public:
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index feea7c3..8896fc6 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -539,6 +539,23 @@
   EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
 }
 
+TEST_F(ClientLbEnd2endTest, RoundRobinProcessPending) {
+  StartServers(1);  // Single server
+  auto channel = BuildChannel("round_robin");
+  auto stub = BuildStub(channel);
+  SetNextResolution({servers_[0]->port_});
+  WaitForServer(stub, 0, DEBUG_LOCATION);
+  // Create a new channel and its corresponding RR LB policy, which will pick
+  // the subchannels in READY state from the previous RPC against the same
+  // target (even if it happened over a different channel, because subchannels
+  // are globally reused). Progress should happen without any transition from
+  // this READY state.
+  auto second_channel = BuildChannel("round_robin");
+  auto second_stub = BuildStub(second_channel);
+  SetNextResolution({servers_[0]->port_});
+  CheckRpcSendOk(second_stub, DEBUG_LOCATION);
+}
+
 TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
   // Start servers and send one RPC per server.
   const int kNumServers = 3;
diff --git a/third_party/toolchains/BUILD b/third_party/toolchains/BUILD
new file mode 100644
index 0000000..1933072
--- /dev/null
+++ b/third_party/toolchains/BUILD
@@ -0,0 +1,46 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+licenses(["notice"])  # Apache v2
+
+package(default_visibility = ["//visibility:public"])
+
+# Latest RBE Ubuntu16_04 container.
+# Update whenever a new container is released.
+alias(
+    name = "rbe_ubuntu1604",
+    actual = ":rbe_ubuntu1604_r328903",
+)
+
+# RBE Ubuntu16_04 r328903
+platform(
+    name = "rbe_ubuntu1604_r328903",
+    constraint_values = [
+        "@bazel_tools//platforms:x86_64",
+        "@bazel_tools//platforms:linux",
+        "@bazel_tools//tools/cpp:clang",
+        "@com_github_bazelbuild_bazeltoolchains//constraints:xenial",
+        "@com_github_bazelbuild_bazeltoolchains//constraints/sanitizers:support_msan",
+    ],
+    remote_execution_properties = """
+        properties: {
+          name: "container-image"
+          value:"docker://gcr.io/cloud-marketplace/google/rbe-ubuntu16-04@sha256:59bf0e191a6b5cc1ab62c2224c810681d1326bad5a27b1d36c9f40113e79da7f"
+        }
+        properties: {
+          name: "gceMachineType"
+          value: "n1-highmem-2"
+        }
+        """,
+)
diff --git a/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile b/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile
index d754996..bff20b5 100644
--- a/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile
+++ b/tools/dockerfile/interoptest/grpc_interop_dart/Dockerfile
@@ -14,5 +14,8 @@
 
 FROM google/dart:latest
 
+# Upgrade Dart to version 2.
+RUN apt-get update && apt-get upgrade -y dart
+
 # Define the default command.
 CMD ["bash"]
diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
index 2f12471..7cdb2e0 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc
+++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc
@@ -67,6 +67,10 @@
 pip install -U Mako six tox setuptools twisted pyyaml --user python
 export PYTHONPATH=/Library/Python/3.4/site-packages
 
+# Install Python 3.7
+curl -O https://www.python.org/ftp/python/3.7.0/python-3.7.0-macosx10.9.pkg
+sudo installer -pkg ./python-3.7.0-macosx10.9.pkg -target /
+
 # set xcode version for Obj-C tests
 sudo xcode-select -switch /Applications/Xcode_9.2.app/Contents/Developer/
 
diff --git a/tools/internal_ci/helper_scripts/prepare_build_windows.bat b/tools/internal_ci/helper_scripts/prepare_build_windows.bat
index f987f8a..0164e4a 100644
--- a/tools/internal_ci/helper_scripts/prepare_build_windows.bat
+++ b/tools/internal_ci/helper_scripts/prepare_build_windows.bat
@@ -14,7 +14,7 @@
 
 @rem make sure msys binaries are preferred over cygwin binaries
 @rem set path to python 2.7
-set PATH=C:\tools\msys64\usr\bin;C:\Python27;%PATH%
+set PATH=C:\tools\msys64\usr\bin;C:\Python27;C:\Python37;%PATH%
 
 @rem If this is a PR using RUN_TESTS_FLAGS var, then add flags to filter tests
 if defined KOKORO_GITHUB_PULL_REQUEST_NUMBER if defined RUN_TESTS_FLAGS (
@@ -34,6 +34,9 @@
 @rem Needed for big_query_utils
 python -m pip install google-api-python-client
 
+@rem Install Python 3.7
+chocolatey install -y -r python3 --version 3.7
+
 @rem Disable some unwanted dotnet options
 set NUGET_XMLDOC_MODE=skip
 set DOTNET_SKIP_FIRST_TIME_EXPERIENCE=true
diff --git a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
index d2dded0..8f9658f 100755
--- a/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
+++ b/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
@@ -49,12 +49,13 @@
   --strategy=Closure=remote  \
   --genrule_strategy=remote  \
   --experimental_strict_action_env=true \
-  --experimental_remote_platform_override='properties:{name:"container-image" value:"docker://gcr.io/cloud-marketplace/google/rbe-ubuntu16-04@sha256:59bf0e191a6b5cc1ab62c2224c810681d1326bad5a27b1d36c9f40113e79da7f" }' \
   --crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/default:toolchain \
   --define GRPC_PORT_ISOLATED_RUNTIME=1 \
   --action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
   --extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
-  --extra_execution_platforms=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0:rbe_ubuntu1604 \
+  --extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
+  --host_platform=//third_party/toolchains:rbe_ubuntu1604 \
+  --platforms=//third_party/toolchains:rbe_ubuntu1604 \
   $1 \
   -- //test/... || FAILED="true"
 
diff --git a/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh b/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh
index 791c56c..2aebb65 100644
--- a/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh
+++ b/tools/internal_ci/linux/pull_request/grpc_asan_on_foundry.sh
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-export UPLOAD_TEST_RESULTS=true
 EXTRA_FLAGS="--copt=-gmlt --strip=never --copt=-fsanitize=address --linkopt=-fsanitize=address --test_timeout=3600"
 github/grpc/tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh "${EXTRA_FLAGS}"
 
diff --git a/tools/internal_ci/macos/grpc_build_artifacts.sh b/tools/internal_ci/macos/grpc_build_artifacts.sh
index 4dcc528..54b171d 100755
--- a/tools/internal_ci/macos/grpc_build_artifacts.sh
+++ b/tools/internal_ci/macos/grpc_build_artifacts.sh
@@ -25,6 +25,7 @@
 python3.4 -m pip install cython setuptools wheel
 python3.5 -m pip install cython setuptools wheel
 python3.6 -m pip install cython setuptools wheel
+python3.7 -m pip install cython setuptools wheel
 
 # needed to build ruby artifacts
 time bash tools/distrib/build_ruby_environment_macos.sh
diff --git a/tools/run_tests/artifacts/artifact_targets.py b/tools/run_tests/artifacts/artifact_targets.py
index 35cbd73..edde3ea 100644
--- a/tools/run_tests/artifacts/artifact_targets.py
+++ b/tools/run_tests/artifacts/artifact_targets.py
@@ -373,6 +373,7 @@
         PythonArtifact('macos', 'x64', 'python3.4'),
         PythonArtifact('macos', 'x64', 'python3.5'),
         PythonArtifact('macos', 'x64', 'python3.6'),
+        PythonArtifact('macos', 'x64', 'python3.7'),
         PythonArtifact('windows', 'x86', 'Python27_32bits'),
         PythonArtifact('windows', 'x86', 'Python34_32bits'),
         PythonArtifact('windows', 'x86', 'Python35_32bits'),
@@ -381,6 +382,7 @@
         PythonArtifact('windows', 'x64', 'Python34'),
         PythonArtifact('windows', 'x64', 'Python35'),
         PythonArtifact('windows', 'x64', 'Python36'),
+        PythonArtifact('windows', 'x64', 'Python37'),
         RubyArtifact('linux', 'x64'),
         RubyArtifact('macos', 'x64'),
         PHPArtifact('linux', 'x64'),
diff --git a/tools/run_tests/python_utils/upload_rbe_results.py b/tools/run_tests/python_utils/upload_rbe_results.py
index cbeb1ad..d29ebc6 100644
--- a/tools/run_tests/python_utils/upload_rbe_results.py
+++ b/tools/run_tests/python_utils/upload_rbe_results.py
@@ -136,7 +136,7 @@
     resultstore_actions = _get_resultstore_data(api_key, invocation_id)
 
     bq_rows = []
-    for action in resultstore_actions:
+    for index, action in enumerate(resultstore_actions):
         # Filter out non-test related data, such as build results.
         if 'testAction' not in action:
             continue
@@ -157,6 +157,23 @@
                     'timedOut': True
                 }
             }]
+        # When RBE believes its infrastructure is failing, it will abort and
+        # mark running tests as UNKNOWN. These infrastructure failures may be
+        # related to our tests, so we should investigate if specific tests are
+        # repeatedly being marked as UNKNOWN.
+        elif action['statusAttributes']['status'] == 'UNKNOWN':
+            test_cases = [{
+                'testCase': {
+                    'caseName': str(action['id']['actionId']),
+                    'unknown': True
+                }
+            }]
+            # Use the previous action's start time as a close approximation.
+            # NOTE(review): index 0 reads [-1], i.e. the last action - confirm.
+            action['timing'] = {
+                'startTime':
+                resultstore_actions[index - 1]['timing']['startTime']
+            }
         else:
             test_cases = action['testAction']['testSuite']['tests'][0][
                 'testSuite']['tests']
@@ -165,28 +182,55 @@
                 result = 'FAILED'
             elif 'timedOut' in test_case['testCase']:
                 result = 'TIMEOUT'
+            elif 'unknown' in test_case['testCase']:
+                result = 'UNKNOWN'
             else:
                 result = 'PASSED'
-            bq_rows.append({
-                'insertId': str(uuid.uuid4()),
-                'json': {
-                    'job_name':
-                    os.getenv('KOKORO_JOB_NAME'),
-                    'build_id':
-                    os.getenv('KOKORO_BUILD_NUMBER'),
-                    'build_url':
-                    'https://source.cloud.google.com/results/invocations/%s' %
-                    invocation_id,
-                    'test_target':
-                    action['id']['targetId'],
-                    'test_case':
-                    test_case['testCase']['caseName'],
-                    'result':
-                    result,
-                    'timestamp':
-                    action['timing']['startTime'],
-                }
-            })
+            try:
+                bq_rows.append({
+                    'insertId': str(uuid.uuid4()),
+                    'json': {
+                        'job_name':
+                        os.getenv('KOKORO_JOB_NAME'),
+                        'build_id':
+                        os.getenv('KOKORO_BUILD_NUMBER'),
+                        'build_url':
+                        'https://source.cloud.google.com/results/invocations/%s'
+                        % invocation_id,
+                        'test_target':
+                        action['id']['targetId'],
+                        'test_case':
+                        test_case['testCase']['caseName'],
+                        'result':
+                        result,
+                        'timestamp':
+                        action['timing']['startTime'],
+                    }
+                })
+            except Exception as e:
+                print('Failed to parse test result. Error: %s' % str(e))
+                print(json.dumps(test_case, indent=4))
+                bq_rows.append({
+                    'insertId': str(uuid.uuid4()),
+                    'json': {
+                        'job_name':
+                        os.getenv('KOKORO_JOB_NAME'),
+                        'build_id':
+                        os.getenv('KOKORO_BUILD_NUMBER'),
+                        'build_url':
+                        'https://source.cloud.google.com/results/invocations/%s'
+                        % invocation_id,
+                        'test_target':
+                        action['id']['targetId'],
+                        'test_case':
+                        'N/A',
+                        'result':
+                        'UNPARSEABLE',
+                        'timestamp':
+                        'N/A',
+                    }
+                })
+
     # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
     for i in range((len(bq_rows) / 1000) + 1):
         _upload_results_to_bq(bq_rows[i * 1000:(i + 1) * 1000])