Implement grpclb drop support.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index b7c0e92..d2a2856 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -327,6 +327,11 @@
    * response has arrived. */
   grpc_grpclb_serverlist *serverlist;
 
+  /** Index into serverlist for next pick.
+   * If the server at this index is a drop, we return a drop.
+   * Otherwise, we delegate to the RR policy. */
+  size_t serverlist_index;
+
   /** list of picks that are waiting on RR's policy connectivity */
   pending_pick *pending_picks;
 
@@ -402,6 +407,9 @@
 
 static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
                             bool log) {
+  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+    return false;
+  }
   const grpc_grpclb_ip_address *ip = &server->ip_address;
   if (server->port >> 16 != 0) {
     if (log) {
@@ -411,7 +419,6 @@
     }
     return false;
   }
-
   if (ip->size != 4 && ip->size != 16) {
     if (log) {
       gpr_log(GPR_ERROR,
@@ -445,11 +452,12 @@
 
 static void parse_server(const grpc_grpclb_server *server,
                          grpc_resolved_address *addr) {
+  memset(addr, 0, sizeof(*addr));
+  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
   const uint16_t netorder_port = htons((uint16_t)server->port);
   /* the addresses are given in binary format (a in(6)_addr struct) in
    * server->ip_address.bytes. */
   const grpc_grpclb_ip_address *ip = &server->ip_address;
-  memset(addr, 0, sizeof(*addr));
   if (ip->size == 4) {
     addr->len = sizeof(struct sockaddr_in);
     struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
@@ -586,16 +594,51 @@
   return true;
 }
 
-/* perform a pick over \a rr_policy. Given that a pick can return immediately
- * (ignoring its completion callback) we need to perform the cleanups this
- * callback would be otherwise resposible for */
+/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
+ * immediately (ignoring its completion callback), we need to perform the
+ * cleanups this callback would otherwise be responsible for.
+ * If \a force_async is true, then we will manually schedule the
+ * completion callback even if the pick is available immediately. */
 static bool pick_from_internal_rr_locked(
-    grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
-    const grpc_lb_policy_pick_args *pick_args,
+    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+    const grpc_lb_policy_pick_args *pick_args, bool force_async,
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
-  GPR_ASSERT(rr_policy != NULL);
+  // Look at the index into the serverlist to see if we should drop this call.
+  grpc_grpclb_server *server =
+      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
+  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
+    glb_policy->serverlist_index = 0;  // Wrap-around.
+  }
+  if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+    // Not using the RR policy, so unref it.
+    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
+              (intptr_t)wc_arg->rr_policy);
+    }
+    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+    // Update client load reporting stats to indicate the number of
+    // dropped calls.  Note that we have to do this here instead of in
+    // the client_load_reporting filter, because we do not create a
+    // subchannel call (and therefore no client_load_reporting filter)
+    // for dropped calls.
+    grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
+    grpc_grpclb_client_stats_add_call_finished(
+        server->drop_for_rate_limiting, server->drop_for_load_balancing,
+        false /* failed_to_send */, false /* known_received */,
+        wc_arg->client_stats);
+    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+    if (force_async) {
+      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+      grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+      gpr_free(wc_arg->free_when_done);
+      return false;
+    }
+    gpr_free(wc_arg->free_when_done);
+    return true;
+  }
+  // Pick via the RR policy.
   const bool pick_done = grpc_lb_policy_pick_locked(
-      exec_ctx, rr_policy, pick_args, target, wc_arg->context,
+      exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
       (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
   if (pick_done) {
     /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
@@ -604,17 +647,20 @@
               (intptr_t)wc_arg->rr_policy);
     }
     GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
-
     /* add the load reporting initial metadata */
     initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
                                   pick_args->lb_token_mdelem_storage,
                                   GRPC_MDELEM_REF(wc_arg->lb_token));
-
     // Pass on client stats via context. Passes ownership of the reference.
     GPR_ASSERT(wc_arg->client_stats != NULL);
     wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
     wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
-
+    if (force_async) {
+      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+      grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
+      gpr_free(wc_arg->free_when_done);
+      return false;
+    }
     gpr_free(wc_arg->free_when_done);
   }
   /* else, the pending pick will be registered and taken care of by the
@@ -744,8 +790,8 @@
       gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
     }
-    pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
-                                 &pp->pick_args, pp->target,
+    pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
+                                 true /* force_async */, pp->target,
                                  &pp->wrapped_on_complete_arg);
   }
 
@@ -1115,8 +1161,9 @@
     wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
     wc_arg->initial_metadata = pick_args->initial_metadata;
     wc_arg->free_when_done = wc_arg;
-    pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
-                                             pick_args, target, wc_arg);
+    pick_done =
+        pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
+                                     false /* force_async */, target, wc_arg);
   } else {
     if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
       gpr_log(GPR_DEBUG,
@@ -1517,7 +1564,7 @@
              * serverlist instance will be destroyed either upon the next
              * update or in glb_destroy() */
             glb_policy->serverlist = serverlist;
-
+            glb_policy->serverlist_index = 0;
             rr_handover_locked(exec_ctx, glb_policy);
           }
         } else {