Implement client-side load reporting for grpclb.
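
The grpclb policy now parses the initial response on the LB stream: if
the balancer specifies a client_stats_report_interval, the policy takes
a weak ref and starts a timer that periodically encodes the accumulated
grpc_grpclb_client_stats into a load-report request and sends it on the
LB call (deferred until the initial request has finished sending).
Reports whose counters are all zero are skipped after the first such
report.

Each pick now receives the call context and stores a ref to the current
grpc_grpclb_client_stats under GRPC_GRPCLB_CLIENT_STATS, so that the
client_load_reporting filter (implemented separately) can record
per-call activity. That filter is appended to the subchannel stack only
when GRPC_ARG_LB_POLICY_NAME is "grpclb", which the policy now
guarantees by setting that arg in its channel args. Also frees
server_name and the channel args when LB channel creation fails.
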
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index 9e158a9..3746810 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -108,13 +108,16 @@
 
 #include "src/core/ext/filters/client_channel/client_channel.h"
 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
 #include "src/core/ext/filters/client_channel/parse_address.h"
 #include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/channel_stack.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -126,6 +129,7 @@
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/call.h"
 #include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/channel_init.h"
 #include "src/core/lib/transport/static_metadata.h"
 
 #define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
@@ -147,6 +151,10 @@
                                       lb_token_mdelem_storage, lb_token);
 }
 
+static void destroy_client_stats(void *arg) {
+  grpc_grpclb_client_stats_unref(arg);
+}
+
 typedef struct wrapped_rr_closure_arg {
   /* the closure instance using this struct as argument */
   grpc_closure wrapper_closure;
@@ -163,6 +171,13 @@
    * initial metadata */
   grpc_connected_subchannel **target;
 
+  /* the context to be populated for the subchannel call */
+  grpc_call_context_element *context;
+
+  /* Stats for client-side load reporting. Note that this holds a
+   * reference, which must be either passed on via context or unreffed. */
+  grpc_grpclb_client_stats *client_stats;
+
   /* the LB token associated with the pick */
   grpc_mdelem lb_token;
 
@@ -202,6 +217,12 @@
                 (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
         abort();
       }
+      // Pass on client stats via context. Passes ownership of the reference.
+      GPR_ASSERT(wc_arg->client_stats != NULL);
+      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
+      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
+    } else {
+      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
     }
     if (grpc_lb_glb_trace) {
       gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
@@ -237,6 +258,7 @@
 static void add_pending_pick(pending_pick **root,
                              const grpc_lb_policy_pick_args *pick_args,
                              grpc_connected_subchannel **target,
+                             grpc_call_context_element *context,
                              grpc_closure *on_complete) {
   pending_pick *pp = gpr_zalloc(sizeof(*pp));
   pp->next = *root;
@@ -244,6 +266,7 @@
   pp->target = target;
   pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
   pp->wrapped_on_complete_arg.target = target;
+  pp->wrapped_on_complete_arg.context = context;
   pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
   pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
       pick_args->lb_token_mdelem_storage;
@@ -316,6 +339,10 @@
   /************************************************************/
   /*  client data associated with the LB server communication */
   /************************************************************/
+
+  /* Finished sending initial request. */
+  grpc_closure lb_on_sent_initial_request;
+
   /* Status from the LB server has been received. This signals the end of the LB
    * call. */
   grpc_closure lb_on_server_status_received;
@@ -348,6 +375,23 @@
 
   /** LB call retry timer */
   grpc_timer lb_call_retry_timer;
+
+  bool initial_request_sent;
+  bool seen_initial_response;
+
+  /* Stats for client-side load reporting. Should be unreffed and
+   * recreated whenever lb_call is replaced. */
+  grpc_grpclb_client_stats *client_stats;
+  /* Interval and timer for next client load report. */
+  gpr_timespec client_stats_report_interval;
+  grpc_timer client_load_report_timer;
+  bool client_load_report_timer_pending;
+  bool last_client_load_report_counters_were_zero;
+  /* Closure used for either the load report timer or the callback for
+   * completion of sending the load report. */
+  grpc_closure client_load_report_closure;
+  /* Client load report message payload. */
+  grpc_byte_buffer *client_load_report_payload;
 } glb_lb_policy;
 
 /* Keeps track and reacts to changes in connectivity of the RR instance */
@@ -552,8 +596,8 @@
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
   GPR_ASSERT(rr_policy != NULL);
   const bool pick_done = grpc_lb_policy_pick_locked(
-      exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token,
-      &wc_arg->wrapper_closure);
+      exec_ctx, rr_policy, pick_args, target, wc_arg->context,
+      (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
   if (pick_done) {
     /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
     if (grpc_lb_glb_trace) {
@@ -567,7 +611,12 @@
                                   pick_args->lb_token_mdelem_storage,
                                   GRPC_MDELEM_REF(wc_arg->lb_token));
 
-    gpr_free(wc_arg);
+    // Pass on client stats via context. Passes ownership of the reference.
+    GPR_ASSERT(wc_arg->client_stats != NULL);
+    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
+    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
+
+    gpr_free(wc_arg->free_when_done);
   }
   /* else, the pending pick will be registered and taken care of by the
    * pending pick list inside the RR policy (glb_policy->rr_policy).
@@ -690,6 +739,8 @@
     glb_policy->pending_picks = pp->next;
     GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
     pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
+    pp->wrapped_on_complete_arg.client_stats =
+        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
     if (grpc_lb_glb_trace) {
       gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
@@ -864,9 +915,18 @@
   grpc_uri_destroy(uri);
 
   glb_policy->cc_factory = args->client_channel_factory;
-  glb_policy->args = grpc_channel_args_copy(args->args);
   GPR_ASSERT(glb_policy->cc_factory != NULL);
 
+  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
+  // since we use this to trigger the client_load_reporting filter.
+  grpc_arg new_arg;
+  new_arg.key = GRPC_ARG_LB_POLICY_NAME;
+  new_arg.type = GRPC_ARG_STRING;
+  new_arg.value.string = "grpclb";
+  static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+  glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
+      args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
+
   grpc_slice_hash_table *targets_info = NULL;
   /* Create a client channel over them to communicate with a LB service */
   char *lb_service_target_addresses =
@@ -880,6 +940,8 @@
   grpc_channel_args_destroy(exec_ctx, lb_channel_args);
   gpr_free(lb_service_target_addresses);
   if (glb_policy->lb_channel == NULL) {
+    gpr_free((void *)glb_policy->server_name);
+    grpc_channel_args_destroy(exec_ctx, glb_policy->args);
     gpr_free(glb_policy);
     return NULL;
   }
@@ -895,6 +957,9 @@
   GPR_ASSERT(glb_policy->pending_pings == NULL);
   gpr_free((void *)glb_policy->server_name);
   grpc_channel_args_destroy(exec_ctx, glb_policy->args);
+  if (glb_policy->client_stats != NULL) {
+    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
+  }
   grpc_channel_destroy(glb_policy->lb_channel);
   glb_policy->lb_channel = NULL;
   grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
@@ -1011,7 +1076,8 @@
 
 static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                            const grpc_lb_policy_pick_args *pick_args,
-                           grpc_connected_subchannel **target, void **user_data,
+                           grpc_connected_subchannel **target,
+                           grpc_call_context_element *context, void **user_data,
                            grpc_closure *on_complete) {
   if (pick_args->lb_token_mdelem_storage == NULL) {
     *target = NULL;
@@ -1039,6 +1105,10 @@
                       grpc_schedule_on_exec_ctx);
     wc_arg->rr_policy = glb_policy->rr_policy;
     wc_arg->target = target;
+    wc_arg->context = context;
+    GPR_ASSERT(glb_policy->client_stats != NULL);
+    wc_arg->client_stats =
+        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
     wc_arg->wrapped_closure = on_complete;
     wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
     wc_arg->initial_metadata = pick_args->initial_metadata;
@@ -1052,7 +1122,7 @@
               "picks",
               (void *)(glb_policy));
     }
-    add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+    add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
                      on_complete);
 
     if (!glb_policy->started_picking) {
@@ -1093,6 +1163,104 @@
       exec_ctx, &glb_policy->state_tracker, current, notify);
 }
 
+static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error);
+
+static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
+                                             glb_lb_policy *glb_policy) {
+  const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+  const gpr_timespec next_client_load_report_time =
+      gpr_time_add(now, glb_policy->client_stats_report_interval);
+  grpc_closure_init(&glb_policy->client_load_report_closure,
+                    send_client_load_report_locked, glb_policy,
+                    grpc_combiner_scheduler(glb_policy->base.combiner, false));
+  grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
+                  next_client_load_report_time,
+                  &glb_policy->client_load_report_closure, now);
+}
+
+static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error) {
+  glb_lb_policy *glb_policy = arg;
+  grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
+  glb_policy->client_load_report_payload = NULL;
+  if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
+    glb_policy->client_load_report_timer_pending = false;
+    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                              "client_load_report");
+    return;
+  }
+  schedule_next_client_load_report(exec_ctx, glb_policy);
+}
+
+static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
+                                              glb_lb_policy *glb_policy) {
+  grpc_op op;
+  memset(&op, 0, sizeof(op));
+  op.op = GRPC_OP_SEND_MESSAGE;
+  op.data.send_message.send_message = glb_policy->client_load_report_payload;
+  grpc_closure_init(&glb_policy->client_load_report_closure,
+                    client_load_report_done_locked, glb_policy,
+                    grpc_combiner_scheduler(glb_policy->base.combiner, false));
+  grpc_call_error call_error = grpc_call_start_batch_and_execute(
+      exec_ctx, glb_policy->lb_call, &op, 1,
+      &glb_policy->client_load_report_closure);
+  GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+  return request->client_stats.num_calls_started == 0 &&
+         request->client_stats.num_calls_finished == 0 &&
+         request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
+             0 &&
+         request->client_stats
+                 .num_calls_finished_with_drop_for_load_balancing == 0 &&
+         request->client_stats.num_calls_finished_with_client_failed_to_send ==
+             0 &&
+         request->client_stats.num_calls_finished_known_received == 0;
+}
+
+static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
+                                           grpc_error *error) {
+  glb_lb_policy *glb_policy = arg;
+  if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
+    glb_policy->client_load_report_timer_pending = false;
+    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                              "client_load_report");
+    return;
+  }
+  // Construct message payload.
+  GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
+  grpc_grpclb_request *request =
+      grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+  // Skip client load report if the counters were all zero in the last
+  // report and they are still zero in this one.
+  if (load_report_counters_are_zero(request)) {
+    if (glb_policy->last_client_load_report_counters_were_zero) {
+      grpc_grpclb_request_destroy(request);
+      schedule_next_client_load_report(exec_ctx, glb_policy);
+      return;
+    }
+    glb_policy->last_client_load_report_counters_were_zero = true;
+  } else {
+    glb_policy->last_client_load_report_counters_were_zero = false;
+  }
+  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
+  glb_policy->client_load_report_payload =
+      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
+  grpc_grpclb_request_destroy(request);
+  // If we've already sent the initial request, then we can go ahead and
+  // send the load report.  Otherwise, we need to wait until the initial
+  // request has been sent before sending this one
+  // (see lb_on_sent_initial_request_locked() below).
+  if (glb_policy->initial_request_sent) {
+    do_send_client_load_report_locked(exec_ctx, glb_policy);
+  }
+}
+
+static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
+                                              void *arg, grpc_error *error);
 static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                 void *arg, grpc_error *error);
 static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1114,6 +1282,11 @@
       &host, glb_policy->deadline, NULL);
   grpc_slice_unref_internal(exec_ctx, host);
 
+  if (glb_policy->client_stats != NULL) {
+    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
+  }
+  glb_policy->client_stats = grpc_grpclb_client_stats_create();
+
   grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
   grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
 
@@ -1125,6 +1298,9 @@
   grpc_slice_unref_internal(exec_ctx, request_payload_slice);
   grpc_grpclb_request_destroy(request);
 
+  grpc_closure_init(&glb_policy->lb_on_sent_initial_request,
+                    lb_on_sent_initial_request_locked, glb_policy,
+                    grpc_combiner_scheduler(glb_policy->base.combiner, false));
   grpc_closure_init(&glb_policy->lb_on_server_status_received,
                     lb_on_server_status_received_locked, glb_policy,
                     grpc_combiner_scheduler(glb_policy->base.combiner, false));
@@ -1138,6 +1314,10 @@
                    GRPC_GRPCLB_RECONNECT_JITTER,
                    GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
                    GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+
+  glb_policy->initial_request_sent = false;
+  glb_policy->seen_initial_response = false;
+  glb_policy->last_client_load_report_counters_were_zero = false;
 }
 
 static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
@@ -1151,6 +1331,10 @@
 
   grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
   grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
+
+  if (glb_policy->client_load_report_timer_pending) {
+    grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
+  }
 }
 
 /*
@@ -1179,21 +1363,27 @@
   op->flags = 0;
   op->reserved = NULL;
   op++;
-
   op->op = GRPC_OP_RECV_INITIAL_METADATA;
   op->data.recv_initial_metadata.recv_initial_metadata =
       &glb_policy->lb_initial_metadata_recv;
   op->flags = 0;
   op->reserved = NULL;
   op++;
-
   GPR_ASSERT(glb_policy->lb_request_payload != NULL);
   op->op = GRPC_OP_SEND_MESSAGE;
   op->data.send_message.send_message = glb_policy->lb_request_payload;
   op->flags = 0;
   op->reserved = NULL;
   op++;
+  /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
+   * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
+  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received");
+  call_error = grpc_call_start_batch_and_execute(
+      exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
+      &glb_policy->lb_on_sent_initial_request);
+  GPR_ASSERT(GRPC_CALL_OK == call_error);
 
+  op = ops;
   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
   op->data.recv_status_on_client.trailing_metadata =
       &glb_policy->lb_trailing_metadata_recv;
@@ -1225,6 +1415,19 @@
   GPR_ASSERT(GRPC_CALL_OK == call_error);
 }
 
+static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
+                                              void *arg, grpc_error *error) {
+  glb_lb_policy *glb_policy = arg;
+  glb_policy->initial_request_sent = true;
+  // If we attempted to send a client load report before the initial
+  // request was sent, send the load report now.
+  if (glb_policy->client_load_report_payload != NULL) {
+    do_send_client_load_report_locked(exec_ctx, glb_policy);
+  }
+  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
+                            "lb_on_server_status_received");
+}
+
 static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                            grpc_error *error) {
   glb_lb_policy *glb_policy = arg;
@@ -1240,58 +1443,91 @@
     grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
     grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
     grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
-    grpc_grpclb_serverlist *serverlist =
-        grpc_grpclb_response_parse_serverlist(response_slice);
-    if (serverlist != NULL) {
-      GPR_ASSERT(glb_policy->lb_call != NULL);
-      grpc_slice_unref_internal(exec_ctx, response_slice);
-      if (grpc_lb_glb_trace) {
-        gpr_log(GPR_INFO, "Serverlist with %lu servers received",
-                (unsigned long)serverlist->num_servers);
-        for (size_t i = 0; i < serverlist->num_servers; ++i) {
-          grpc_resolved_address addr;
-          parse_server(serverlist->servers[i], &addr);
-          char *ipport;
-          grpc_sockaddr_to_string(&ipport, &addr, false);
-          gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
-          gpr_free(ipport);
-        }
-      }
 
-      /* update serverlist */
-      if (serverlist->num_servers > 0) {
-        if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
-          if (grpc_lb_glb_trace) {
-            gpr_log(GPR_INFO,
-                    "Incoming server list identical to current, ignoring.");
-          }
-          grpc_grpclb_destroy_serverlist(serverlist);
-        } else { /* new serverlist */
-          if (glb_policy->serverlist != NULL) {
-            /* dispose of the old serverlist */
-            grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
-          }
-          /* and update the copy in the glb_lb_policy instance. This serverlist
-           * instance will be destroyed either upon the next update or in
-           * glb_destroy() */
-          glb_policy->serverlist = serverlist;
-
-          rr_handover_locked(exec_ctx, glb_policy);
-        }
-      } else {
+    grpc_grpclb_initial_response *response = NULL;
+    if (!glb_policy->seen_initial_response &&
+        (response = grpc_grpclb_initial_response_parse(response_slice)) !=
+            NULL) {
+      if (response->has_client_stats_report_interval) {
+        glb_policy->client_stats_report_interval =
+            gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN),
+                         grpc_grpclb_duration_to_timespec(
+                             &response->client_stats_report_interval));
         if (grpc_lb_glb_trace) {
           gpr_log(GPR_INFO,
-                  "Received empty server list. Picks will stay pending until a "
-                  "response with > 0 servers is received");
+                  "received initial LB response message; "
+                  "client load reporting interval = %" PRId64 ".%09d sec",
+                  glb_policy->client_stats_report_interval.tv_sec,
+                  glb_policy->client_stats_report_interval.tv_nsec);
         }
-        grpc_grpclb_destroy_serverlist(serverlist);
+        /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
+         * strong ref count goes to zero) to be unref'd in
+         * send_client_load_report_locked() */
+        glb_policy->client_load_report_timer_pending = true;
+        GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
+        schedule_next_client_load_report(exec_ctx, glb_policy);
+      } else if (grpc_lb_glb_trace) {
+        gpr_log(GPR_INFO,
+                "received initial LB response message; "
+                "client load reporting NOT enabled");
       }
-    } else { /* serverlist == NULL */
-      gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
-              grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
-      grpc_slice_unref_internal(exec_ctx, response_slice);
+      grpc_grpclb_initial_response_destroy(response);
+      glb_policy->seen_initial_response = true;
+    } else {
+      grpc_grpclb_serverlist *serverlist =
+          grpc_grpclb_response_parse_serverlist(response_slice);
+      if (serverlist != NULL) {
+        GPR_ASSERT(glb_policy->lb_call != NULL);
+        if (grpc_lb_glb_trace) {
+          gpr_log(GPR_INFO, "Serverlist with %lu servers received",
+                  (unsigned long)serverlist->num_servers);
+          for (size_t i = 0; i < serverlist->num_servers; ++i) {
+            grpc_resolved_address addr;
+            parse_server(serverlist->servers[i], &addr);
+            char *ipport;
+            grpc_sockaddr_to_string(&ipport, &addr, false);
+            gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
+            gpr_free(ipport);
+          }
+        }
+
+        /* update serverlist */
+        if (serverlist->num_servers > 0) {
+          if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
+                                            serverlist)) {
+            if (grpc_lb_glb_trace) {
+              gpr_log(GPR_INFO,
+                      "Incoming server list identical to current, ignoring.");
+            }
+            grpc_grpclb_destroy_serverlist(serverlist);
+          } else { /* new serverlist */
+            if (glb_policy->serverlist != NULL) {
+              /* dispose of the old serverlist */
+              grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
+            }
+            /* and update the copy in the glb_lb_policy instance. This
+             * serverlist instance will be destroyed either upon the next
+             * update or in glb_destroy() */
+            glb_policy->serverlist = serverlist;
+
+            rr_handover_locked(exec_ctx, glb_policy);
+          }
+        } else {
+          if (grpc_lb_glb_trace) {
+            gpr_log(GPR_INFO,
+                    "Received empty server list. Picks will stay pending until "
+                    "a response with > 0 servers is received");
+          }
+          grpc_grpclb_destroy_serverlist(serverlist);
+        }
+      } else { /* serverlist == NULL */
+        gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
+                grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+      }
     }
 
+    grpc_slice_unref_internal(exec_ctx, response_slice);
+
     if (!glb_policy->shutting_down) {
       /* keep listening for serverlist updates */
       op->op = GRPC_OP_RECV_MESSAGE;
@@ -1403,9 +1639,29 @@
 }
 
 /* Plugin registration */
+
+// Only add client_load_reporting filter if the grpclb LB policy is used.
+static bool maybe_add_client_load_reporting_filter(
+    grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
+  const grpc_channel_args *args =
+      grpc_channel_stack_builder_get_channel_arguments(builder);
+  const grpc_arg *channel_arg =
+      grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
+  if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING &&
+      strcmp(channel_arg->value.string, "grpclb") == 0) {
+    return grpc_channel_stack_builder_append_filter(
+        builder, (const grpc_channel_filter *)arg, NULL, NULL);
+  }
+  return true;
+}
+
 void grpc_lb_policy_grpclb_init() {
   grpc_register_lb_policy(grpc_glb_lb_factory_create());
   grpc_register_tracer("glb", &grpc_lb_glb_trace);
+  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
+                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+                                   maybe_add_client_load_reporting_filter,
+                                   (void *)&grpc_client_load_reporting_filter);
 }
 
 void grpc_lb_policy_grpclb_shutdown() {}