Merge pull request #12739 from markdroth/proxy_mapper_uds_fix

Fix http_proxy proxy mapper to not set proxy_name when returning false.
diff --git a/include/grpc++/support/channel_arguments.h b/include/grpc++/support/channel_arguments.h
index 7b6befe..9dc505f 100644
--- a/include/grpc++/support/channel_arguments.h
+++ b/include/grpc++/support/channel_arguments.h
@@ -64,6 +64,12 @@
   /// Set the compression algorithm for the channel.
   void SetCompressionAlgorithm(grpc_compression_algorithm algorithm);
 
+  /// Set the grpclb fallback timeout (in ms) for the channel. If this amount
+  /// of time has passed but we have not gotten any non-empty \a serverlist from
+  /// the balancer, we will fall back to use the backend address(es) returned by
+  /// the resolver.
+  void SetGrpclbFallbackTimeout(int fallback_timeout);
+
   /// Set the socket mutator for the channel.
   void SetSocketMutator(grpc_socket_mutator* mutator);
 
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 90f03f4..65463bb 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -288,7 +288,11 @@
   "grpc.experimental.tcp_max_read_chunk_size"
 /* Timeout in milliseconds to use for calls to the grpclb load balancer.
    If 0 or unset, the balancer calls will have no deadline. */
-#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_timeout_ms"
+#define GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS "grpc.grpclb_call_timeout_ms"
+/* Timeout in milliseconds to wait for the serverlist from the grpclb load
+   balancer before using fallback backend addresses from the resolver.
+   If 0, fallback will never be used. */
+#define GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS "grpc.grpclb_fallback_timeout_ms"
 /** If non-zero, grpc server's cronet compression workaround will be enabled */
 #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \
   "grpc.workaround.cronet_compression"
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 05a106f..8dc81b4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -123,6 +123,7 @@
 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
+#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
 
 grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
 
@@ -299,6 +300,10 @@
   /** timeout in milliseconds for the LB call. 0 means no deadline. */
   int lb_call_timeout_ms;
 
+  /** timeout in milliseconds before using fallback backend addresses.
+   * 0 means not using fallback. */
+  int lb_fallback_timeout_ms;
+
   /** for communicating with the LB server */
   grpc_channel *lb_channel;
 
@@ -325,6 +330,9 @@
    * Otherwise, we delegate to the RR policy. */
   size_t serverlist_index;
 
+  /** stores the backend addresses from the resolver */
+  grpc_lb_addresses *fallback_backend_addresses;
+
   /** list of picks that are waiting on RR's policy connectivity */
   pending_pick *pending_picks;
 
@@ -345,6 +353,9 @@
   /** is \a lb_call_retry_timer active? */
   bool retry_timer_active;
 
+  /** is \a lb_fallback_timer active? */
+  bool fallback_timer_active;
+
   /** called upon changes to the LB channel's connectivity. */
   grpc_closure lb_channel_on_connectivity_changed;
 
@@ -364,6 +375,9 @@
   /* LB call retry timer callback. */
   grpc_closure lb_on_call_retry;
 
+  /* LB fallback timer callback. */
+  grpc_closure lb_on_fallback;
+
   grpc_call *lb_call; /* streaming call to the LB server, */
 
   grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
@@ -387,6 +401,9 @@
   /** LB call retry timer */
   grpc_timer lb_call_retry_timer;
 
+  /** LB fallback timer */
+  grpc_timer lb_fallback_timer;
+
   bool seen_initial_response;
 
   /* Stats for client-side load reporting. Should be unreffed and
@@ -532,6 +549,32 @@
   return lb_addresses;
 }
 
+/* Returns the backend addresses extracted from the given addresses */
+static grpc_lb_addresses *extract_backend_addresses_locked(
+    grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) {
+  /* first pass: count the number of backend addresses */
+  size_t num_backends = 0;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (!addresses->addresses[i].is_balancer) {
+      ++num_backends;
+    }
+  }
+  /* second pass: actually populate the addresses and (empty) LB tokens */
+  grpc_lb_addresses *backend_addresses =
+      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+  size_t num_copied = 0;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (addresses->addresses[i].is_balancer) continue;
+    const grpc_resolved_address *addr = &addresses->addresses[i].address;
+    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+                                  addr->len, false /* is_balancer */,
+                                  NULL /* balancer_name */,
+                                  (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+    ++num_copied;
+  }
+  return backend_addresses;
+}
+
 static void update_lb_connectivity_status_locked(
     grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
@@ -599,35 +642,38 @@
     grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     const grpc_lb_policy_pick_args *pick_args, bool force_async,
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
-  // Look at the index into the serverlist to see if we should drop this call.
-  grpc_grpclb_server *server =
-      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
-  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
-    glb_policy->serverlist_index = 0;  // Wrap-around.
-  }
-  if (server->drop) {
-    // Not using the RR policy, so unref it.
-    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
-              (intptr_t)wc_arg->rr_policy);
+  // Check for drops if we are not using fallback backend addresses.
+  if (glb_policy->serverlist != NULL) {
+    // Look at the index into the serverlist to see if we should drop this call.
+    grpc_grpclb_server *server =
+        glb_policy->serverlist->servers[glb_policy->serverlist_index++];
+    if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
+      glb_policy->serverlist_index = 0;  // Wrap-around.
     }
-    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
-    // Update client load reporting stats to indicate the number of
-    // dropped calls.  Note that we have to do this here instead of in
-    // the client_load_reporting filter, because we do not create a
-    // subchannel call (and therefore no client_load_reporting filter)
-    // for dropped calls.
-    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
-                                                     wc_arg->client_stats);
-    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
-    if (force_async) {
-      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
-      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+    if (server->drop) {
+      // Not using the RR policy, so unref it.
+      if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+        gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
+                (intptr_t)wc_arg->rr_policy);
+      }
+      GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+      // Update client load reporting stats to indicate the number of
+      // dropped calls.  Note that we have to do this here instead of in
+      // the client_load_reporting filter, because we do not create a
+      // subchannel call (and therefore no client_load_reporting filter)
+      // for dropped calls.
+      grpc_grpclb_client_stats_add_call_dropped_locked(
+          server->load_balance_token, wc_arg->client_stats);
+      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+      if (force_async) {
+        GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+        GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+        gpr_free(wc_arg->free_when_done);
+        return false;
+      }
       gpr_free(wc_arg->free_when_done);
-      return false;
+      return true;
     }
-    gpr_free(wc_arg->free_when_done);
-    return true;
   }
   // Pick via the RR policy.
   const bool pick_done = grpc_lb_policy_pick_locked(
@@ -665,8 +711,18 @@
 
 static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
                                                   glb_lb_policy *glb_policy) {
-  grpc_lb_addresses *addresses =
-      process_serverlist_locked(exec_ctx, glb_policy->serverlist);
+  grpc_lb_addresses *addresses;
+  if (glb_policy->serverlist != NULL) {
+    GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
+    addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
+  } else {
+    // If rr_handover_locked() is invoked when we haven't received any
+    // serverlist from the balancer, we use the fallback backends returned by
+    // the resolver. Note that the fallback backend list may be empty, in which
+    // case the new round_robin policy will keep the requested picks pending.
+    GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
+    addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
+  }
   GPR_ASSERT(addresses != NULL);
   grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
   args->client_channel_factory = glb_policy->cc_factory;
@@ -772,8 +828,6 @@
 /* glb_policy->rr_policy may be NULL (initial handover) */
 static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
-  GPR_ASSERT(glb_policy->serverlist != NULL &&
-             glb_policy->serverlist->num_servers > 0);
   if (glb_policy->shutting_down) return;
   grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
   GPR_ASSERT(args != NULL);
@@ -922,6 +976,9 @@
   if (glb_policy->serverlist != NULL) {
     grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
   }
+  if (glb_policy->fallback_backend_addresses != NULL) {
+    grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+  }
   grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
   grpc_subchannel_index_unref();
   if (glb_policy->pending_update_args != NULL) {
@@ -1063,10 +1120,28 @@
   GRPC_ERROR_UNREF(error);
 }
 
+static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error);
 static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
                                       glb_lb_policy *glb_policy);
 static void start_picking_locked(grpc_exec_ctx *exec_ctx,
                                  glb_lb_policy *glb_policy) {
+  /* start a timer to fall back */
+  if (glb_policy->lb_fallback_timeout_ms > 0 &&
+      glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) {
+    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+    gpr_timespec deadline = gpr_time_add(
+        now,
+        gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN));
+    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
+    GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
+                      glb_policy,
+                      grpc_combiner_scheduler(glb_policy->base.combiner));
+    glb_policy->fallback_timer_active = true;
+    grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline,
+                    &glb_policy->lb_on_fallback, now);
+  }
+
   glb_policy->started_picking = true;
   gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
   query_for_backends_locked(exec_ctx, glb_policy);
@@ -1545,6 +1620,15 @@
             if (glb_policy->serverlist != NULL) {
               /* dispose of the old serverlist */
               grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
+            } else {
+              /* or dispose of the fallback */
+              grpc_lb_addresses_destroy(exec_ctx,
+                                        glb_policy->fallback_backend_addresses);
+              glb_policy->fallback_backend_addresses = NULL;
+              if (glb_policy->fallback_timer_active) {
+                grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
+                glb_policy->fallback_timer_active = false;
+              }
             }
             /* and update the copy in the glb_lb_policy instance. This
              * serverlist instance will be destroyed either upon the next
@@ -1555,9 +1639,7 @@
           }
         } else {
           if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-            gpr_log(GPR_INFO,
-                    "Received empty server list. Picks will stay pending until "
-                    "a response with > 0 servers is received");
+            gpr_log(GPR_INFO, "Received empty server list, ignoring.");
           }
           grpc_grpclb_destroy_serverlist(serverlist);
         }
@@ -1592,6 +1674,27 @@
   }
 }
 
+static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error) {
+  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
+  glb_policy->fallback_timer_active = false;
+  /* If we receive a serverlist after the timer fires but before this callback
+   * actually runs, don't fall back. */
+  if (glb_policy->serverlist == NULL) {
+    if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
+      if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+        gpr_log(GPR_INFO,
+                "Falling back to use backends from resolver (grpclb %p)",
+                (void *)glb_policy);
+      }
+      GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
+      rr_handover_locked(exec_ctx, glb_policy);
+    }
+  }
+  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                            "grpclb_fallback_timer");
+}
+
 static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                 void *arg, grpc_error *error) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
@@ -1616,10 +1719,51 @@
   }
 }
 
+static void fallback_update_locked(grpc_exec_ctx *exec_ctx,
+                                   glb_lb_policy *glb_policy,
+                                   const grpc_lb_addresses *addresses) {
+  GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
+  grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
+  glb_policy->fallback_backend_addresses =
+      extract_backend_addresses_locked(exec_ctx, addresses);
+  if (glb_policy->lb_fallback_timeout_ms > 0 &&
+      !glb_policy->fallback_timer_active) {
+    rr_handover_locked(exec_ctx, glb_policy);
+  }
+}
+
 static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                               const grpc_lb_policy_args *args) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
-  if (glb_policy->updating_lb_channel) {
+  const grpc_arg *arg =
+      grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+  if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
+    if (glb_policy->lb_channel == NULL) {
+      // If we don't have a current channel to the LB, go into TRANSIENT
+      // FAILURE.
+      grpc_connectivity_state_set(
+          exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
+          "glb_update_missing");
+    } else {
+      // otherwise, keep using the current LB channel (ignore this update).
+      gpr_log(GPR_ERROR,
+              "No valid LB addresses channel arg for grpclb %p update, "
+              "ignoring.",
+              (void *)glb_policy);
+    }
+    return;
+  }
+  const grpc_lb_addresses *addresses =
+      (const grpc_lb_addresses *)arg->value.pointer.p;
+
+  if (glb_policy->serverlist == NULL) {
+    // If a non-empty serverlist hasn't been received from the balancer,
+    // propagate the update to fallback_backend_addresses.
+    fallback_update_locked(exec_ctx, glb_policy, addresses);
+  } else if (glb_policy->updating_lb_channel) {
+    // If we have received serverlist from the balancer, we need to defer update
+    // when there is an in-progress one.
     if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
       gpr_log(GPR_INFO,
               "Update already in progress for grpclb %p. Deferring update.",
@@ -1640,31 +1784,11 @@
   }
 
   glb_policy->updating_lb_channel = true;
-  // Propagate update to lb_channel (pick first).
-  const grpc_arg *arg =
-      grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
-  if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
-    if (glb_policy->lb_channel == NULL) {
-      // If we don't have a current channel to the LB, go into TRANSIENT
-      // FAILURE.
-      grpc_connectivity_state_set(
-          exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
-          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
-          "glb_update_missing");
-    } else {
-      // otherwise, keep using the current LB channel (ignore this update).
-      gpr_log(GPR_ERROR,
-              "No valid LB addresses channel arg for grpclb %p update, "
-              "ignoring.",
-              (void *)glb_policy);
-    }
-  }
-  const grpc_lb_addresses *addresses =
-      (const grpc_lb_addresses *)arg->value.pointer.p;
   GPR_ASSERT(glb_policy->lb_channel != NULL);
   grpc_channel_args *lb_channel_args = build_lb_channel_args(
       exec_ctx, addresses, glb_policy->response_generator, args->args);
-  /* Propagate updates to the LB channel through the fake resolver */
+  /* Propagate updates to the LB channel (pick first) through the fake resolver
+   */
   grpc_fake_resolver_response_generator_set_response(
       exec_ctx, glb_policy->response_generator, lb_channel_args);
   grpc_channel_args_destroy(exec_ctx, lb_channel_args);
@@ -1767,13 +1891,7 @@
 static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                                   grpc_lb_policy_factory *factory,
                                   grpc_lb_policy_args *args) {
-  /* Count the number of gRPC-LB addresses. There must be at least one.
-   * TODO(roth): For now, we ignore non-balancer addresses, but in the
-   * future, we may change the behavior such that we fall back to using
-   * the non-balancer addresses if we cannot reach any balancers. In the
-   * fallback case, we should use the LB policy indicated by
-   * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
-   * unset, we should default to pick_first). */
+  /* Count the number of gRPC-LB addresses. There must be at least one. */
   const grpc_arg *arg =
       grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
   if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -1809,6 +1927,11 @@
   glb_policy->lb_call_timeout_ms =
       grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
 
+  arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
+  glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
+      arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
+                                  INT_MAX});
+
   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
   // since we use this to trigger the client_load_reporting filter.
   grpc_arg new_arg = grpc_channel_arg_string_create(
@@ -1817,6 +1940,11 @@
   glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
       args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
 
+  /* Extract the backend addresses (may be empty) from the resolver for
+   * fallback. */
+  glb_policy->fallback_backend_addresses =
+      extract_backend_addresses_locked(exec_ctx, addresses);
+
   /* Create a client channel over them to communicate with a LB service */
   glb_policy->response_generator =
       grpc_fake_resolver_response_generator_create();
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index 4d14054..05ab43d 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -56,7 +56,7 @@
 }
 
 void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
-                                   void* address, size_t address_len,
+                                   const void* address, size_t address_len,
                                    bool is_balancer, const char* balancer_name,
                                    void* user_data) {
   GPR_ASSERT(index < addresses->num_addresses);
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index 9d9fb14..cf0f8cb 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -73,7 +73,7 @@
  * \a address is a socket address of length \a address_len.
  * Takes ownership of \a balancer_name. */
 void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
-                                   void *address, size_t address_len,
+                                   const void *address, size_t address_len,
                                    bool is_balancer, const char *balancer_name,
                                    void *user_data);
 
diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc
index f130aec..f89f5f1 100644
--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -86,6 +86,10 @@
   SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
 }
 
+void ChannelArguments::SetGrpclbFallbackTimeout(int fallback_timeout) {
+  SetInt(GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, fallback_timeout);
+}
+
 void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
   if (!mutator) {
     return;
diff --git a/src/ruby/lib/grpc/google_rpc_status_utils.rb b/src/ruby/lib/grpc/google_rpc_status_utils.rb
index fdadd6b..f253b08 100644
--- a/src/ruby/lib/grpc/google_rpc_status_utils.rb
+++ b/src/ruby/lib/grpc/google_rpc_status_utils.rb
@@ -19,10 +19,17 @@
 module GRPC
   # GoogleRpcStatusUtils provides utilities to convert between a
   # GRPC::Core::Status and a deserialized Google::Rpc::Status proto
+  # Returns nil if the grpc-status-details-bin trailer could not be
+  # converted to a GoogleRpcStatus due to the server not providing
+  # the necessary trailers.
+  # Raises an error if the server did provide the necessary trailers
+  # but they fail to deserialize into a GoogleRpcStatus protobuf.
   class GoogleRpcStatusUtils
     def self.extract_google_rpc_status(status)
       fail ArgumentError, 'bad type' unless status.is_a? Struct::Status
-      Google::Rpc::Status.decode(status.metadata['grpc-status-details-bin'])
+      grpc_status_details_bin_trailer = 'grpc-status-details-bin'
+      return nil if status.metadata[grpc_status_details_bin_trailer].nil?
+      Google::Rpc::Status.decode(status.metadata[grpc_status_details_bin_trailer])
     end
   end
 end
diff --git a/src/ruby/spec/google_rpc_status_utils_spec.rb b/src/ruby/spec/google_rpc_status_utils_spec.rb
index fe221c3..6f2a06b 100644
--- a/src/ruby/spec/google_rpc_status_utils_spec.rb
+++ b/src/ruby/spec/google_rpc_status_utils_spec.rb
@@ -31,12 +31,11 @@
     expect(exception.message.include?('bad type')).to be true
   end
 
-  it 'fails with some error if the header key is missing' do
+  it 'returns nil if the header key is missing' do
     status = Struct::Status.new(1, 'details', key: 'val')
     expect(status.metadata.nil?).to be false
-    expect do
-      GRPC::GoogleRpcStatusUtils.extract_google_rpc_status(status)
-    end.to raise_error(StandardError)
+    expect(GRPC::GoogleRpcStatusUtils.extract_google_rpc_status(
+             status)).to be(nil)
   end
 
   it 'fails with some error if the header key fails to deserialize' do
@@ -221,3 +220,73 @@
              status_from_exception)).to eq(rpc_status)
   end
 end
+
+# A test service that fails without explicitly setting the
+# grpc-status-details-bin trailer. Tests assumptions about the value
+# of grpc-status-details-bin on the client side when the trailer wasn't
+# set explicitly.
+class NoStatusDetailsBinTestService
+  include GRPC::GenericService
+  rpc :an_rpc, EchoMsg, EchoMsg
+
+  def an_rpc(_, _)
+    fail GRPC::Unknown
+  end
+end
+
+NoStatusDetailsBinTestServiceStub = NoStatusDetailsBinTestService.rpc_stub_class
+
+describe 'when the endpoint doesnt send grpc-status-details-bin' do
+  def start_server
+    @srv = GRPC::RpcServer.new(pool_size: 1)
+    @server_port = @srv.add_http2_port('localhost:0',
+                                       :this_port_is_insecure)
+    @srv.handle(NoStatusDetailsBinTestService)
+    @server_thd = Thread.new { @srv.run }
+    @srv.wait_till_running
+  end
+
+  def stop_server
+    expect(@srv.stopped?).to be(false)
+    @srv.stop
+    @server_thd.join
+    expect(@srv.stopped?).to be(true)
+  end
+
+  before(:each) do
+    start_server
+  end
+
+  after(:each) do
+    stop_server
+  end
+
+  it 'should receive nil when we extract try to extract a google '\
+    'rpc status from a BadStatus exception that didnt have it' do
+    stub = NoStatusDetailsBinTestServiceStub.new("localhost:#{@server_port}",
+                                                 :this_channel_is_insecure)
+    begin
+      stub.an_rpc(EchoMsg.new)
+    rescue GRPC::Unknown => e
+      rpc_status = GRPC::GoogleRpcStatusUtils.extract_google_rpc_status(
+        e.to_status)
+    end
+    expect(rpc_status).to be(nil)
+  end
+
+  it 'should receive nil when we extract try to extract a google '\
+    'rpc status from an op views status object that didnt have it' do
+    stub = NoStatusDetailsBinTestServiceStub.new("localhost:#{@server_port}",
+                                                 :this_channel_is_insecure)
+    op = stub.an_rpc(EchoMsg.new, return_op: true)
+    begin
+      op.execute
+    rescue GRPC::Unknown => e
+      status_from_exception = e.to_status
+    end
+    expect(GRPC::GoogleRpcStatusUtils.extract_google_rpc_status(
+             status_from_exception)).to be(nil)
+    expect(GRPC::GoogleRpcStatusUtils.extract_google_rpc_status(
+             op.status)).to be nil
+  end
+end
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 77ed155..f73a9c1 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -368,8 +368,9 @@
     grpc_fake_resolver_response_generator_unref(response_generator_);
   }
 
-  void ResetStub() {
+  void ResetStub(int fallback_timeout = 0) {
     ChannelArguments args;
+    args.SetGrpclbFallbackTimeout(fallback_timeout);
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
                     response_generator_);
     std::ostringstream uri;
@@ -470,10 +471,10 @@
     grpc_exec_ctx_finish(&exec_ctx);
   }
 
-  const std::vector<int> GetBackendPorts() const {
+  const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
     std::vector<int> backend_ports;
-    for (const auto& bs : backend_servers_) {
-      backend_ports.push_back(bs.port_);
+    for (size_t i = start_index; i < backend_servers_.size(); ++i) {
+      backend_ports.push_back(backend_servers_[i].port_);
     }
     return backend_ports;
   }
@@ -642,6 +643,177 @@
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_F(SingleBalancerTest, Fallback) {
+  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
+  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
+  const size_t kNumBackendInResolution = backends_.size() / 2;
+
+  ResetStub(kFallbackTimeoutMs);
+  std::vector<AddressData> addresses;
+  addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
+  }
+  SetNextResolution(addresses);
+
+  // Send non-empty serverlist only after kServerlistDelayMs.
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(kNumBackendInResolution /* start_index */), {}),
+      kServerlistDelayMs);
+
+  // Wait until all the fallback backends are reachable.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    WaitForBackend(i);
+  }
+
+  // The first request.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(kNumBackendInResolution);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+  // Fallback is used: each backend returned by the resolver should have
+  // gotten one request.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+
+  // Wait until the serverlist reception has been processed and all backends
+  // in the serverlist are reachable.
+  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
+    WaitForBackend(i);
+  }
+
+  // Send out the second request.
+  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+  CheckRpcSendOk(backends_.size() - kNumBackendInResolution);
+  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+
+  // Serverlist is used: each backend returned by the balancer should
+  // have gotten one request.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
+TEST_F(SingleBalancerTest, FallbackUpdate) {
+  const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
+  const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
+  const size_t kNumBackendInResolution = backends_.size() / 3;
+  const size_t kNumBackendInResolutionUpdate = backends_.size() / 3;
+
+  ResetStub(kFallbackTimeoutMs);
+  std::vector<AddressData> addresses;
+  addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
+  }
+  SetNextResolution(addresses);
+
+  // Send non-empty serverlist only after kServerlistDelayMs.
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(kNumBackendInResolution +
+                             kNumBackendInResolutionUpdate /* start_index */),
+             {}),
+      kServerlistDelayMs);
+
+  // Wait until all the fallback backends are reachable.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    WaitForBackend(i);
+  }
+
+  // The first request.
+  gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
+  CheckRpcSendOk(kNumBackendInResolution);
+  gpr_log(GPR_INFO, "========= DONE WITH FIRST BATCH ==========");
+
+  // Fallback is used: each backend returned by the resolver should have
+  // gotten one request.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution; i < backends_.size(); ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+
+  addresses.clear();
+  addresses.emplace_back(AddressData{balancer_servers_[0].port_, true, ""});
+  for (size_t i = kNumBackendInResolution;
+       i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
+    addresses.emplace_back(AddressData{backend_servers_[i].port_, false, ""});
+  }
+  SetNextResolution(addresses);
+
+  // Wait until the resolution update has been processed and all the new
+  // fallback backends are reachable.
+  for (size_t i = kNumBackendInResolution;
+       i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
+    WaitForBackend(i);
+  }
+
+  // Send out the second request.
+  gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
+  CheckRpcSendOk(kNumBackendInResolutionUpdate);
+  gpr_log(GPR_INFO, "========= DONE WITH SECOND BATCH ==========");
+
+  // The resolution update is used: each backend in the resolution update should
+  // have gotten one request.
+  for (size_t i = 0; i < kNumBackendInResolution; ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution;
+       i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
+       i < backends_.size(); ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+
+  // Wait until the serverlist reception has been processed and all backends
+  // in the serverlist are reachable.
+  for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
+       i < backends_.size(); ++i) {
+    WaitForBackend(i);
+  }
+
+  // Send out the third request.
+  gpr_log(GPR_INFO, "========= BEFORE THIRD BATCH ==========");
+  CheckRpcSendOk(backends_.size() - kNumBackendInResolution -
+                 kNumBackendInResolutionUpdate);
+  gpr_log(GPR_INFO, "========= DONE WITH THIRD BATCH ==========");
+
+  // Serverlist is used: each backend returned by the balancer should
+  // have gotten one request.
+  for (size_t i = 0;
+       i < kNumBackendInResolution + kNumBackendInResolutionUpdate; ++i) {
+    EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
+  }
+  for (size_t i = kNumBackendInResolution + kNumBackendInResolutionUpdate;
+       i < backends_.size(); ++i) {
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
+  }
+
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
 TEST_F(SingleBalancerTest, BackendsRestart) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(