Merge pull request #11026 from markdroth/grpclb_drop_alt

Implement grpclb drop support.
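
With this change, a grpclb serverlist entry can be flagged as a drop, either for rate limiting or for load balancing. Picks walk the serverlist in round-robin order; when the next entry is a drop, the call fails immediately with "Call dropped by load balancing policy" instead of being delegated to the round-robin policy. A minimal sketch of that decision, using simplified stand-in types rather than the real grpc_grpclb_serverlist:

```c
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for grpc_grpclb_server / grpc_grpclb_serverlist. */
typedef struct {
  bool drop_for_rate_limiting;
  bool drop_for_load_balancing;
} server_entry;

typedef struct {
  server_entry **servers;
  size_t num_servers;
  size_t index; /* index of the next pick; wraps around */
} serverlist;

/* Advance the round-robin index and report whether this pick is a drop. */
static bool next_pick_is_drop(serverlist *sl) {
  const server_entry *server = sl->servers[sl->index++];
  if (sl->index == sl->num_servers) sl->index = 0; /* wrap-around */
  return server->drop_for_rate_limiting || server->drop_for_load_balancing;
}
```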
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index 24843d5..f2f27b9 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -914,10 +914,13 @@
   grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
                                            chand->interested_parties);
   if (calld->connected_subchannel == NULL) {
-    gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
+    gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL);
     fail_locked(exec_ctx, calld,
-                GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                    "Failed to create subchannel", &error, 1));
+                error == GRPC_ERROR_NONE
+                    ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                          "Call dropped by load balancing policy")
+                    : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                          "Failed to create subchannel", &error, 1));
   } else if (GET_CALL(calld) == CANCELLED_CALL) {
     /* already cancelled before subchannel became ready */
     grpc_error *cancellation_error =
@@ -1180,6 +1183,15 @@
             &calld->next_step)) {
       calld->pick_pending = false;
       GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
+      if (calld->connected_subchannel == NULL) {
+        gpr_atm_no_barrier_store(&calld->subchannel_call,
+                                 (gpr_atm)CANCELLED_CALL);
+        grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "Call dropped by load balancing policy");
+        fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
+        grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+        return;  // Early out.
+      }
     } else {
       grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent,
                                              chand->interested_parties);
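
The key subtlety in the client_channel.c change above: a completed pick with no connected subchannel is no longer always a failure. A self-contained sketch with plain C stand-ins (the real code uses grpc_error and the GRPC_ERROR_CREATE_* macros):

```c
#include <stdbool.h>

/* Stand-in for the error selection above: if the pick itself reported no
 * error but produced no subchannel, the LB policy dropped the call on
 * purpose; otherwise subchannel creation genuinely failed. */
static const char *failed_pick_message(bool had_error) {
  return had_error ? "Failed to create subchannel"
                   : "Call dropped by load balancing policy";
}
```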
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 184b2ef..fb4aa08 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -149,7 +149,9 @@
 
 /** Finds an appropriate subchannel for a call, based on \a pick_args.
 
-    \a target will be set to the selected subchannel, or NULL on failure.
+    \a target will be set to the selected subchannel, or NULL on failure
+    or when the LB policy decides to drop the call.
+
     Upon success, \a user_data will be set to whatever opaque information
     may need to be propagated from the LB policy, or NULL if not needed.
     \a context will be populated with context to pass to the subchannel
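
A sketch of the pick contract documented above, with a simplified stand-in signature (the real grpc_lb_policy_pick_locked() also takes pick_args, context, and a completion closure). A true return means the pick completed synchronously; with this PR, a NULL target on completion can mean either failure or a deliberate drop:

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-in pick: returns true if it completed synchronously. */
static bool pick_locked(void **target);

static const char *interpret_pick(void) {
  void *target = NULL;
  if (!pick_locked(&target)) {
    return "pending: the completion closure will run with the result";
  }
  /* Synchronous completion: NULL target means the pick failed or the
   * LB policy dropped the call; the caller must fail the call either way. */
  return target != NULL ? "use the selected subchannel"
                        : "fail the call (pick failed or dropped)";
}
```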
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index b7c0e92..d2a2856 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -327,6 +327,11 @@
    * response has arrived. */
   grpc_grpclb_serverlist *serverlist;
 
+  /** Index into serverlist for next pick.
+   * If the server at this index is a drop, we return a drop.
+   * Otherwise, we delegate to the RR policy. */
+  size_t serverlist_index;
+
   /** list of picks that are waiting on RR's policy connectivity */
   pending_pick *pending_picks;
 
@@ -402,6 +407,9 @@
 
 static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
                             bool log) {
+  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+    return false;
+  }
   const grpc_grpclb_ip_address *ip = &server->ip_address;
   if (server->port >> 16 != 0) {
     if (log) {
@@ -411,7 +419,6 @@
     }
     return false;
   }
-
   if (ip->size != 4 && ip->size != 16) {
     if (log) {
       gpr_log(GPR_ERROR,
@@ -445,11 +452,12 @@
 
 static void parse_server(const grpc_grpclb_server *server,
                          grpc_resolved_address *addr) {
+  memset(addr, 0, sizeof(*addr));
+  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
   const uint16_t netorder_port = htons((uint16_t)server->port);
   /* the addresses are given in binary format (a in(6)_addr struct) in
    * server->ip_address.bytes. */
   const grpc_grpclb_ip_address *ip = &server->ip_address;
-  memset(addr, 0, sizeof(*addr));
   if (ip->size == 4) {
     addr->len = sizeof(struct sockaddr_in);
     struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
@@ -586,16 +594,51 @@
   return true;
 }
 
-/* perform a pick over \a rr_policy. Given that a pick can return immediately
- * (ignoring its completion callback) we need to perform the cleanups this
- * callback would be otherwise resposible for */
+/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
+ * immediately (ignoring its completion callback), we need to perform the
+ * cleanups this callback would otherwise be responsible for.
+ * If \a force_async is true, then we will manually schedule the
+ * completion callback even if the pick is available immediately. */
 static bool pick_from_internal_rr_locked(
-    grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
-    const grpc_lb_policy_pick_args *pick_args,
+    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+    const grpc_lb_policy_pick_args *pick_args, bool force_async,
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
-  GPR_ASSERT(rr_policy != NULL);
+  // Look at the index into the serverlist to see if we should drop this call.
+  grpc_grpclb_server *server =
+      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
+  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
+    glb_policy->serverlist_index = 0;  // Wrap-around.
+  }
+  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+    // Not using the RR policy, so unref it.
+    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
+              (intptr_t)wc_arg->rr_policy);
+    }
+    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+    // Update client load reporting stats to indicate the number of
+    // dropped calls.  Note that we have to do this here instead of in
+    // the client_load_reporting filter, because we do not create a
+    // subchannel call (and therefore no client_load_reporting filter)
+    // for dropped calls.
+    grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
+    grpc_grpclb_client_stats_add_call_finished(
+        server->drop_for_rate_limiting, server->drop_for_load_balancing,
+        false /* failed_to_send */, false /* known_received */,
+        wc_arg->client_stats);
+    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+    if (force_async) {
+      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+      grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+      gpr_free(wc_arg->free_when_done);
+      return false;
+    }
+    gpr_free(wc_arg->free_when_done);
+    return true;
+  }
+  // Pick via the RR policy.
   const bool pick_done = grpc_lb_policy_pick_locked(
-      exec_ctx, rr_policy, pick_args, target, wc_arg->context,
+      exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
       (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
   if (pick_done) {
     /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
@@ -604,17 +647,20 @@
               (intptr_t)wc_arg->rr_policy);
     }
     GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
-
     /* add the load reporting initial metadata */
     initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
                                   pick_args->lb_token_mdelem_storage,
                                   GRPC_MDELEM_REF(wc_arg->lb_token));
-
     // Pass on client stats via context. Passes ownership of the reference.
     GPR_ASSERT(wc_arg->client_stats != NULL);
     wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
     wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
-
+    if (force_async) {
+      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+      grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+      gpr_free(wc_arg->free_when_done);
+      return false;
+    }
     gpr_free(wc_arg->free_when_done);
   }
   /* else, the pending pick will be registered and taken care of by the
@@ -744,8 +790,8 @@
       gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
     }
-    pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
-                                 &pp->pick_args, pp->target,
+    pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
+                                 true /* force_async */, pp->target,
                                  &pp->wrapped_on_complete_arg);
   }
 
@@ -1115,8 +1161,9 @@
     wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
     wc_arg->initial_metadata = pick_args->initial_metadata;
     wc_arg->free_when_done = wc_arg;
-    pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
-                                             pick_args, target, wc_arg);
+    pick_done =
+        pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
+                                     false /* force_async */, target, wc_arg);
   } else {
     if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
       gpr_log(GPR_DEBUG,
@@ -1517,7 +1564,7 @@
              * serverlist instance will be destroyed either upon the next
              * update or in glb_destroy() */
             glb_policy->serverlist = serverlist;
-
+            glb_policy->serverlist_index = 0;
             rr_handover_locked(exec_ctx, glb_policy);
           }
         } else {
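
One more note on the grpclb.c changes: because a dropped call never creates a subchannel call, it never passes through the client_load_reporting filter, so the pick path has to record drop statistics itself. A sketch of that accounting with a hypothetical plain-C counter struct (the real counters live in grpc_grpclb_client_stats and are updated via the grpc_grpclb_client_stats_add_call_* helpers):

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for grpc_grpclb_client_stats. */
typedef struct {
  size_t num_calls_started;
  size_t num_calls_finished;
  size_t num_calls_finished_with_drop_for_rate_limiting;
  size_t num_calls_finished_with_drop_for_load_balancing;
} client_stats;

/* A dropped call counts as started and immediately finished, tagged with
 * the reason for the drop. */
static void record_drop(client_stats *stats, bool drop_for_rate_limiting,
                        bool drop_for_load_balancing) {
  ++stats->num_calls_started;
  ++stats->num_calls_finished;
  if (drop_for_rate_limiting) {
    ++stats->num_calls_finished_with_drop_for_rate_limiting;
  }
  if (drop_for_load_balancing) {
    ++stats->num_calls_finished_with_drop_for_load_balancing;
  }
}
```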
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index 81b6932..90e7c2e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -37,44 +37,39 @@
 
 #include <grpc/support/alloc.h>
 
+/* invoked once for every Server in ServerList */
+static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field,
+                             void **arg) {
+  grpc_grpclb_serverlist *sl = *arg;
+  grpc_grpclb_server server;
+  if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
+    return false;
+  }
+  ++sl->num_servers;
+  return true;
+}
+
 typedef struct decode_serverlist_arg {
-  /* The first pass counts the number of servers in the server list. The second
-   * one allocates and decodes. */
-  bool first_pass;
   /* The decoding callback is invoked once per server in serverlist. Remember
    * which index of the serverlist are we currently decoding */
   size_t decoding_idx;
-  /* Populated after the first pass. Number of server in the input serverlist */
-  size_t num_servers;
   /* The decoded serverlist */
-  grpc_grpclb_server **servers;
+  grpc_grpclb_serverlist *serverlist;
 } decode_serverlist_arg;
 
 /* invoked once for every Server in ServerList */
 static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
                               void **arg) {
   decode_serverlist_arg *dec_arg = *arg;
-  if (dec_arg->first_pass) { /* count how many server do we have */
-    grpc_grpclb_server server;
-    if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
-      gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
-      return false;
-    }
-    dec_arg->num_servers++;
-  } else { /* second pass. Actually decode. */
-    grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server));
-    GPR_ASSERT(dec_arg->num_servers > 0);
-    if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */
-      dec_arg->servers =
-          gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
-    }
-    if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
-      gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
-      return false;
-    }
-    dec_arg->servers[dec_arg->decoding_idx++] = server;
+  GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx);
+  grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server));
+  if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+    gpr_free(server);
+    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
+    return false;
   }
-
+  dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server;
   return true;
 }
 
@@ -165,36 +160,38 @@
 
 grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
     grpc_slice encoded_grpc_grpclb_response) {
-  bool status;
-  decode_serverlist_arg arg;
   pb_istream_t stream =
       pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response),
                              GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response));
   pb_istream_t stream_at_start = stream;
+  grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist));
   grpc_grpclb_response res;
   memset(&res, 0, sizeof(grpc_grpclb_response));
-  memset(&arg, 0, sizeof(decode_serverlist_arg));
-
-  res.server_list.servers.funcs.decode = decode_serverlist;
-  res.server_list.servers.arg = &arg;
-  arg.first_pass = true;
-  status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
+  // First pass: count number of servers.
+  res.server_list.servers.funcs.decode = count_serverlist;
+  res.server_list.servers.arg = sl;
+  bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
   if (!status) {
+    gpr_free(sl);
     gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
     return NULL;
   }
-
-  arg.first_pass = false;
-  status =
-      pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
-  if (!status) {
-    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
-    return NULL;
+  // Second pass: populate servers.
+  if (sl->num_servers > 0) {
+    sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers);
+    decode_serverlist_arg decode_arg;
+    memset(&decode_arg, 0, sizeof(decode_arg));
+    decode_arg.serverlist = sl;
+    res.server_list.servers.funcs.decode = decode_serverlist;
+    res.server_list.servers.arg = &decode_arg;
+    status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields,
+                       &res);
+    if (!status) {
+      grpc_grpclb_destroy_serverlist(sl);
+      gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
+      return NULL;
+    }
   }
-
-  grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist));
-  sl->num_servers = arg.num_servers;
-  sl->servers = arg.servers;
   if (res.server_list.has_expiration_interval) {
     sl->expiration_interval = res.server_list.expiration_interval;
   }
@@ -228,7 +225,7 @@
 
 bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs,
                                    const grpc_grpclb_serverlist *rhs) {
-  if ((lhs == NULL) || (rhs == NULL)) {
+  if (lhs == NULL || rhs == NULL) {
     return false;
   }
   if (lhs->num_servers != rhs->num_servers) {
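
The load_balancer_api.c refactor above keeps the same two-pass idea as before but splits it into two dedicated callbacks: count_serverlist only counts entries, then decode_serverlist fills an exactly-sized array on a re-read of the same bytes. A plain-C sketch of the pattern, independent of nanopb (here the "stream" is a 0-terminated int array; in the real code it is a pb_istream_t decoded twice):

```c
#include <stddef.h>
#include <stdlib.h>

typedef struct {
  int *entries;
  size_t num_entries;
} entry_list;

static entry_list *decode_two_pass(const int *stream /* 0-terminated */) {
  entry_list *list = calloc(1, sizeof(*list));
  if (list == NULL) return NULL;
  /* Pass 1: the entry count is unknown up front, so scan and count. */
  for (const int *p = stream; *p != 0; ++p) ++list->num_entries;
  if (list->num_entries > 0) {
    /* Pass 2: allocate exactly once at final size, then populate. */
    list->entries = calloc(list->num_entries, sizeof(int));
    for (size_t i = 0; i < list->num_entries; ++i) {
      list->entries[i] = stream[i];
    }
  }
  return list;
}
```

The benefit over single-pass decoding is that the array is allocated once at its final size; the cost is reading the input twice.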
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index 0687382..7f596ce 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -51,7 +51,7 @@
 typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
 typedef grpc_lb_v1_Server grpc_grpclb_server;
 typedef grpc_lb_v1_Duration grpc_grpclb_duration;
-typedef struct grpc_grpclb_serverlist {
+typedef struct {
   grpc_grpclb_server **servers;
   size_t num_servers;
   grpc_grpclb_duration expiration_interval;
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 8417f1a..4e1bcc7 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -98,12 +98,12 @@
 template <typename ServiceType>
 class CountedService : public ServiceType {
  public:
-  int request_count() {
+  size_t request_count() {
     std::unique_lock<std::mutex> lock(mu_);
     return request_count_;
   }
 
-  int response_count() {
+  size_t response_count() {
     std::unique_lock<std::mutex> lock(mu_);
     return response_count_;
   }
@@ -121,8 +121,8 @@
   std::mutex mu_;
 
  private:
-  int request_count_ = 0;
-  int response_count_ = 0;
+  size_t request_count_ = 0;
+  size_t response_count_ = 0;
 };
 
 using BackendService = CountedService<TestServiceImpl>;
@@ -243,9 +243,18 @@
   }
 
   static LoadBalanceResponse BuildResponseForBackends(
-      const std::vector<int>& backend_ports) {
+      const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
+      int num_drops_for_load_balancing) {
     LoadBalanceResponse response;
-    for (const int backend_port : backend_ports) {
+    for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
+      auto* server = response.mutable_server_list()->add_servers();
+      server->set_drop_for_rate_limiting(true);
+    }
+    for (int i = 0; i < num_drops_for_load_balancing; ++i) {
+      auto* server = response.mutable_server_list()->add_servers();
+      server->set_drop_for_load_balancing(true);
+    }
+    for (const int& backend_port : backend_ports) {
       auto* server = response.mutable_server_list()->add_servers();
       server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
       server->set_port(backend_port);
@@ -327,10 +336,8 @@
     ChannelArguments args;
     args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
                     response_generator_);
-    std::ostringstream uri;
-    uri << "test:///servername_not_used";
-    channel_ =
-        CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
+    channel_ = CreateCustomChannel("test:///not_used",
+                                   InsecureChannelCredentials(), args);
     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
   }
 
@@ -467,26 +474,33 @@
 };
 
 TEST_F(SingleBalancerTest, Vanilla) {
+  const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0);
   // Make sure that trying to connect works without a call.
   channel_->GetState(true /* try_to_connect */);
-  // Start servers and send 100 RPCs per server.
-  const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_);
+  // Send 100 RPCs per server.
+  const auto& statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
 
   for (const auto& status_and_response : statuses_and_responses) {
-    EXPECT_TRUE(status_and_response.first.ok());
-    EXPECT_EQ(status_and_response.second.message(), kMessage_);
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
   }
 
   // Each backend should have gotten 100 requests.
   for (size_t i = 0; i < backends_.size(); ++i) {
-    EXPECT_EQ(100, backend_servers_[i].service_->request_count());
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
   }
   // The balancer got a single request.
-  EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
   // and sent a single response.
-  EXPECT_EQ(1, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
 
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
@@ -500,7 +514,7 @@
   ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
   // Send non-empty serverlist only after kServerlistDelayMs
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
       kServerlistDelayMs);
 
   const auto t0 = system_clock::now();
@@ -518,17 +532,20 @@
 
   // Each backend should have gotten 1 request.
   for (size_t i = 0; i < backends_.size(); ++i) {
-    EXPECT_EQ(1, backend_servers_[i].service_->request_count());
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
   }
   for (const auto& status_and_response : statuses_and_responses) {
-    EXPECT_TRUE(status_and_response.first.ok());
-    EXPECT_EQ(status_and_response.second.message(), kMessage_);
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
   }
 
   // The balancer got a single request.
-  EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
   // and sent two responses.
-  EXPECT_EQ(2, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
 
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
@@ -539,10 +556,11 @@
 
   // Send a serverlist right away.
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0);
   // ... and the same one a bit later.
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
       kServerlistDelayMs);
 
   // Send num_backends/2 requests.
@@ -550,14 +568,17 @@
   // only the first half of the backends will receive them.
   for (size_t i = 0; i < backends_.size(); ++i) {
     if (i < backends_.size() / 2)
-      EXPECT_EQ(1, backend_servers_[i].service_->request_count());
+      EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
     else
-      EXPECT_EQ(0, backend_servers_[i].service_->request_count());
+      EXPECT_EQ(0U, backend_servers_[i].service_->request_count());
   }
   EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
   for (const auto& status_and_response : statuses_and_responses) {
-    EXPECT_TRUE(status_and_response.first.ok());
-    EXPECT_EQ(status_and_response.second.message(), kMessage_);
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
   }
 
   // Wait for the (duplicated) serverlist update.
@@ -566,7 +587,7 @@
       gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN)));
 
   // Verify the LB has sent two responses.
-  EXPECT_EQ(2, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
 
   // Some more calls to complete the total number of backends.
   statuses_and_responses = SendRpc(
@@ -575,52 +596,146 @@
   // Because a duplicated serverlist should have no effect, all backends must
   // have been hit once now.
   for (size_t i = 0; i < backends_.size(); ++i) {
-    EXPECT_EQ(1, backend_servers_[i].service_->request_count());
+    EXPECT_EQ(1U, backend_servers_[i].service_->request_count());
   }
   EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
   for (const auto& status_and_response : statuses_and_responses) {
-    EXPECT_TRUE(status_and_response.first.ok());
-    EXPECT_EQ(status_and_response.second.message(), kMessage_);
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
   }
 
   // The balancer got a single request.
-  EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
   // Check LB policy name for the channel.
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_F(SingleBalancerTest, Drop) {
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
+      0);
+  // Send kNumRpcsPerAddress RPCs for each backend and each drop entry.
+  const auto& statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
+
+  size_t num_drops = 0;
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    if (!status.ok() &&
+        status.error_message() == "Call dropped by load balancing policy") {
+      ++num_drops;
+    } else {
+      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                               << " message=" << status.error_message();
+      EXPECT_EQ(response.message(), kMessage_);
+    }
+  }
+  EXPECT_EQ(kNumRpcsPerAddress * 3, num_drops);
+
+  // Each backend should have gotten kNumRpcsPerAddress requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
 class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
  public:
   SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {}
 };
 
 TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
+  const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
-  // Start servers and send 100 RPCs per server.
-  const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_);
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0);
+  // Send 100 RPCs per server.
+  const auto& statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
 
   for (const auto& status_and_response : statuses_and_responses) {
-    EXPECT_TRUE(status_and_response.first.ok());
-    EXPECT_EQ(status_and_response.second.message(), kMessage_);
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
   }
 
   // Each backend should have gotten 100 requests.
   for (size_t i = 0; i < backends_.size(); ++i) {
-    EXPECT_EQ(100, backend_servers_[i].service_->request_count());
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
   }
   // The balancer got a single request.
-  EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
   // and sent a single response.
-  EXPECT_EQ(1, balancer_servers_[0].service_->response_count());
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
 
   const ClientStats client_stats = WaitForLoadReports();
-  EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started);
-  EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished);
+  EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started);
+  EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
+            client_stats.num_calls_finished);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
-  EXPECT_EQ(100 * num_backends_,
+  EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
+            client_stats.num_calls_finished_known_received);
+}
+
+TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
+  const size_t kNumRpcsPerAddress = 3;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
+      0);
+  // Send kNumRpcsPerAddress RPCs for each backend and each drop entry.
+  const auto& statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3));
+
+  size_t num_drops = 0;
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    if (!status.ok() &&
+        status.error_message() == "Call dropped by load balancing policy") {
+      ++num_drops;
+    } else {
+      EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                               << " message=" << status.error_message();
+      EXPECT_EQ(response.message(), kMessage_);
+    }
+  }
+  EXPECT_EQ(kNumRpcsPerAddress * 3, num_drops);
+
+  // Each backend should have gotten kNumRpcsPerAddress requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+
+  const ClientStats client_stats = WaitForLoadReports();
+  EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
+            client_stats.num_calls_started);
+  EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
+            client_stats.num_calls_finished);
+  EXPECT_EQ(kNumRpcsPerAddress * 2,
+            client_stats.num_calls_finished_with_drop_for_rate_limiting);
+  EXPECT_EQ(kNumRpcsPerAddress,
+            client_stats.num_calls_finished_with_drop_for_load_balancing);
+  EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
+  EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished_known_received);
 }
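
A worked check of the expectations in the two Drop tests above (hypothetical helper, not test code): the serverlist holds the real backends plus the drop entries, and round-robin picks cycle through all of them evenly.

```c
#include <stddef.h>

/* With num_backends backends plus (rate_limit_drops + lb_drops) drop
 * entries, sending rpcs_per_entry * (num_backends + num_drops) RPCs hits
 * every serverlist entry exactly rpcs_per_entry times. */
static size_t expected_drops(size_t rpcs_per_entry, size_t rate_limit_drops,
                             size_t lb_drops) {
  return rpcs_per_entry * (rate_limit_drops + lb_drops);
}
/* SingleBalancerTest.Drop: 100 * (1 + 2) == 300 dropped RPCs, while each
 * backend still gets exactly 100 requests.  The load-reporting variant has
 * 4 backends and sends 3 * (4 + 3) == 21 RPCs, so started == finished == 21,
 * drop_for_rate_limiting == 3 * 2 == 6, drop_for_load_balancing == 3 * 1 == 3,
 * and known_received == 3 * 4 == 12, excluding all drops. */
```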