Convert LB policy API to C++.
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index da82b3f..11163a5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -16,68 +16,48 @@
  *
  */
 
-/** Implementation of the gRPC LB policy.
- *
- * This policy takes as input a set of resolved addresses {a1..an} for which the
- * LB set was set (it's the resolver's responsibility to ensure this). That is
- * to say, {a1..an} represent a collection of LB servers.
- *
- * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
- * This channel behaves just like a regular channel. In particular, the
- * constructed URI over the addresses a1..an will use the default pick first
- * policy to select from this list of LB server backends.
- *
- * The first time the policy gets a request for a pick, a ping, or to exit the
- * idle state, \a query_for_backends_locked() is called. This function sets up
- * and initiates the internal communication with the LB server. In particular,
- * it's responsible for instantiating the internal *streaming* call to the LB
- * server (whichever address from {a1..an} pick-first chose). This call is
- * serviced by two callbacks, \a lb_on_server_status_received and \a
- * lb_on_response_received. The former will be called when the call to the LB
- * server completes. This can happen if the LB server closes the connection or
- * if this policy itself cancels the call (for example because it's shutting
- * down). If the internal call times out, the usual behavior of pick-first
- * applies, continuing to pick from the list {a1..an}.
- *
- * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a
- * res_recv. An invalid one results in the termination of the streaming call. A
- * new streaming call should be created if possible, failing the original call
- * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
- * backends is extracted. A Round Robin policy will be created from this list.
- * There are two possible scenarios:
- *
- * 1. This is the first server list received. There was no previous instance of
- *    the Round Robin policy. \a rr_handover_locked() will instantiate the RR
- *    policy and perform all the pending operations over it.
- * 2. There's already a RR policy instance active. We need to introduce the new
- *    one build from the new serverlist, but taking care not to disrupt the
- *    operations in progress over the old RR instance. This is done by
- *    decreasing the reference count on the old policy. The moment no more
- *    references are held on the old RR policy, it'll be destroyed and \a
- *    on_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
- *    state. At this point we can transition to a new RR instance safely, which
- *    is done once again via \a rr_handover_locked().
- *
- *
- * Once a RR policy instance is in place (and getting updated as described),
- * calls to for a pick, a ping or a cancellation will be serviced right away by
- * forwarding them to the RR instance. Any time there's no RR policy available
- * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is
- * received, etc), pick/ping requests are added to a list of pending picks/pings
- * to be flushed and serviced as part of \a rr_handover_locked() the moment the
- * RR policy instance becomes available.
- *
- * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
- * high level design and details. */
+/// Implementation of the gRPC LB policy.
+///
+/// This policy takes as input a list of resolved addresses, which must
+/// include at least one balancer address.
+///
+/// An internal channel (\a lb_channel_) is created for the addresses that
+/// are balancers.  This channel behaves just like a regular channel that
+/// uses pick_first to select from the list of balancer addresses.
+///
+/// The first time the policy gets a request for a pick, a ping, or to exit
+/// the idle state, \a StartPickingLocked() is called. This method is
+/// responsible for instantiating the internal *streaming* call to the LB
+/// server (whichever address pick_first chose).  The call completes when
+/// either the balancer sends status or we cancel the call (e.g., because we
+/// are shutting down).  If needed, we retry the call.  If we received at
+/// least one valid message from the server, a new call attempt will be made
+/// immediately; otherwise, we apply back-off delays between attempts.
+///
+/// We maintain an internal round_robin policy instance for distributing
+/// requests across backends.  Whenever we receive a new serverlist from
+/// the balancer, we update the round_robin policy with the new list of
+/// addresses.  If we cannot communicate with the balancer on startup,
+/// however, we may enter fallback mode, in which case we will populate
+/// the RR policy's addresses from the backend addresses returned by the
+/// resolver.
+///
+/// Once an RR policy instance is in place (and getting updated as described),
+/// calls for a pick, a ping, or a cancellation will be serviced right
+/// away by forwarding them to the RR instance.  Any time there's no RR
+/// policy available (i.e., right after the creation of the gRPCLB policy),
+/// pick and ping requests are added to a list of pending picks and pings
+/// to be flushed and serviced when the RR policy instance becomes available.
+///
+/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
+/// high level design and details.
 
-/* TODO(dgq):
- * - Implement LB service forwarding (point 2c. in the doc's diagram).
- */
-
-/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
-   using that endpoint. Because of various transitive includes in uv.h,
-   including windows.h on Windows, uv.h must be included before other system
-   headers. Therefore, sockaddr.h must always be included first */
+// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+// using that endpoint. Because of various transitive includes in uv.h,
+// including windows.h on Windows, uv.h must be included before other system
+// headers. Therefore, sockaddr.h must always be included first.
 #include "src/core/lib/iomgr/sockaddr.h"
 
 #include <inttypes.h>
@@ -93,7 +73,6 @@
 #include "src/core/ext/filters/client_channel/client_channel.h"
 #include "src/core/ext/filters/client_channel/client_channel_factory.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
-#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
@@ -108,6 +87,8 @@
 #include "src/core/lib/gpr/host_port.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr.h"
@@ -127,336 +108,294 @@
 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
 
-grpc_core::TraceFlag grpc_lb_glb_trace(false, "glb");
+namespace grpc_core {
 
-struct glb_lb_policy;
+TraceFlag grpc_lb_glb_trace(false, "glb");
 
 namespace {
 
-/// Linked list of pending pick requests. It stores all information needed to
-/// eventually call (Round Robin's) pick() on them. They mainly stay pending
-/// waiting for the RR policy to be created.
-///
-/// Note that when a pick is sent to the RR policy, we inject our own
-/// on_complete callback, so that we can intercept the result before
-/// invoking the original on_complete callback.  This allows us to set the
-/// LB token metadata and add client_stats to the call context.
-/// See \a pending_pick_complete() for details.
-struct pending_pick {
-  // Our on_complete closure and the original one.
-  grpc_closure on_complete;
-  grpc_closure* original_on_complete;
-  // The original pick.
-  grpc_lb_policy_pick_state* pick;
-  // Stats for client-side load reporting. Note that this holds a
-  // reference, which must be either passed on via context or unreffed.
-  grpc_grpclb_client_stats* client_stats;
-  // The LB token associated with the pick.  This is set via user_data in
-  // the pick.
-  grpc_mdelem lb_token;
-  // The grpclb instance that created the wrapping. This instance is not owned,
-  // reference counts are untouched. It's used only for logging purposes.
-  glb_lb_policy* glb_policy;
-  // Next pending pick.
-  struct pending_pick* next;
+class GrpcLb : public LoadBalancingPolicy {
+ public:
+  GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
+
+  void UpdateLocked(const grpc_channel_args& args) override;
+  bool PickLocked(PickState* pick) override;
+  void CancelPickLocked(PickState* pick, grpc_error* error) override;
+  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+                                 uint32_t initial_metadata_flags_eq,
+                                 grpc_error* error) override;
+  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
+                                 grpc_closure* closure) override;
+  grpc_connectivity_state CheckConnectivityLocked(
+      grpc_error** connectivity_error) override;
+  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
+  void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
+  void ExitIdleLocked() override;
+
+ private:
+  /// Linked list of pending pick requests. It stores all information needed to
+  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
+  /// waiting for the RR policy to be created.
+  ///
+  /// Note that when a pick is sent to the RR policy, we inject our own
+  /// on_complete callback, so that we can intercept the result before
+  /// invoking the original on_complete callback.  This allows us to set the
+  /// LB token metadata and add client_stats to the call context.
+  /// See \a OnPendingPickComplete() for details.
+  struct PendingPick {
+    // The grpclb instance that created this wrapper. This instance is not
+    // owned; reference counts are untouched. It's used only for logging
+    // purposes.
+    GrpcLb* grpclb_policy;
+    // The original pick.
+    PickState* pick;
+    // Our on_complete closure and the original one.
+    grpc_closure on_complete;
+    grpc_closure* original_on_complete;
+    // The LB token associated with the pick.  This is set via user_data in
+    // the pick.
+    grpc_mdelem lb_token;
+    // Stats for client-side load reporting. Note that this holds a
+    // reference, which must be either passed on via context or unreffed.
+    grpc_grpclb_client_stats* client_stats = nullptr;
+    // Next pending pick.
+    PendingPick* next = nullptr;
+  };
+
+  /// A linked list of pending pings waiting for the RR policy to be created.
+  struct PendingPing {
+    grpc_closure* on_initiate;
+    grpc_closure* on_ack;
+    PendingPing* next = nullptr;
+  };
+
+  /// Contains a call to the LB server and all the data related to the call.
+  class BalancerCallState
+      : public InternallyRefCountedWithTracing<BalancerCallState> {
+   public:
+    explicit BalancerCallState(
+        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
+
+    // It's the caller's responsibility to ensure that Orphan() is called from
+    // inside the combiner.
+    void Orphan() override;
+
+    void StartQuery();
+
+    grpc_grpclb_client_stats* client_stats() const { return client_stats_; }
+    bool seen_initial_response() const { return seen_initial_response_; }
+
+   private:
+    ~BalancerCallState();
+
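+    // Returns the owning policy, downcast from the LoadBalancingPolicy base
+    // pointer held in grpclb_policy_.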
+    GrpcLb* grpclb_policy() const {
+      return reinterpret_cast<GrpcLb*>(grpclb_policy_.get());
+    }
+
+    void ScheduleNextClientLoadReportLocked();
+    void SendClientLoadReportLocked();
+
+    static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
+
+    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
+    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
+    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
+    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
+    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
+
+    // The owning LB policy.
+    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
+
+    // The streaming call to the LB server. Always non-NULL.
+    grpc_call* lb_call_ = nullptr;
+
+    // recv_initial_metadata
+    grpc_metadata_array lb_initial_metadata_recv_;
+
+    // send_message
+    grpc_byte_buffer* send_message_payload_ = nullptr;
+    grpc_closure lb_on_initial_request_sent_;
+
+    // recv_message
+    grpc_byte_buffer* recv_message_payload_ = nullptr;
+    grpc_closure lb_on_balancer_message_received_;
+    bool seen_initial_response_ = false;
+
+    // recv_trailing_metadata
+    grpc_closure lb_on_balancer_status_received_;
+    grpc_metadata_array lb_trailing_metadata_recv_;
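+    // The call status code and details.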
+    grpc_status_code lb_call_status_;
+    grpc_slice lb_call_status_details_;
+
+    // The stats for client-side load reporting associated with this LB call.
+    // Created after the first serverlist is received.
+    grpc_grpclb_client_stats* client_stats_ = nullptr;
+    grpc_millis client_stats_report_interval_ = 0;
+    grpc_timer client_load_report_timer_;
+    bool client_load_report_timer_callback_pending_ = false;
+    bool last_client_load_report_counters_were_zero_ = false;
+    bool client_load_report_is_due_ = false;
+    // The closure used for either the load report timer or the callback for
+    // completion of sending the load report.
+    grpc_closure client_load_report_closure_;
+  };
+
+  ~GrpcLb();
+
+  void ShutdownLocked() override;
+
+  // Helper function used in ctor and UpdateLocked().
+  void ProcessChannelArgsLocked(const grpc_channel_args& args);
+
+  // Methods for dealing with the balancer channel and call.
+  void StartPickingLocked();
+  void StartBalancerCallLocked();
+  static void OnFallbackTimerLocked(void* arg, grpc_error* error);
+  void StartBalancerCallRetryTimerLocked();
+  static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
+  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
+                                                         grpc_error* error);
+
+  // Pending pick methods.
+  static void PendingPickSetMetadataAndContext(PendingPick* pp);
+  PendingPick* PendingPickCreate(PickState* pick);
+  void AddPendingPick(PendingPick* pp);
+  static void OnPendingPickComplete(void* arg, grpc_error* error);
+
+  // Pending ping methods.
+  void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);
+
+  // Methods for dealing with the RR policy.
+  void CreateOrUpdateRoundRobinPolicyLocked();
+  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
+  void CreateRoundRobinPolicyLocked(const Args& args);
+  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp);
+  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
+      grpc_error* rr_state_error);
+  static void OnRoundRobinConnectivityChangedLocked(void* arg,
+                                                    grpc_error* error);
+  static void OnRoundRobinRequestReresolutionLocked(void* arg,
+                                                    grpc_error* error);
+
+  // Who the client is trying to communicate with.
+  const char* server_name_ = nullptr;
+
+  // Current channel args from the resolver.
+  grpc_channel_args* args_ = nullptr;
+
+  // Internal state.
+  bool started_picking_ = false;
+  bool shutting_down_ = false;
+  grpc_connectivity_state_tracker state_tracker_;
+
+  // The channel for communicating with the LB server.
+  grpc_channel* lb_channel_ = nullptr;
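+  // Connectivity state of the LB channel.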
+  grpc_connectivity_state lb_channel_connectivity_;
+  grpc_closure lb_channel_on_connectivity_changed_;
+  // Are we already watching the LB channel's connectivity?
+  bool watching_lb_channel_ = false;
+  // Response generator to inject address updates into lb_channel_.
+  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
+
+  // The data associated with the current LB call. It holds a ref to this LB
+  // policy. It's initialized every time we query for backends. It's reset to
+  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
+  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
+  // contains a non-NULL lb_call_.
+  OrphanablePtr<BalancerCallState> lb_calld_;
+  // Timeout in milliseconds for the LB call. 0 means no deadline.
+  int lb_call_timeout_ms_ = 0;
+  // Balancer call retry state.
+  BackOff lb_call_backoff_;
+  bool retry_timer_callback_pending_ = false;
+  grpc_timer lb_call_retry_timer_;
+  grpc_closure lb_on_call_retry_;
+
+  // The deserialized response from the balancer. May be nullptr until one
+  // such response has arrived.
+  grpc_grpclb_serverlist* serverlist_ = nullptr;
+  // Index into serverlist for next pick.
+  // If the server at this index is a drop, we return a drop.
+  // Otherwise, we delegate to the RR policy.
+  size_t serverlist_index_ = 0;
+
+  // Timeout in milliseconds before using fallback backend addresses.
+  // 0 means not using fallback.
+  int lb_fallback_timeout_ms_ = 0;
+  // The backend addresses from the resolver.
+  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
+  // Fallback timer.
+  bool fallback_timer_callback_pending_ = false;
+  grpc_timer lb_fallback_timer_;
+  grpc_closure lb_on_fallback_;
+
+  // Pending picks and pings that are waiting on the RR policy's connectivity.
+  PendingPick* pending_picks_ = nullptr;
+  PendingPing* pending_pings_ = nullptr;
+
+  // The RR policy to use for the backends.
+  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
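+  // Connectivity state of the embedded RR policy.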
+  grpc_connectivity_state rr_connectivity_state_;
+  grpc_closure on_rr_connectivity_changed_;
+  grpc_closure on_rr_request_reresolution_;
 };
 
-/// A linked list of pending pings waiting for the RR policy to be created.
-struct pending_ping {
-  grpc_closure* on_initiate;
-  grpc_closure* on_ack;
-  struct pending_ping* next;
-};
+//
+// serverlist parsing code
+//
 
-}  // namespace
-
-typedef struct glb_lb_call_data {
-  struct glb_lb_policy* glb_policy;
-  // TODO(juanlishen): c++ize this struct.
-  gpr_refcount refs;
-
-  /** The streaming call to the LB server. Always non-NULL. */
-  grpc_call* lb_call;
-
-  /** The initial metadata received from the LB server. */
-  grpc_metadata_array lb_initial_metadata_recv;
-
-  /** The message sent to the LB server. It's used to query for backends (the
-   * value may vary if the LB server indicates a redirect) or send client load
-   * report. */
-  grpc_byte_buffer* send_message_payload;
-  /** The callback after the initial request is sent. */
-  grpc_closure lb_on_sent_initial_request;
-
-  /** The response received from the LB server, if any. */
-  grpc_byte_buffer* recv_message_payload;
-  /** The callback to process the response received from the LB server. */
-  grpc_closure lb_on_response_received;
-  bool seen_initial_response;
-
-  /** The callback to process the status received from the LB server, which
-   * signals the end of the LB call. */
-  grpc_closure lb_on_server_status_received;
-  /** The trailing metadata from the LB server. */
-  grpc_metadata_array lb_trailing_metadata_recv;
-  /** The call status code and details. */
-  grpc_status_code lb_call_status;
-  grpc_slice lb_call_status_details;
-
-  /** The stats for client-side load reporting associated with this LB call.
-   * Created after the first serverlist is received. */
-  grpc_grpclb_client_stats* client_stats;
-  /** The interval and timer for next client load report. */
-  grpc_millis client_stats_report_interval;
-  grpc_timer client_load_report_timer;
-  bool client_load_report_timer_callback_pending;
-  bool last_client_load_report_counters_were_zero;
-  bool client_load_report_is_due;
-  /** The closure used for either the load report timer or the callback for
-   * completion of sending the load report. */
-  grpc_closure client_load_report_closure;
-} glb_lb_call_data;
-
-typedef struct glb_lb_policy {
-  /** Base policy: must be first. */
-  grpc_lb_policy base;
-
-  /** Who the client is trying to communicate with. */
-  const char* server_name;
-
-  /** Channel related data that will be propagated to the internal RR policy. */
-  grpc_client_channel_factory* cc_factory;
-  grpc_channel_args* args;
-
-  /** Timeout in milliseconds for before using fallback backend addresses.
-   * 0 means not using fallback. */
-  int lb_fallback_timeout_ms;
-
-  /** The channel for communicating with the LB server. */
-  grpc_channel* lb_channel;
-
-  /** The data associated with the current LB call. It holds a ref to this LB
-   * policy. It's initialized every time we query for backends. It's reset to
-   * NULL whenever the current LB call is no longer needed (e.g., the LB policy
-   * is shutting down, or the LB call has ended). A non-NULL lb_calld always
-   * contains a non-NULL lb_call. */
-  glb_lb_call_data* lb_calld;
-
-  /** response generator to inject address updates into \a lb_channel */
-  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
-      response_generator;
-
-  /** the RR policy to use of the backend servers returned by the LB server */
-  grpc_lb_policy* rr_policy;
-
-  /** the connectivity state of the embedded RR policy */
-  grpc_connectivity_state rr_connectivity_state;
-
-  bool started_picking;
-
-  /** our connectivity state tracker */
-  grpc_connectivity_state_tracker state_tracker;
-
-  /** connectivity state of the LB channel */
-  grpc_connectivity_state lb_channel_connectivity;
-
-  /** stores the deserialized response from the LB. May be nullptr until one
-   * such response has arrived. */
-  grpc_grpclb_serverlist* serverlist;
-
-  /** Index into serverlist for next pick.
-   * If the server at this index is a drop, we return a drop.
-   * Otherwise, we delegate to the RR policy. */
-  size_t serverlist_index;
-
-  /** stores the backend addresses from the resolver */
-  grpc_lb_addresses* fallback_backend_addresses;
-
-  /** list of picks that are waiting on RR's policy connectivity */
-  pending_pick* pending_picks;
-
-  /** list of pings that are waiting on RR's policy connectivity */
-  pending_ping* pending_pings;
-
-  bool shutting_down;
-
-  /** are we already watching the LB channel's connectivity? */
-  bool watching_lb_channel;
-
-  /** is the callback associated with \a lb_call_retry_timer pending? */
-  bool retry_timer_callback_pending;
-
-  /** is the callback associated with \a lb_fallback_timer pending? */
-  bool fallback_timer_callback_pending;
-
-  /** called upon changes to the LB channel's connectivity. */
-  grpc_closure lb_channel_on_connectivity_changed;
-
-  /** called upon changes to the RR's connectivity. */
-  grpc_closure rr_on_connectivity_changed;
-
-  /** called upon reresolution request from the RR policy. */
-  grpc_closure rr_on_reresolution_requested;
-
-  /************************************************************/
-  /*  client data associated with the LB server communication */
-  /************************************************************/
-
-  /** LB call retry backoff state */
-  grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
-
-  /** timeout in milliseconds for the LB call. 0 means no deadline. */
-  int lb_call_timeout_ms;
-
-  /** LB call retry timer */
-  grpc_timer lb_call_retry_timer;
-  /** LB call retry timer callback */
-  grpc_closure lb_on_call_retry;
-
-  /** LB fallback timer */
-  grpc_timer lb_fallback_timer;
-  /** LB fallback timer callback */
-  grpc_closure lb_on_fallback;
-} glb_lb_policy;
-
-static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld,
-                                 const char* reason) {
-  gpr_ref_non_zero(&lb_calld->refs);
-  if (grpc_lb_glb_trace.enabled()) {
-    const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
-    gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)",
-            grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
-            static_cast<unsigned long>(count - 1),
-            static_cast<unsigned long>(count), reason);
+// vtable for LB tokens in grpc_lb_addresses
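+// An LB token is stored as the payload pointer of a grpc_mdelem; copying
+// and destroying a token simply ref and unref the underlying mdelem.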
+void* lb_token_copy(void* token) {
+  return token == nullptr
+             ? nullptr
+             : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
+}
+void lb_token_destroy(void* token) {
+  if (token != nullptr) {
+    GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
   }
 }
-
-static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld,
-                                   const char* reason) {
-  const bool done = gpr_unref(&lb_calld->refs);
-  if (grpc_lb_glb_trace.enabled()) {
-    const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
-    gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)",
-            grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
-            static_cast<unsigned long>(count + 1),
-            static_cast<unsigned long>(count), reason);
-  }
-  if (done) {
-    GPR_ASSERT(lb_calld->lb_call != nullptr);
-    grpc_call_unref(lb_calld->lb_call);
-    grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv);
-    grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv);
-    grpc_byte_buffer_destroy(lb_calld->send_message_payload);
-    grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
-    grpc_slice_unref_internal(lb_calld->lb_call_status_details);
-    if (lb_calld->client_stats != nullptr) {
-      grpc_grpclb_client_stats_unref(lb_calld->client_stats);
-    }
-    GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld");
-    gpr_free(lb_calld);
-  }
+int lb_token_cmp(void* token1, void* token2) {
+  if (token1 > token2) return 1;
+  if (token1 < token2) return -1;
+  return 0;
 }
+const grpc_lb_user_data_vtable lb_token_vtable = {
+    lb_token_copy, lb_token_destroy, lb_token_cmp};
 
-static void lb_call_data_shutdown(glb_lb_policy* glb_policy) {
-  GPR_ASSERT(glb_policy->lb_calld != nullptr);
-  GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
-  // lb_on_server_status_received will complete the cancellation and clean up.
-  grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr);
-  if (glb_policy->lb_calld->client_load_report_timer_callback_pending) {
-    grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer);
-  }
-  glb_policy->lb_calld = nullptr;
-}
-
-/* add lb_token of selected subchannel (address) to the call's initial
- * metadata */
-static grpc_error* initial_metadata_add_lb_token(
-    grpc_metadata_batch* initial_metadata,
-    grpc_linked_mdelem* lb_token_mdelem_storage, grpc_mdelem lb_token) {
-  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
-  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
-  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
-                                      lb_token);
-}
-
-static void destroy_client_stats(void* arg) {
-  grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg));
-}
-
-static void pending_pick_set_metadata_and_context(pending_pick* pp) {
-  /* if connected_subchannel is nullptr, no pick has been made by the RR
-   * policy (e.g., all addresses failed to connect). There won't be any
-   * user_data/token available */
-  if (pp->pick->connected_subchannel != nullptr) {
-    if (!GRPC_MDISNULL(pp->lb_token)) {
-      initial_metadata_add_lb_token(pp->pick->initial_metadata,
-                                    &pp->pick->lb_token_mdelem_storage,
-                                    GRPC_MDELEM_REF(pp->lb_token));
-    } else {
-      gpr_log(GPR_ERROR,
-              "[grpclb %p] No LB token for connected subchannel pick %p",
-              pp->glb_policy, pp->pick);
-      abort();
-    }
-    // Pass on client stats via context. Passes ownership of the reference.
-    if (pp->client_stats != nullptr) {
-      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
-          pp->client_stats;
-      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
-          destroy_client_stats;
-    }
-  } else {
-    if (pp->client_stats != nullptr) {
-      grpc_grpclb_client_stats_unref(pp->client_stats);
+// Returns the backend addresses extracted from the given addresses.
+grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
+  // First pass: count the number of backend addresses.
+  size_t num_backends = 0;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (!addresses->addresses[i].is_balancer) {
+      ++num_backends;
     }
   }
+  // Second pass: actually populate the addresses and (empty) LB tokens.
+  grpc_lb_addresses* backend_addresses =
+      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
+  size_t num_copied = 0;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (addresses->addresses[i].is_balancer) continue;
+    const grpc_resolved_address* addr = &addresses->addresses[i].address;
+    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
+                                  addr->len, false /* is_balancer */,
+                                  nullptr /* balancer_name */,
+                                  (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
+    ++num_copied;
+  }
+  return backend_addresses;
 }
 
-/* The \a on_complete closure passed as part of the pick requires keeping a
- * reference to its associated round robin instance. We wrap this closure in
- * order to unref the round robin instance upon its invocation */
-static void pending_pick_complete(void* arg, grpc_error* error) {
-  pending_pick* pp = static_cast<pending_pick*>(arg);
-  pending_pick_set_metadata_and_context(pp);
-  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
-  gpr_free(pp);
-}
-
-static pending_pick* pending_pick_create(glb_lb_policy* glb_policy,
-                                         grpc_lb_policy_pick_state* pick) {
-  pending_pick* pp = static_cast<pending_pick*>(gpr_zalloc(sizeof(*pp)));
-  pp->pick = pick;
-  pp->glb_policy = glb_policy;
-  GRPC_CLOSURE_INIT(&pp->on_complete, pending_pick_complete, pp,
-                    grpc_schedule_on_exec_ctx);
-  pp->original_on_complete = pick->on_complete;
-  pp->pick->on_complete = &pp->on_complete;
-  return pp;
-}
-
-static void pending_pick_add(pending_pick** root, pending_pick* new_pp) {
-  new_pp->next = *root;
-  *root = new_pp;
-}
-
-static void pending_ping_add(pending_ping** root, grpc_closure* on_initiate,
-                             grpc_closure* on_ack) {
-  pending_ping* pping = static_cast<pending_ping*>(gpr_zalloc(sizeof(*pping)));
-  pping->on_initiate = on_initiate;
-  pping->on_ack = on_ack;
-  pping->next = *root;
-  *root = pping;
-}
-
-static bool is_server_valid(const grpc_grpclb_server* server, size_t idx,
-                            bool log) {
+bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
   if (server->drop) return false;
   const grpc_grpclb_ip_address* ip = &server->ip_address;
   if (server->port >> 16 != 0) {
     if (log) {
       gpr_log(GPR_ERROR,
               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
-              server->port, static_cast<unsigned long>(idx));
+              server->port, (unsigned long)idx);
     }
     return false;
   }
@@ -465,65 +404,43 @@
       gpr_log(GPR_ERROR,
               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
               "serverlist. Ignoring",
-              ip->size, static_cast<unsigned long>(idx));
+              ip->size, (unsigned long)idx);
     }
     return false;
   }
   return true;
 }
 
-/* vtable for LB tokens in grpc_lb_addresses. */
-static void* lb_token_copy(void* token) {
-  return token == nullptr
-             ? nullptr
-             : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
-}
-static void lb_token_destroy(void* token) {
-  if (token != nullptr) {
-    GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
-  }
-}
-static int lb_token_cmp(void* token1, void* token2) {
-  if (token1 > token2) return 1;
-  if (token1 < token2) return -1;
-  return 0;
-}
-static const grpc_lb_user_data_vtable lb_token_vtable = {
-    lb_token_copy, lb_token_destroy, lb_token_cmp};
-
-static void parse_server(const grpc_grpclb_server* server,
-                         grpc_resolved_address* addr) {
+void ParseServer(const grpc_grpclb_server* server,
+                 grpc_resolved_address* addr) {
   memset(addr, 0, sizeof(*addr));
   if (server->drop) return;
-  const uint16_t netorder_port = htons(static_cast<uint16_t>(server->port));
+  const uint16_t netorder_port = htons((uint16_t)server->port);
   /* the addresses are given in binary format (a in(6)_addr struct) in
    * server->ip_address.bytes. */
   const grpc_grpclb_ip_address* ip = &server->ip_address;
   if (ip->size == 4) {
     addr->len = sizeof(struct sockaddr_in);
-    struct sockaddr_in* addr4 =
-        reinterpret_cast<struct sockaddr_in*>(&addr->addr);
+    struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
     addr4->sin_family = AF_INET;
     memcpy(&addr4->sin_addr, ip->bytes, ip->size);
     addr4->sin_port = netorder_port;
   } else if (ip->size == 16) {
     addr->len = sizeof(struct sockaddr_in6);
-    struct sockaddr_in6* addr6 =
-        reinterpret_cast<struct sockaddr_in6*>(&addr->addr);
+    struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
     addr6->sin6_family = AF_INET6;
     memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
     addr6->sin6_port = netorder_port;
   }
 }
 
-/* Returns addresses extracted from \a serverlist. */
-static grpc_lb_addresses* process_serverlist_locked(
-    const grpc_grpclb_serverlist* serverlist) {
+// Returns addresses extracted from \a serverlist.
+grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
   size_t num_valid = 0;
   /* first pass: count how many are valid in order to allocate the necessary
    * memory in a single block */
   for (size_t i = 0; i < serverlist->num_servers; ++i) {
-    if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
+    if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
   }
   grpc_lb_addresses* lb_addresses =
       grpc_lb_addresses_create(num_valid, &lb_token_vtable);
@@ -535,11 +452,11 @@
   size_t addr_idx = 0;
   for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
     const grpc_grpclb_server* server = serverlist->servers[sl_idx];
-    if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
+    if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
     GPR_ASSERT(addr_idx < num_valid);
     /* address processing */
     grpc_resolved_address addr;
-    parse_server(server, &addr);
+    ParseServer(server, &addr);
     /* lb token processing */
     void* user_data;
     if (server->has_load_balance_token) {
@@ -570,36 +487,1273 @@
   return lb_addresses;
 }
 
-/* Returns the backend addresses extracted from the given addresses */
-static grpc_lb_addresses* extract_backend_addresses_locked(
-    const grpc_lb_addresses* addresses) {
-  /* first pass: count the number of backend addresses */
-  size_t num_backends = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (!addresses->addresses[i].is_balancer) {
-      ++num_backends;
-    }
-  }
-  /* second pass: actually populate the addresses and (empty) LB tokens */
-  grpc_lb_addresses* backend_addresses =
-      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
-  size_t num_copied = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (addresses->addresses[i].is_balancer) continue;
-    const grpc_resolved_address* addr = &addresses->addresses[i].address;
-    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
-                                  addr->len, false /* is_balancer */,
-                                  nullptr /* balancer_name */,
-                                  (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
-    ++num_copied;
-  }
-  return backend_addresses;
+//
+// GrpcLb::BalancerCallState
+//
+
+GrpcLb::BalancerCallState::BalancerCallState(
+    RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
+    : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
+      grpclb_policy_(std::move(parent_grpclb_policy)) {
+  GPR_ASSERT(grpclb_policy_ != nullptr);
+  GPR_ASSERT(!grpclb_policy()->shutting_down_);
+  // Init the LB call. Note that the LB call will progress every time there's
+  // activity in grpclb_policy_->interested_parties(), which is comprised of
+  // the polling entities from client_channel.
+  GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
+  GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
+  grpc_slice host =
+      grpc_slice_from_copied_string(grpclb_policy()->server_name_);
+  grpc_millis deadline =
+      grpclb_policy()->lb_call_timeout_ms_ == 0
+          ? GRPC_MILLIS_INF_FUTURE
+          : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
+  lb_call_ = grpc_channel_create_pollset_set_call(
+      grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
+      grpclb_policy_->interested_parties(),
+      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
+      &host, deadline, nullptr);
+  grpc_slice_unref_internal(host);
+  // Init the LB call request payload.
+  grpc_grpclb_request* request =
+      grpc_grpclb_request_create(grpclb_policy()->server_name_);
+  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
+  send_message_payload_ =
+      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+  grpc_slice_unref_internal(request_payload_slice);
+  grpc_grpclb_request_destroy(request);
+  // Init other data associated with the LB call.
+  grpc_metadata_array_init(&lb_initial_metadata_recv_);
+  grpc_metadata_array_init(&lb_trailing_metadata_recv_);
+  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
+                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
+  GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
+                    OnBalancerMessageReceivedLocked, this,
+                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
+  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
+                    OnBalancerStatusReceivedLocked, this,
+                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
 }
 
-static void update_lb_connectivity_status_locked(glb_lb_policy* glb_policy,
-                                                 grpc_error* rr_state_error) {
+GrpcLb::BalancerCallState::~BalancerCallState() {
+  GPR_ASSERT(lb_call_ != nullptr);
+  grpc_call_unref(lb_call_);
+  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
+  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
+  grpc_byte_buffer_destroy(send_message_payload_);
+  grpc_byte_buffer_destroy(recv_message_payload_);
+  grpc_slice_unref_internal(lb_call_status_details_);
+  if (client_stats_ != nullptr) {
+    grpc_grpclb_client_stats_unref(client_stats_);
+  }
+}
+
+void GrpcLb::BalancerCallState::Orphan() {
+  GPR_ASSERT(lb_call_ != nullptr);
+  // If we are here because grpclb_policy wants to cancel the call,
+  // lb_on_balancer_status_received_ will complete the cancellation and clean
+  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
+  // call, in which case the cancellation below is a no-op.
+  grpc_call_cancel(lb_call_, nullptr);
+  if (client_load_report_timer_callback_pending_) {
+    grpc_timer_cancel(&client_load_report_timer_);
+  }
+  // Note that the initial ref is held by lb_on_balancer_status_received_
+  // instead of the caller of this function. So the corresponding unref happens
+  // in lb_on_balancer_status_received_ instead of here.
+}
+
+void GrpcLb::BalancerCallState::StartQuery() {
+  GPR_ASSERT(lb_call_ != nullptr);
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_INFO,
+            "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
+            grpclb_policy_.get(), this, lb_call_);
+  }
+  // Create the ops.
+  grpc_call_error call_error;
+  grpc_op ops[3];
+  memset(ops, 0, sizeof(ops));
+  // Op: send initial metadata.
+  grpc_op* op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  // Op: send request message.
+  GPR_ASSERT(send_message_payload_ != nullptr);
+  op->op = GRPC_OP_SEND_MESSAGE;
+  op->data.send_message.send_message = send_message_payload_;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  // TODO(roth): We currently track this ref manually.  Once the
+  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+  // with the callback.
+  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
+  self.release();
+  call_error = grpc_call_start_batch_and_execute(
+      lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
+  GPR_ASSERT(GRPC_CALL_OK == call_error);
+  // Op: recv initial metadata.
+  op = ops;
+  op->op = GRPC_OP_RECV_INITIAL_METADATA;
+  op->data.recv_initial_metadata.recv_initial_metadata =
+      &lb_initial_metadata_recv_;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  // Op: recv response.
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message.recv_message = &recv_message_payload_;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  // TODO(roth): We currently track this ref manually.  Once the
+  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+  // with the callback.
+  self = Ref(DEBUG_LOCATION, "on_message_received");
+  self.release();
+  call_error = grpc_call_start_batch_and_execute(
+      lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
+  GPR_ASSERT(GRPC_CALL_OK == call_error);
+  // Op: recv server status.
+  op = ops;
+  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  op->data.recv_status_on_client.trailing_metadata =
+      &lb_trailing_metadata_recv_;
+  op->data.recv_status_on_client.status = &lb_call_status_;
+  op->data.recv_status_on_client.status_details = &lb_call_status_details_;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  // This callback signals the end of the LB call, so it relies on the initial
+  // ref instead of a new ref. When it's invoked, it's the initial ref that is
+  // unreffed.
+  call_error = grpc_call_start_batch_and_execute(
+      lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
+  GPR_ASSERT(GRPC_CALL_OK == call_error);
+}
+
+void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
+  const grpc_millis next_client_load_report_time =
+      ExecCtx::Get()->Now() + client_stats_report_interval_;
+  GRPC_CLOSURE_INIT(&client_load_report_closure_,
+                    MaybeSendClientLoadReportLocked, this,
+                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
+  grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
+                  &client_load_report_closure_);
+  client_load_report_timer_callback_pending_ = true;
+}
+
+void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
+    void* arg, grpc_error* error) {
+  BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+  lb_calld->client_load_report_timer_callback_pending_ = false;
+  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
+    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
+    return;
+  }
+  // If we've already sent the initial request, then we can go ahead and send
+  // the load report. Otherwise, we need to wait until the initial request has
+  // been sent to send this (see OnInitialRequestSentLocked()).
+  if (lb_calld->send_message_payload_ == nullptr) {
+    lb_calld->SendClientLoadReportLocked();
+  } else {
+    lb_calld->client_load_report_is_due_ = true;
+  }
+}
+
+bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
+    grpc_grpclb_request* request) {
+  grpc_grpclb_dropped_call_counts* drop_entries =
+      static_cast<grpc_grpclb_dropped_call_counts*>(
+          request->client_stats.calls_finished_with_drop.arg);
+  return request->client_stats.num_calls_started == 0 &&
+         request->client_stats.num_calls_finished == 0 &&
+         request->client_stats.num_calls_finished_with_client_failed_to_send ==
+             0 &&
+         request->client_stats.num_calls_finished_known_received == 0 &&
+         (drop_entries == nullptr || drop_entries->num_entries == 0);
+}
+
+void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
+  // Construct message payload.
+  GPR_ASSERT(send_message_payload_ == nullptr);
+  grpc_grpclb_request* request =
+      grpc_grpclb_load_report_request_create_locked(client_stats_);
+  // Skip client load report if the counters were all zero in the last
+  // report and they are still zero in this one.
+  if (LoadReportCountersAreZero(request)) {
+    if (last_client_load_report_counters_were_zero_) {
+      grpc_grpclb_request_destroy(request);
+      ScheduleNextClientLoadReportLocked();
+      return;
+    }
+    last_client_load_report_counters_were_zero_ = true;
+  } else {
+    last_client_load_report_counters_were_zero_ = false;
+  }
+  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
+  send_message_payload_ =
+      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+  grpc_slice_unref_internal(request_payload_slice);
+  grpc_grpclb_request_destroy(request);
+  // Send the report.
+  grpc_op op;
+  memset(&op, 0, sizeof(op));
+  op.op = GRPC_OP_SEND_MESSAGE;
+  op.data.send_message.send_message = send_message_payload_;
+  GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
+                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
+  grpc_call_error call_error = grpc_call_start_batch_and_execute(
+      lb_call_, &op, 1, &client_load_report_closure_);
+  if (call_error != GRPC_CALL_OK) {
+    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
+            call_error);
+    GPR_ASSERT(GRPC_CALL_OK == call_error);
+  }
+}
+
+void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
+                                                           grpc_error* error) {
+  BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
+  lb_calld->send_message_payload_ = nullptr;
+  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
+    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
+    return;
+  }
+  lb_calld->ScheduleNextClientLoadReportLocked();
+}
+
+void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
+                                                           grpc_error* error) {
+  BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
+  lb_calld->send_message_payload_ = nullptr;
+  // If we attempted to send a client load report before the initial request was
+  // sent (and this lb_calld is still in use), send the load report now.
+  if (lb_calld->client_load_report_is_due_ &&
+      lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
+    lb_calld->SendClientLoadReportLocked();
+    lb_calld->client_load_report_is_due_ = false;
+  }
+  lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
+}
+
+void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
+    void* arg, grpc_error* error) {
+  BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+  // Empty payload means the LB call was cancelled.
+  if (lb_calld != grpclb_policy->lb_calld_.get() ||
+      lb_calld->recv_message_payload_ == nullptr) {
+    lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
+    return;
+  }
+  grpc_byte_buffer_reader bbr;
+  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
+  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+  grpc_byte_buffer_reader_destroy(&bbr);
+  grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
+  lb_calld->recv_message_payload_ = nullptr;
+  grpc_grpclb_initial_response* initial_response;
+  grpc_grpclb_serverlist* serverlist;
+  if (!lb_calld->seen_initial_response_ &&
+      (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
+          nullptr) {
+    // Have NOT seen initial response, look for initial response.
+    if (initial_response->has_client_stats_report_interval) {
+      lb_calld->client_stats_report_interval_ = GPR_MAX(
+          GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
+                              &initial_response->client_stats_report_interval));
+      if (grpc_lb_glb_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "[grpclb %p] Received initial LB response message; "
+                "client load reporting interval = %" PRIdPTR " milliseconds",
+                grpclb_policy, lb_calld->client_stats_report_interval_);
+      }
+    } else if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] Received initial LB response message; client load "
+              "reporting NOT enabled",
+              grpclb_policy);
+    }
+    grpc_grpclb_initial_response_destroy(initial_response);
+    lb_calld->seen_initial_response_ = true;
+  } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
+                  response_slice)) != nullptr) {
+    // Have seen initial response, look for serverlist.
+    GPR_ASSERT(lb_calld->lb_call_ != nullptr);
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
+              grpclb_policy, serverlist->num_servers);
+      for (size_t i = 0; i < serverlist->num_servers; ++i) {
+        grpc_resolved_address addr;
+        ParseServer(serverlist->servers[i], &addr);
+        char* ipport;
+        grpc_sockaddr_to_string(&ipport, &addr, false);
+        gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
+                grpclb_policy, i, ipport);
+        gpr_free(ipport);
+      }
+    }
+    /* update serverlist */
+    if (serverlist->num_servers > 0) {
+      // Start sending client load report only after we start using the
+      // serverlist returned from the current LB call.
+      if (lb_calld->client_stats_report_interval_ > 0 &&
+          lb_calld->client_stats_ == nullptr) {
+        lb_calld->client_stats_ = grpc_grpclb_client_stats_create();
+        // TODO(roth): We currently track this ref manually.  Once the
+        // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+        // with the callback.
+        auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
+        self.release();
+        lb_calld->ScheduleNextClientLoadReportLocked();
+      }
+      if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
+                                        serverlist)) {
+        if (grpc_lb_glb_trace.enabled()) {
+          gpr_log(GPR_INFO,
+                  "[grpclb %p] Incoming server list identical to current, "
+                  "ignoring.",
+                  grpclb_policy);
+        }
+        grpc_grpclb_destroy_serverlist(serverlist);
+      } else { /* new serverlist */
+        if (grpclb_policy->serverlist_ != nullptr) {
+          /* dispose of the old serverlist */
+          grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
+        } else {
+          /* or dispose of the fallback */
+          grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
+          grpclb_policy->fallback_backend_addresses_ = nullptr;
+          if (grpclb_policy->fallback_timer_callback_pending_) {
+            grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
+          }
+        }
+        // and update the copy in the GrpcLb instance. This
+        // serverlist instance will be destroyed either upon the next
+        // update or when the GrpcLb instance is destroyed.
+        grpclb_policy->serverlist_ = serverlist;
+        grpclb_policy->serverlist_index_ = 0;
+        grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+      }
+    } else {
+      if (grpc_lb_glb_trace.enabled()) {
+        gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
+                grpclb_policy);
+      }
+      grpc_grpclb_destroy_serverlist(serverlist);
+    }
+  } else {
+    // No valid initial response or serverlist found.
+    gpr_log(GPR_ERROR,
+            "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
+            grpclb_policy,
+            grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
+  }
+  grpc_slice_unref_internal(response_slice);
+  if (!grpclb_policy->shutting_down_) {
+    // Keep listening for serverlist updates.
+    grpc_op op;
+    memset(&op, 0, sizeof(op));
+    op.op = GRPC_OP_RECV_MESSAGE;
+    op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
+    op.flags = 0;
+    op.reserved = nullptr;
+    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
+    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
+        lb_calld->lb_call_, &op, 1,
+        &lb_calld->lb_on_balancer_message_received_);
+    GPR_ASSERT(GRPC_CALL_OK == call_error);
+  } else {
+    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
+  }
+}
+
+void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
+    void* arg, grpc_error* error) {
+  BalancerCallState* lb_calld = reinterpret_cast<BalancerCallState*>(arg);
+  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
+  GPR_ASSERT(lb_calld->lb_call_ != nullptr);
+  if (grpc_lb_glb_trace.enabled()) {
+    char* status_details =
+        grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
+    gpr_log(GPR_INFO,
+            "[grpclb %p] Status from LB server received. Status = %d, details "
+            "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
+            grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
+            lb_calld->lb_call_, grpc_error_string(error));
+    gpr_free(status_details);
+  }
+  grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
+  // If this lb_calld is still in use, this call ended because of a failure so
+  // we want to retry connecting. Otherwise, we have deliberately ended this
+  // call and no further action is required.
+  if (lb_calld == grpclb_policy->lb_calld_.get()) {
+    grpclb_policy->lb_calld_.reset();
+    GPR_ASSERT(!grpclb_policy->shutting_down_);
+    if (lb_calld->seen_initial_response_) {
+      // If we lose connection to the LB server, reset the backoff and restart
+      // the LB call immediately.
+      grpclb_policy->lb_call_backoff_.Reset();
+      grpclb_policy->StartBalancerCallLocked();
+    } else {
+      // If this LB call fails establishing any connection to the LB server,
+      // retry later.
+      grpclb_policy->StartBalancerCallRetryTimerLocked();
+    }
+  }
+  lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
+}
+
+//
+// helper code for creating balancer channel
+//
+
+// Helper function to construct a target info entry.
+grpc_slice_hash_table_entry BalancerEntryCreate(const char* address,
+                                                const char* balancer_name) {
+  grpc_slice_hash_table_entry entry;
+  entry.key = grpc_slice_from_copied_string(address);
+  entry.value = gpr_strdup(balancer_name);
+  return entry;
+}
+
+// Comparison function used for slice_hash_table vtable.
+int BalancerNameCmp(void* a, void* b) {
+  const char* a_str = static_cast<const char*>(a);
+  const char* b_str = static_cast<const char*>(b);
+  return strcmp(a_str, b_str);
+}
+
+/* Returns the channel args for the LB channel, used to create a bidirectional
+ * stream for the reception of load balancing updates.
+ *
+ * Inputs:
+ *   - \a addresses: corresponding to the balancers.
+ *   - \a response_generator: in order to propagate updates from the resolver
+ *   above the grpclb policy.
+ *   - \a args: other args inherited from the grpclb policy. */
+grpc_channel_args* BuildBalancerChannelArgs(
+    const grpc_lb_addresses* addresses,
+    FakeResolverResponseGenerator* response_generator,
+    const grpc_channel_args* args) {
+  size_t num_grpclb_addrs = 0;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+  }
+  // There must be at least one balancer address, or else the
+  // client_channel would not have chosen this LB policy.
+  GPR_ASSERT(num_grpclb_addrs > 0);
+  grpc_lb_addresses* lb_addresses =
+      grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
+  grpc_slice_hash_table_entry* targets_info_entries =
+      static_cast<grpc_slice_hash_table_entry*>(
+          gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs));
+  size_t lb_addresses_idx = 0;
+  for (size_t i = 0; i < addresses->num_addresses; ++i) {
+    if (!addresses->addresses[i].is_balancer) continue;
+    if (addresses->addresses[i].user_data != nullptr) {
+      gpr_log(GPR_ERROR,
+              "This LB policy doesn't support user data. It will be ignored");
+    }
+    char* addr_str;
+    GPR_ASSERT(grpc_sockaddr_to_string(
+                   &addr_str, &addresses->addresses[i].address, true) > 0);
+    targets_info_entries[lb_addresses_idx] =
+        BalancerEntryCreate(addr_str, addresses->addresses[i].balancer_name);
+    gpr_free(addr_str);
+    grpc_lb_addresses_set_address(
+        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
+        addresses->addresses[i].address.len, false /* is balancer */,
+        addresses->addresses[i].balancer_name, nullptr /* user data */);
+  }
+  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
+  grpc_slice_hash_table* targets_info = grpc_slice_hash_table_create(
+      num_grpclb_addrs, targets_info_entries, gpr_free, BalancerNameCmp);
+  gpr_free(targets_info_entries);
+  grpc_channel_args* lb_channel_args =
+      grpc_lb_policy_grpclb_build_lb_channel_args(targets_info,
+                                                  response_generator, args);
+  grpc_arg lb_channel_addresses_arg =
+      grpc_lb_addresses_create_channel_arg(lb_addresses);
+  grpc_channel_args* result = grpc_channel_args_copy_and_add(
+      lb_channel_args, &lb_channel_addresses_arg, 1);
+  grpc_slice_hash_table_unref(targets_info);
+  grpc_channel_args_destroy(lb_channel_args);
+  grpc_lb_addresses_destroy(lb_addresses);
+  return result;
+}
+
+//
+// ctor and dtor
+//
+
+GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
+               const LoadBalancingPolicy::Args& args)
+    : LoadBalancingPolicy(args),
+      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
+      lb_call_backoff_(
+          BackOff::Options()
+              .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
+                                   1000)
+              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+              .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
+                               1000)) {
+  // Initialization.
+  grpc_subchannel_index_ref();
+  GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
+                    &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
+                    grpc_combiner_scheduler(args.combiner));
+  GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
+                    &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
+                    grpc_combiner_scheduler(args.combiner));
+  GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
+                    &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
+                    grpc_combiner_scheduler(args.combiner));
+  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
+  // Record server name.
+  const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
+  const char* server_uri = grpc_channel_arg_get_string(arg);
+  GPR_ASSERT(server_uri != nullptr);
+  grpc_uri* uri = grpc_uri_parse(server_uri, true);
+  GPR_ASSERT(uri->path[0] != '\0');
+  server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_INFO,
+            "[grpclb %p] Will use '%s' as the server name for LB request.",
+            this, server_name_);
+  }
+  grpc_uri_destroy(uri);
+  // Record LB call timeout.
+  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
+  lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
+  // Record fallback timeout.
+  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
+  lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
+      arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
+  // Process channel args.
+  ProcessChannelArgsLocked(*args.args);
+}
+
+GrpcLb::~GrpcLb() {
+  GPR_ASSERT(pending_picks_ == nullptr);
+  GPR_ASSERT(pending_pings_ == nullptr);
+  gpr_free((void*)server_name_);
+  grpc_channel_args_destroy(args_);
+  grpc_connectivity_state_destroy(&state_tracker_);
+  if (serverlist_ != nullptr) {
+    grpc_grpclb_destroy_serverlist(serverlist_);
+  }
+  if (fallback_backend_addresses_ != nullptr) {
+    grpc_lb_addresses_destroy(fallback_backend_addresses_);
+  }
+  grpc_subchannel_index_unref();
+}
+
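+// Shuts down the policy: cancels the retry and fallback timers, tears down
+// the balancer call and channel, and fails any pending picks and pings.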
+void GrpcLb::ShutdownLocked() {
+  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
+  shutting_down_ = true;
+  lb_calld_.reset();
+  if (retry_timer_callback_pending_) {
+    grpc_timer_cancel(&lb_call_retry_timer_);
+  }
+  if (fallback_timer_callback_pending_) {
+    grpc_timer_cancel(&lb_fallback_timer_);
+  }
+  rr_policy_.reset();
+  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
+  // We destroy the LB channel here instead of in our destructor because
+  // destroying the channel triggers a last callback to
+  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
+  // alive when that callback is invoked.
+  if (lb_channel_ != nullptr) {
+    grpc_channel_destroy(lb_channel_);
+    lb_channel_ = nullptr;
+  }
+  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
+                              GRPC_ERROR_REF(error), "grpclb_shutdown");
+  // Clear pending picks.
+  PendingPick* pp;
+  while ((pp = pending_picks_) != nullptr) {
+    pending_picks_ = pp->next;
+    pp->pick->connected_subchannel.reset();
+    // Note: pp is deleted in this callback.
+    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
+  }
+  // Clear pending pings.
+  PendingPing* pping;
+  while ((pping = pending_pings_) != nullptr) {
+    pending_pings_ = pping->next;
+    GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
+    GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
+    Delete(pping);
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+//
+// public methods
+//
+
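+// Hands any picks still pending at the grpclb level over to \a new_policy.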
+void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
+  PendingPick* pp;
+  while ((pp = pending_picks_) != nullptr) {
+    pending_picks_ = pp->next;
+    pp->pick->on_complete = pp->original_on_complete;
+    pp->pick->user_data = nullptr;
+    if (new_policy->PickLocked(pp->pick)) {
+      // Synchronous return; schedule closure.
+      GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
+    }
+    Delete(pp);
+  }
+}
+
+// Cancel a specific pending pick.
+//
+// A grpclb pick progresses as follows:
+// - If there's a Round Robin policy (rr_policy_) available, it'll be
+//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
+//   that point onwards, it'll be RR's responsibility. For cancellations, that
+//   implies the pick also needs to be cancelled by the RR instance.
+// - Otherwise, without an RR instance, picks stay pending at this policy's
+//   level (grpclb), inside the pending_picks_ list. To cancel these,
+//   we invoke the completion closure and set the pick's connected
+//   subchannel to nullptr right here.
+void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
+  PendingPick* pp = pending_picks_;
+  pending_picks_ = nullptr;
+  while (pp != nullptr) {
+    PendingPick* next = pp->next;
+    if (pp->pick == pick) {
+      pick->connected_subchannel.reset();
+      // Note: pp is deleted in this callback.
+      GRPC_CLOSURE_SCHED(&pp->on_complete,
+                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                             "Pick Cancelled", &error, 1));
+    } else {
+      pp->next = pending_picks_;
+      pending_picks_ = pp;
+    }
+    pp = next;
+  }
+  if (rr_policy_ != nullptr) {
+    rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+// Cancel all pending picks.
+//
+// A grpclb pick progresses as follows:
+// - If there's a Round Robin policy (rr_policy_) available, it'll be
+//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
+//   that point onwards, it'll be RR's responsibility. For cancellations, that
+//   implies the pick also needs to be cancelled by the RR instance.
+// - Otherwise, without an RR instance, picks stay pending at this policy's
+//   level (grpclb), inside the pending_picks_ list. To cancel these,
+//   we invoke the completion closure and set the pick's connected
+//   subchannel to nullptr right here.
+void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
+                                       uint32_t initial_metadata_flags_eq,
+                                       grpc_error* error) {
+  PendingPick* pp = pending_picks_;
+  pending_picks_ = nullptr;
+  while (pp != nullptr) {
+    PendingPick* next = pp->next;
+    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
+        initial_metadata_flags_eq) {
+      // Note: pp is deleted in this callback.
+      GRPC_CLOSURE_SCHED(&pp->on_complete,
+                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                             "Pick Cancelled", &error, 1));
+    } else {
+      pp->next = pending_picks_;
+      pending_picks_ = pp;
+    }
+    pp = next;
+  }
+  if (rr_policy_ != nullptr) {
+    rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
+                                          initial_metadata_flags_eq,
+                                          GRPC_ERROR_REF(error));
+  }
+  GRPC_ERROR_UNREF(error);
+}
+
+void GrpcLb::ExitIdleLocked() {
+  if (!started_picking_) {
+    StartPickingLocked();
+  }
+}
+
+bool GrpcLb::PickLocked(PickState* pick) {
+  PendingPick* pp = PendingPickCreate(pick);
+  bool pick_done = false;
+  if (rr_policy_ != nullptr) {
+    const grpc_connectivity_state rr_connectivity_state =
+        rr_policy_->CheckConnectivityLocked(nullptr);
+    // The RR policy may have transitioned to SHUTDOWN but the callback
+    // registered to capture this event (on_rr_connectivity_changed_) may not
+    // have been invoked yet. We need to make sure we aren't trying to pick
+    // from an RR policy instance that's in shutdown.
+    if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
+      if (grpc_lb_glb_trace.enabled()) {
+        gpr_log(GPR_INFO,
+                "[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
+                this, rr_policy_.get(),
+                grpc_connectivity_state_name(rr_connectivity_state));
+      }
+      AddPendingPick(pp);
+      pick_done = false;
+    } else {  // RR not in shutdown
+      if (grpc_lb_glb_trace.enabled()) {
+        gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
+                rr_policy_.get());
+      }
+      pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
+    }
+  } else {  // rr_policy_ == NULL
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_DEBUG,
+              "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
+              this);
+    }
+    AddPendingPick(pp);
+    if (!started_picking_) {
+      StartPickingLocked();
+    }
+    pick_done = false;
+  }
+  return pick_done;
+}
+
+void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
+  if (rr_policy_ != nullptr) {
+    rr_policy_->PingOneLocked(on_initiate, on_ack);
+  } else {
+    AddPendingPing(on_initiate, on_ack);
+    if (!started_picking_) {
+      StartPickingLocked();
+    }
+  }
+}
+
+grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
+    grpc_error** connectivity_error) {
+  return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
+}
+
+void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
+                                       grpc_closure* notify) {
+  grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
+                                                 notify);
+}
+
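+// Processes the channel args from the resolver: refreshes the fallback
+// backend address list, stores the args used to build the RR policy, and
+// pushes the balancer addresses to the LB channel via the fake resolver
+// (creating the LB channel first if it doesn't exist yet).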
+void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
+  const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
+  if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+    // Ignore this update.
+    gpr_log(
+        GPR_ERROR,
+        "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
+        this);
+    return;
+  }
+  const grpc_lb_addresses* addresses =
+      reinterpret_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
+  // Update fallback address list.
+  if (fallback_backend_addresses_ != nullptr) {
+    grpc_lb_addresses_destroy(fallback_backend_addresses_);
+  }
+  fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
+  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
+  // since we use this to trigger the client_load_reporting filter.
+  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
+  grpc_arg new_arg = grpc_channel_arg_string_create(
+      (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
+  grpc_channel_args_destroy(args_);
+  args_ = grpc_channel_args_copy_and_add_and_remove(
+      &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
+  // Construct args for balancer channel.
+  grpc_channel_args* lb_channel_args =
+      BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
+  // Create balancer channel if needed.
+  if (lb_channel_ == nullptr) {
+    char* uri_str;
+    gpr_asprintf(&uri_str, "fake:///%s", server_name_);
+    lb_channel_ = grpc_lb_policy_grpclb_create_lb_channel(
+        uri_str, client_channel_factory(), lb_channel_args);
+    GPR_ASSERT(lb_channel_ != nullptr);
+    gpr_free(uri_str);
+  }
+  // Propagate updates to the LB channel (pick_first) through the fake
+  // resolver.
+  response_generator_->SetResponse(lb_channel_args);
+  grpc_channel_args_destroy(lb_channel_args);
+}
+
+void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
+  ProcessChannelArgsLocked(args);
+  // If fallback is configured and the RR policy already exists, update
+  // it with the new fallback addresses.
+  if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
+    CreateOrUpdateRoundRobinPolicyLocked();
+  }
+  // Start watching the LB channel's connectivity state, if not already
+  // doing so.
+  if (!watching_lb_channel_) {
+    lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
+        lb_channel_, true /* try to connect */);
+    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
+        grpc_channel_get_channel_stack(lb_channel_));
+    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+    watching_lb_channel_ = true;
+    // TODO(roth): We currently track this ref manually.  Once the
+    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+    // with the callback.
+    auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
+    self.release();
+    grpc_client_channel_watch_connectivity_state(
+        client_channel_elem,
+        grpc_polling_entity_create_from_pollset_set(interested_parties()),
+        &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
+        nullptr);
+  }
+}
+
+//
+// code for balancer channel and call
+//
+
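+// Starts the fallback timer (if a fallback timeout is configured and no
+// serverlist has been received yet) and kicks off the initial balancer call.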
+void GrpcLb::StartPickingLocked() {
+  // Start a timer to fall back.
+  if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
+      !fallback_timer_callback_pending_) {
+    grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
+    // TODO(roth): We currently track this ref manually.  Once the
+    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+    // with the callback.
+    auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
+    self.release();
+    GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
+                      grpc_combiner_scheduler(combiner()));
+    fallback_timer_callback_pending_ = true;
+    grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
+  }
+  started_picking_ = true;
+  StartBalancerCallLocked();
+}
+
+void GrpcLb::StartBalancerCallLocked() {
+  GPR_ASSERT(lb_channel_ != nullptr);
+  if (shutting_down_) return;
+  // Init the LB call data.
+  GPR_ASSERT(lb_calld_ == nullptr);
+  lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_INFO,
+            "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
+            this, lb_channel_, lb_calld_.get());
+  }
+  lb_calld_->StartQuery();
+}
+
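+// Fallback timer callback. If no serverlist has been received by the time
+// the timer fires, fall back to the backend addresses from the resolver.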
+void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
+  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
+  grpclb_policy->fallback_timer_callback_pending_ = false;
+  // If we receive a serverlist after the timer fires but before this callback
+  // actually runs, don't fall back.
+  if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
+      error == GRPC_ERROR_NONE) {
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] Falling back to use backends from resolver",
+              grpclb_policy);
+    }
+    GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
+    grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
+  }
+  grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
+}
+
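+// Schedules a retry of the balancer call after the backoff interval.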
+void GrpcLb::StartBalancerCallRetryTimerLocked() {
+  grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this);
+    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
+    if (timeout > 0) {
+      gpr_log(GPR_DEBUG,
+              "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this,
+              timeout);
+    } else {
+      gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
+              this);
+    }
+  }
+  // TODO(roth): We currently track this ref manually.  Once the
+  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+  // with the callback.
+  auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
+  self.release();
+  GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
+                    this, grpc_combiner_scheduler(combiner()));
+  retry_timer_callback_pending_ = true;
+  grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
+}
+
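+// Retry timer callback: restarts the balancer call unless we're shutting
+// down or another call is already in flight.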
+void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
+  GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg);
+  grpclb_policy->retry_timer_callback_pending_ = false;
+  if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
+      grpclb_policy->lb_calld_ == nullptr) {
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
+              grpclb_policy);
+    }
+    grpclb_policy->StartBalancerCallLocked();
+  }
+  grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
+}
+
+// Invoked as part of the update process. It continues watching the LB channel
+// until it shuts down or becomes READY. It's invoked even if the LB channel
+// stayed READY throughout the update (for example if the update is identical).
+void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
+                                                        grpc_error* error) {
+  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
+  if (grpclb_policy->shutting_down_) goto done;
+  // Re-initialize the lb_call. This should also take care of updating the
+  // embedded RR policy. Note that the current RR policy, if any, will stay in
+  // effect until an update from the new lb_call is received.
+  switch (grpclb_policy->lb_channel_connectivity_) {
+    case GRPC_CHANNEL_CONNECTING:
+    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
+      // Keep watching the LB channel.
+      grpc_channel_element* client_channel_elem =
+          grpc_channel_stack_last_element(
+              grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
+      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
+      grpc_client_channel_watch_connectivity_state(
+          client_channel_elem,
+          grpc_polling_entity_create_from_pollset_set(
+              grpclb_policy->interested_parties()),
+          &grpclb_policy->lb_channel_connectivity_,
+          &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
+      break;
+    }
+      // The LB channel may be IDLE because it was shut down before the update.
+      // Restart the LB call to kick the LB channel into gear.
+    case GRPC_CHANNEL_IDLE:
+    case GRPC_CHANNEL_READY:
+      grpclb_policy->lb_calld_.reset();
+      if (grpclb_policy->started_picking_) {
+        if (grpclb_policy->retry_timer_callback_pending_) {
+          grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
+        }
+        grpclb_policy->lb_call_backoff_.Reset();
+        grpclb_policy->StartBalancerCallLocked();
+      }
+      // Fall through.
+    case GRPC_CHANNEL_SHUTDOWN:
+    done:
+      grpclb_policy->watching_lb_channel_ = false;
+      grpclb_policy->Unref(DEBUG_LOCATION,
+                           "watch_lb_channel_connectivity_cb_shutdown");
+  }
+}
+
+//
+// PendingPick
+//
+
+// Adds lb_token of selected subchannel (address) to the call's initial
+// metadata.
+grpc_error* AddLbTokenToInitialMetadata(
+    grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
+    grpc_metadata_batch* initial_metadata) {
+  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
+  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
+  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
+                                      lb_token);
+}
+
+// Destroy function used when embedding client stats in call context.
+void DestroyClientStats(void* arg) {
+  grpc_grpclb_client_stats_unref(
+      reinterpret_cast<grpc_grpclb_client_stats*>(arg));
+}
+
+void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
+  /* if connected_subchannel is nullptr, no pick has been made by the RR
+   * policy (e.g., all addresses failed to connect). There won't be any
+   * user_data/token available */
+  if (pp->pick->connected_subchannel != nullptr) {
+    if (!GRPC_MDISNULL(pp->lb_token)) {
+      AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
+                                  &pp->pick->lb_token_mdelem_storage,
+                                  pp->pick->initial_metadata);
+    } else {
+      gpr_log(GPR_ERROR,
+              "[grpclb %p] No LB token for connected subchannel pick %p",
+              pp->grpclb_policy, pp->pick);
+      abort();
+    }
+    // Pass on client stats via context. Passes ownership of the reference.
+    if (pp->client_stats != nullptr) {
+      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+          pp->client_stats;
+      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+          DestroyClientStats;
+    }
+  } else {
+    if (pp->client_stats != nullptr) {
+      grpc_grpclb_client_stats_unref(pp->client_stats);
+    }
+  }
+}
+
+/* The \a on_complete closure passed as part of the pick is wrapped so that,
+ * once the pick completes, the LB token and client stats can be attached to
+ * the call before the original closure is invoked. */
+void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
+  PendingPick* pp = reinterpret_cast<PendingPick*>(arg);
+  PendingPickSetMetadataAndContext(pp);
+  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
+  Delete(pp);
+}
+
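+// Wraps \a pick in a PendingPick and intercepts its on_complete closure with
+// OnPendingPickComplete(), so that the LB token and client stats can be
+// attached to the call once the pick completes.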
+GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
+  PendingPick* pp = New<PendingPick>();
+  pp->grpclb_policy = this;
+  pp->pick = pick;
+  GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
+                    grpc_schedule_on_exec_ctx);
+  pp->original_on_complete = pick->on_complete;
+  pick->on_complete = &pp->on_complete;
+  return pp;
+}
+
+void GrpcLb::AddPendingPick(PendingPick* pp) {
+  pp->next = pending_picks_;
+  pending_picks_ = pp;
+}
+
+//
+// PendingPing
+//
+
+void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
+  PendingPing* pping = New<PendingPing>();
+  pping->on_initiate = on_initiate;
+  pping->on_ack = on_ack;
+  pping->next = pending_pings_;
+  pending_pings_ = pping;
+}
+
+//
+// code for interacting with the RR policy
+//
+
+// Performs a pick over \a rr_policy_. Given that a pick can complete
+// immediately (without invoking its completion callback), we need to perform
+// here the cleanups that the callback would otherwise be responsible for.
+// If \a force_async is true, then we will manually schedule the
+// completion callback even if the pick is available immediately.
+bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
+  // Check for drops if we are not using fallback backend addresses.
+  if (serverlist_ != nullptr) {
+    // Look at the index into the serverlist to see if we should drop this call.
+    grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
+    if (serverlist_index_ == serverlist_->num_servers) {
+      serverlist_index_ = 0;  // Wrap-around.
+    }
+    if (server->drop) {
+      // Update client load reporting stats to indicate the number of
+      // dropped calls.  Note that we have to do this here instead of in
+      // the client_load_reporting filter, because we do not create a
+      // subchannel call (and therefore no client_load_reporting filter)
+      // for dropped calls.
+      if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
+        grpc_grpclb_client_stats_add_call_dropped_locked(
+            server->load_balance_token, lb_calld_->client_stats());
+      }
+      if (force_async) {
+        GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+        Delete(pp);
+        return false;
+      }
+      Delete(pp);
+      return true;
+    }
+  }
+  // Set client_stats and user_data.
+  if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
+    pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats());
+  }
+  GPR_ASSERT(pp->pick->user_data == nullptr);
+  pp->pick->user_data = reinterpret_cast<void**>(&pp->lb_token);
+  // Pick via the RR policy.
+  bool pick_done = rr_policy_->PickLocked(pp->pick);
+  if (pick_done) {
+    PendingPickSetMetadataAndContext(pp);
+    if (force_async) {
+      GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
+      pick_done = false;
+    }
+    Delete(pp);
+  }
+  // else, the pending pick will be registered and taken care of by the
+  // pending pick list inside the RR policy.  Eventually,
+  // OnPendingPickComplete() will be called, which will (among other
+  // things) add the LB token to the call's initial metadata.
+  return pick_done;
+}
+
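+// Creates the embedded RR policy, wires up its re-resolution and
+// connectivity-change callbacks, and flushes any pending picks and pings.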
+void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
+  GPR_ASSERT(rr_policy_ == nullptr);
+  rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
+      "round_robin", args);
+  if (rr_policy_ == nullptr) {
+    gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
+            this);
+    return;
+  }
+  // TODO(roth): We currently track this ref manually.  Once the new
+  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
+  auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+  self.release();
+  rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
+  grpc_error* rr_state_error = nullptr;
+  rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
+  // Connectivity state is a function of the RR policy updated/created.
+  UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
+  // Add the gRPC LB's interested_parties pollset_set to that of the newly
+  // created RR policy. This will make the RR policy progress upon activity on
+  // gRPC LB, which in turn is tied to the application's call.
+  grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
+                                   interested_parties());
+  // Subscribe to changes to the connectivity of the new RR.
+  // TODO(roth): We currently track this ref manually.  Once the new
+  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
+  self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
+  self.release();
+  rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
+                                        &on_rr_connectivity_changed_);
+  rr_policy_->ExitIdleLocked();
+  // Send pending picks to RR policy.
+  PendingPick* pp;
+  while ((pp = pending_picks_)) {
+    pending_picks_ = pp->next;
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
+              rr_policy_.get());
+    }
+    PickFromRoundRobinPolicyLocked(true /* force_async */, pp);
+  }
+  // Send pending pings to RR policy.
+  PendingPing* pping;
+  while ((pping = pending_pings_)) {
+    pending_pings_ = pping->next;
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
+              this, rr_policy_.get());
+    }
+    rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
+    Delete(pping);
+  }
+}
+
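+// Builds the channel args for the RR policy, using either the serverlist
+// received from the balancer or, if none has been received yet, the fallback
+// backend addresses from the resolver.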
+grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
+  grpc_lb_addresses* addresses;
+  if (serverlist_ != nullptr) {
+    GPR_ASSERT(serverlist_->num_servers > 0);
+    addresses = ProcessServerlist(serverlist_);
+  } else {
+    // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
+    // received any serverlist from the balancer, we use the fallback backends
+    // returned by the resolver. Note that the fallback backend list may be
+    // empty, in which case the new round_robin policy will keep the requested
+    // picks pending.
+    GPR_ASSERT(fallback_backend_addresses_ != nullptr);
+    addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
+  }
+  GPR_ASSERT(addresses != nullptr);
+  // Replace the LB addresses in the channel args that we pass down to
+  // the subchannel.
+  static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+  const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
+  grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
+      args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1);
+  grpc_lb_addresses_destroy(addresses);
+  return args;
+}
+
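+// Updates the RR policy in place if it already exists; otherwise creates it.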
+void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
+  if (shutting_down_) return;
+  grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
+  GPR_ASSERT(args != nullptr);
+  if (rr_policy_ != nullptr) {
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this,
+              rr_policy_.get());
+    }
+    rr_policy_->UpdateLocked(*args);
+  } else {
+    LoadBalancingPolicy::Args lb_policy_args;
+    lb_policy_args.combiner = combiner();
+    lb_policy_args.client_channel_factory = client_channel_factory();
+    lb_policy_args.args = args;
+    CreateRoundRobinPolicyLocked(lb_policy_args);
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this,
+              rr_policy_.get());
+    }
+  }
+  grpc_channel_args_destroy(args);
+}
+
+void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
+                                                   grpc_error* error) {
+  GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg);
+  if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
+    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
+    return;
+  }
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(
+        GPR_DEBUG,
+        "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
+        grpclb_policy, grpclb_policy->rr_policy_.get());
+  }
+  // If we are talking to a balancer, we expect to get updated addresses from
+  // the balancer, so we can ignore the re-resolution request from the RR
+  // policy. Otherwise, handle the re-resolution request using the
+  // grpclb policy's original re-resolution closure.
+  if (grpclb_policy->lb_calld_ == nullptr ||
+      !grpclb_policy->lb_calld_->seen_initial_response()) {
+    grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
+  }
+  // Give back the wrapper closure to the RR policy.
+  grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
+      &grpclb_policy->on_rr_request_reresolution_);
+}
+
+void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
+    grpc_error* rr_state_error) {
   const grpc_connectivity_state curr_glb_state =
-      grpc_connectivity_state_check(&glb_policy->state_tracker);
+      grpc_connectivity_state_check(&state_tracker_);
   /* The new connectivity status is a function of the previous one and the new
    * input coming from the status of the RR policy.
    *
@@ -629,7 +1783,7 @@
    *
    *  (*) This function mustn't be called during shutting down. */
   GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
-  switch (glb_policy->rr_connectivity_state) {
+  switch (rr_connectivity_state_) {
     case GRPC_CHANNEL_TRANSIENT_FAILURE:
     case GRPC_CHANNEL_SHUTDOWN:
       GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
@@ -643,1294 +1797,69 @@
     gpr_log(
         GPR_INFO,
         "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
-        glb_policy,
-        grpc_connectivity_state_name(glb_policy->rr_connectivity_state),
-        glb_policy->rr_policy);
+        this, grpc_connectivity_state_name(rr_connectivity_state_),
+        rr_policy_.get());
   }
-  grpc_connectivity_state_set(&glb_policy->state_tracker,
-                              glb_policy->rr_connectivity_state, rr_state_error,
+  grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
+                              rr_state_error,
                               "update_lb_connectivity_status_locked");
 }
 
-/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
- * immediately (ignoring its completion callback), we need to perform the
- * cleanups this callback would otherwise be responsible for.
- * If \a force_async is true, then we will manually schedule the
- * completion callback even if the pick is available immediately. */
-static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
-                                         bool force_async, pending_pick* pp) {
-  // Check for drops if we are not using fallback backend addresses.
-  if (glb_policy->serverlist != nullptr) {
-    // Look at the index into the serverlist to see if we should drop this call.
-    grpc_grpclb_server* server =
-        glb_policy->serverlist->servers[glb_policy->serverlist_index++];
-    if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
-      glb_policy->serverlist_index = 0;  // Wrap-around.
-    }
-    if (server->drop) {
-      // Update client load reporting stats to indicate the number of
-      // dropped calls.  Note that we have to do this here instead of in
-      // the client_load_reporting filter, because we do not create a
-      // subchannel call (and therefore no client_load_reporting filter)
-      // for dropped calls.
-      if (glb_policy->lb_calld != nullptr &&
-          glb_policy->lb_calld->client_stats != nullptr) {
-        grpc_grpclb_client_stats_add_call_dropped_locked(
-            server->load_balance_token, glb_policy->lb_calld->client_stats);
-      }
-      if (force_async) {
-        GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
-        gpr_free(pp);
-        return false;
-      }
-      gpr_free(pp);
-      return true;
-    }
-  }
-  // Set client_stats and user_data.
-  if (glb_policy->lb_calld != nullptr &&
-      glb_policy->lb_calld->client_stats != nullptr) {
-    pp->client_stats =
-        grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats);
-  }
-  GPR_ASSERT(pp->pick->user_data == nullptr);
-  pp->pick->user_data = reinterpret_cast<void**>(&pp->lb_token);
-  // Pick via the RR policy.
-  bool pick_done = grpc_lb_policy_pick_locked(glb_policy->rr_policy, pp->pick);
-  if (pick_done) {
-    pending_pick_set_metadata_and_context(pp);
-    if (force_async) {
-      GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
-      pick_done = false;
-    }
-    gpr_free(pp);
-  }
-  /* else, the pending pick will be registered and taken care of by the
-   * pending pick list inside the RR policy (glb_policy->rr_policy).
-   * Eventually, wrapped_on_complete will be called, which will -among other
-   * things- add the LB token to the call's initial metadata */
-  return pick_done;
-}
-
-static grpc_lb_policy_args* lb_policy_args_create(glb_lb_policy* glb_policy) {
-  grpc_lb_addresses* addresses;
-  if (glb_policy->serverlist != nullptr) {
-    GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
-    addresses = process_serverlist_locked(glb_policy->serverlist);
-  } else {
-    // If rr_handover_locked() is invoked when we haven't received any
-    // serverlist from the balancer, we use the fallback backends returned by
-    // the resolver. Note that the fallback backend list may be empty, in which
-    // case the new round_robin policy will keep the requested picks pending.
-    GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
-    addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
-  }
-  GPR_ASSERT(addresses != nullptr);
-  grpc_lb_policy_args* args =
-      static_cast<grpc_lb_policy_args*>(gpr_zalloc(sizeof(*args)));
-  args->client_channel_factory = glb_policy->cc_factory;
-  args->combiner = glb_policy->base.combiner;
-  // Replace the LB addresses in the channel args that we pass down to
-  // the subchannel.
-  static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
-  const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
-  args->args = grpc_channel_args_copy_and_add_and_remove(
-      glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
-      1);
-  grpc_lb_addresses_destroy(addresses);
-  return args;
-}
-
-static void lb_policy_args_destroy(grpc_lb_policy_args* args) {
-  grpc_channel_args_destroy(args->args);
-  gpr_free(args);
-}
-
-static void rr_on_reresolution_requested_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
-  if (glb_policy->shutting_down || error != GRPC_ERROR_NONE) {
-    GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                         "rr_on_reresolution_requested_locked");
+void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
+                                                   grpc_error* error) {
+  GrpcLb* grpclb_policy = reinterpret_cast<GrpcLb*>(arg);
+  if (grpclb_policy->shutting_down_) {
+    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
     return;
   }
-  if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(
-        GPR_DEBUG,
-        "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
-        glb_policy, glb_policy->rr_policy);
-  }
-  // If we are talking to a balancer, we expect to get updated addresses form
-  // the balancer, so we can ignore the re-resolution request from the RR
-  // policy. Otherwise, handle the re-resolution request using glb's original
-  // re-resolution closure.
-  if (glb_policy->lb_calld == nullptr ||
-      !glb_policy->lb_calld->seen_initial_response) {
-    grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
-                                 GRPC_ERROR_NONE);
-  }
-  // Give back the wrapper closure to the RR policy.
-  grpc_lb_policy_set_reresolve_closure_locked(
-      glb_policy->rr_policy, &glb_policy->rr_on_reresolution_requested);
+  grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
+      GRPC_ERROR_REF(error));
+  // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
+  grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
+      &grpclb_policy->rr_connectivity_state_,
+      &grpclb_policy->on_rr_connectivity_changed_);
 }
 
-static void create_rr_locked(glb_lb_policy* glb_policy,
-                             grpc_lb_policy_args* args) {
-  GPR_ASSERT(glb_policy->rr_policy == nullptr);
-  grpc_lb_policy* new_rr_policy = grpc_lb_policy_create("round_robin", args);
-  if (new_rr_policy == nullptr) {
-    gpr_log(GPR_ERROR,
-            "[grpclb %p] Failure creating a RoundRobin policy for serverlist "
-            "update with %" PRIuPTR
-            " entries. The previous RR instance (%p), if any, will continue to "
-            "be used. Future updates from the LB will attempt to create new "
-            "instances.",
-            glb_policy, glb_policy->serverlist->num_servers,
-            glb_policy->rr_policy);
-    return;
-  }
-  GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_reresolution_requested_locked");
-  grpc_lb_policy_set_reresolve_closure_locked(
-      new_rr_policy, &glb_policy->rr_on_reresolution_requested);
-  glb_policy->rr_policy = new_rr_policy;
-  grpc_error* rr_state_error = nullptr;
-  glb_policy->rr_connectivity_state = grpc_lb_policy_check_connectivity_locked(
-      glb_policy->rr_policy, &rr_state_error);
-  /* Connectivity state is a function of the RR policy updated/created */
-  update_lb_connectivity_status_locked(glb_policy, rr_state_error);
-  /* Add the gRPC LB's interested_parties pollset_set to that of the newly
-   * created RR policy. This will make the RR policy progress upon activity on
-   * gRPC LB, which in turn is tied to the application's call */
-  grpc_pollset_set_add_pollset_set(glb_policy->rr_policy->interested_parties,
-                                   glb_policy->base.interested_parties);
-  /* Subscribe to changes to the connectivity of the new RR */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "rr_on_connectivity_changed_locked");
-  grpc_lb_policy_notify_on_state_change_locked(
-      glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
-      &glb_policy->rr_on_connectivity_changed);
-  grpc_lb_policy_exit_idle_locked(glb_policy->rr_policy);
-  // Send pending picks to RR policy.
-  pending_pick* pp;
-  while ((pp = glb_policy->pending_picks)) {
-    glb_policy->pending_picks = pp->next;
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[grpclb %p] Pending pick about to (async) PICK from RR %p",
-              glb_policy, glb_policy->rr_policy);
-    }
-    pick_from_internal_rr_locked(glb_policy, true /* force_async */, pp);
-  }
-  // Send pending pings to RR policy.
-  pending_ping* pping;
-  while ((pping = glb_policy->pending_pings)) {
-    glb_policy->pending_pings = pping->next;
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
-              glb_policy, glb_policy->rr_policy);
-    }
-    grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, pping->on_initiate,
-                                   pping->on_ack);
-    gpr_free(pping);
-  }
-}
-
-/* glb_policy->rr_policy may be nullptr (initial handover) */
-static void rr_handover_locked(glb_lb_policy* glb_policy) {
-  if (glb_policy->shutting_down) return;
-  grpc_lb_policy_args* args = lb_policy_args_create(glb_policy);
-  GPR_ASSERT(args != nullptr);
-  if (glb_policy->rr_policy != nullptr) {
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", glb_policy,
-              glb_policy->rr_policy);
-    }
-    grpc_lb_policy_update_locked(glb_policy->rr_policy, args);
-  } else {
-    create_rr_locked(glb_policy, args);
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", glb_policy,
-              glb_policy->rr_policy);
-    }
-  }
-  lb_policy_args_destroy(args);
-}
-
-static void rr_on_connectivity_changed_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
-  if (glb_policy->shutting_down) {
-    GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                         "rr_on_connectivity_changed_locked");
-    return;
-  }
-  update_lb_connectivity_status_locked(glb_policy, GRPC_ERROR_REF(error));
-  // Resubscribe. Reuse the "rr_on_connectivity_changed_locked" ref.
-  grpc_lb_policy_notify_on_state_change_locked(
-      glb_policy->rr_policy, &glb_policy->rr_connectivity_state,
-      &glb_policy->rr_on_connectivity_changed);
-}
-
-static void destroy_balancer_name(void* balancer_name) {
-  gpr_free(balancer_name);
-}
-
-static grpc_slice_hash_table_entry targets_info_entry_create(
-    const char* address, const char* balancer_name) {
-  grpc_slice_hash_table_entry entry;
-  entry.key = grpc_slice_from_copied_string(address);
-  entry.value = gpr_strdup(balancer_name);
-  return entry;
-}
-
-static int balancer_name_cmp_fn(void* a, void* b) {
-  const char* a_str = static_cast<const char*>(a);
-  const char* b_str = static_cast<const char*>(b);
-  return strcmp(a_str, b_str);
-}
-
-/* Returns the channel args for the LB channel, used to create a bidirectional
- * stream for the reception of load balancing updates.
- *
- * Inputs:
- *   - \a addresses: corresponding to the balancers.
- *   - \a response_generator: in order to propagate updates from the resolver
- *   above the grpclb policy.
- *   - \a args: other args inherited from the grpclb policy. */
-static grpc_channel_args* build_lb_channel_args(
-    const grpc_lb_addresses* addresses,
-    grpc_core::FakeResolverResponseGenerator* response_generator,
-    const grpc_channel_args* args) {
-  size_t num_grpclb_addrs = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
-  }
-  /* All input addresses come from a resolver that claims they are LB services.
-   * It's the resolver's responsibility to make sure this policy is only
-   * instantiated and used in that case. Otherwise, something has gone wrong. */
-  GPR_ASSERT(num_grpclb_addrs > 0);
-  grpc_lb_addresses* lb_addresses =
-      grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
-  grpc_slice_hash_table_entry* targets_info_entries =
-      static_cast<grpc_slice_hash_table_entry*>(
-          gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs));
-
-  size_t lb_addresses_idx = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (!addresses->addresses[i].is_balancer) continue;
-    if (addresses->addresses[i].user_data != nullptr) {
-      gpr_log(GPR_ERROR,
-              "This LB policy doesn't support user data. It will be ignored");
-    }
-    char* addr_str;
-    GPR_ASSERT(grpc_sockaddr_to_string(
-                   &addr_str, &addresses->addresses[i].address, true) > 0);
-    targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
-        addr_str, addresses->addresses[i].balancer_name);
-    gpr_free(addr_str);
-
-    grpc_lb_addresses_set_address(
-        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
-        addresses->addresses[i].address.len, false /* is balancer */,
-        addresses->addresses[i].balancer_name, nullptr /* user data */);
-  }
-  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
-  grpc_slice_hash_table* targets_info =
-      grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
-                                   destroy_balancer_name, balancer_name_cmp_fn);
-  gpr_free(targets_info_entries);
-
-  grpc_channel_args* lb_channel_args =
-      grpc_lb_policy_grpclb_build_lb_channel_args(targets_info,
-                                                  response_generator, args);
-
-  grpc_arg lb_channel_addresses_arg =
-      grpc_lb_addresses_create_channel_arg(lb_addresses);
-
-  grpc_channel_args* result = grpc_channel_args_copy_and_add(
-      lb_channel_args, &lb_channel_addresses_arg, 1);
-  grpc_slice_hash_table_unref(targets_info);
-  grpc_channel_args_destroy(lb_channel_args);
-  grpc_lb_addresses_destroy(lb_addresses);
-  return result;
-}
-
-static void glb_destroy(grpc_lb_policy* pol) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  GPR_ASSERT(glb_policy->pending_picks == nullptr);
-  GPR_ASSERT(glb_policy->pending_pings == nullptr);
-  gpr_free((void*)glb_policy->server_name);
-  grpc_channel_args_destroy(glb_policy->args);
-  grpc_connectivity_state_destroy(&glb_policy->state_tracker);
-  if (glb_policy->serverlist != nullptr) {
-    grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
-  }
-  if (glb_policy->fallback_backend_addresses != nullptr) {
-    grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
-  }
-  // TODO(roth): Remove this once the LB policy becomes a C++ object.
-  glb_policy->response_generator.reset();
-  grpc_subchannel_index_unref();
-  gpr_free(glb_policy);
-}
-
-static void glb_shutdown_locked(grpc_lb_policy* pol,
-                                grpc_lb_policy* new_policy) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
-  glb_policy->shutting_down = true;
-  if (glb_policy->lb_calld != nullptr) {
-    lb_call_data_shutdown(glb_policy);
-  }
-  if (glb_policy->retry_timer_callback_pending) {
-    grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
-  }
-  if (glb_policy->fallback_timer_callback_pending) {
-    grpc_timer_cancel(&glb_policy->lb_fallback_timer);
-  }
-  if (glb_policy->rr_policy != nullptr) {
-    grpc_lb_policy_shutdown_locked(glb_policy->rr_policy, nullptr);
-    GRPC_LB_POLICY_UNREF(glb_policy->rr_policy, "glb_shutdown");
-  }
-  // We destroy the LB channel here because
-  // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
-  // instance.  Destroying the lb channel in glb_destroy would likely result in
-  // a callback invocation without a valid glb_policy arg.
-  if (glb_policy->lb_channel != nullptr) {
-    grpc_channel_destroy(glb_policy->lb_channel);
-    glb_policy->lb_channel = nullptr;
-  }
-  grpc_connectivity_state_set(&glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
-                              GRPC_ERROR_REF(error), "glb_shutdown");
-  grpc_lb_policy_try_reresolve(pol, &grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
-  // Clear pending picks.
-  pending_pick* pp = glb_policy->pending_picks;
-  glb_policy->pending_picks = nullptr;
-  while (pp != nullptr) {
-    pending_pick* next = pp->next;
-    if (new_policy != nullptr) {
-      // Hand pick over to new policy.
-      if (pp->client_stats != nullptr) {
-        grpc_grpclb_client_stats_unref(pp->client_stats);
-      }
-      pp->pick->on_complete = pp->original_on_complete;
-      if (grpc_lb_policy_pick_locked(new_policy, pp->pick)) {
-        // Synchronous return; schedule callback.
-        GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
-      }
-      gpr_free(pp);
-    } else {
-      pp->pick->connected_subchannel.reset();
-      GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
-    }
-    pp = next;
-  }
-  // Clear pending pings.
-  pending_ping* pping = glb_policy->pending_pings;
-  glb_policy->pending_pings = nullptr;
-  while (pping != nullptr) {
-    pending_ping* next = pping->next;
-    GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
-    GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
-    gpr_free(pping);
-    pping = next;
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
-// Cancel a specific pending pick.
 //
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
-//   handed over to the RR policy (in create_rr_locked()). From that point
-//   onwards, it'll be RR's responsibility. For cancellations, that implies the
-//   pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-//   level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
-//   we invoke the completion closure and set *target to nullptr right here.
-static void glb_cancel_pick_locked(grpc_lb_policy* pol,
-                                   grpc_lb_policy_pick_state* pick,
-                                   grpc_error* error) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  pending_pick* pp = glb_policy->pending_picks;
-  glb_policy->pending_picks = nullptr;
-  while (pp != nullptr) {
-    pending_pick* next = pp->next;
-    if (pp->pick == pick) {
-      pick->connected_subchannel.reset();
-      GRPC_CLOSURE_SCHED(&pp->on_complete,
-                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                             "Pick Cancelled", &error, 1));
-    } else {
-      pp->next = glb_policy->pending_picks;
-      glb_policy->pending_picks = pp;
-    }
-    pp = next;
-  }
-  if (glb_policy->rr_policy != nullptr) {
-    grpc_lb_policy_cancel_pick_locked(glb_policy->rr_policy, pick,
-                                      GRPC_ERROR_REF(error));
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
-// Cancel all pending picks.
+// factory
 //
-// A grpclb pick progresses as follows:
-// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
-//   handed over to the RR policy (in create_rr_locked()). From that point
-//   onwards, it'll be RR's responsibility. For cancellations, that implies the
-//   pick needs also be cancelled by the RR instance.
-// - Otherwise, without an RR instance, picks stay pending at this policy's
-//   level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
-//   we invoke the completion closure and set *target to nullptr right here.
-static void glb_cancel_picks_locked(grpc_lb_policy* pol,
-                                    uint32_t initial_metadata_flags_mask,
-                                    uint32_t initial_metadata_flags_eq,
-                                    grpc_error* error) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  pending_pick* pp = glb_policy->pending_picks;
-  glb_policy->pending_picks = nullptr;
-  while (pp != nullptr) {
-    pending_pick* next = pp->next;
-    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
-        initial_metadata_flags_eq) {
-      GRPC_CLOSURE_SCHED(&pp->on_complete,
-                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                             "Pick Cancelled", &error, 1));
-    } else {
-      pp->next = glb_policy->pending_picks;
-      glb_policy->pending_picks = pp;
+
+class GrpcLbFactory : public LoadBalancingPolicyFactory {
+ public:
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      const LoadBalancingPolicy::Args& args) const override {
+    /* Count the number of gRPC-LB addresses. There must be at least one. */
+    const grpc_arg* arg =
+        grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
+    if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
+      return nullptr;
     }
-    pp = next;
-  }
-  if (glb_policy->rr_policy != nullptr) {
-    grpc_lb_policy_cancel_picks_locked(
-        glb_policy->rr_policy, initial_metadata_flags_mask,
-        initial_metadata_flags_eq, GRPC_ERROR_REF(error));
-  }
-  GRPC_ERROR_UNREF(error);
-}
-
-static void lb_on_fallback_timer_locked(void* arg, grpc_error* error);
-static void query_for_backends_locked(glb_lb_policy* glb_policy);
-static void start_picking_locked(glb_lb_policy* glb_policy) {
-  /* start a timer to fall back */
-  if (glb_policy->lb_fallback_timeout_ms > 0 &&
-      glb_policy->serverlist == nullptr &&
-      !glb_policy->fallback_timer_callback_pending) {
-    grpc_millis deadline =
-        grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_fallback_timeout_ms;
-    GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_fallback_timer");
-    GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
-                      glb_policy,
-                      grpc_combiner_scheduler(glb_policy->base.combiner));
-    glb_policy->fallback_timer_callback_pending = true;
-    grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
-                    &glb_policy->lb_on_fallback);
-  }
-  glb_policy->started_picking = true;
-  glb_policy->lb_call_backoff->Reset();
-  query_for_backends_locked(glb_policy);
-}
-
-static void glb_exit_idle_locked(grpc_lb_policy* pol) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  if (!glb_policy->started_picking) {
-    start_picking_locked(glb_policy);
-  }
-}
-
-static int glb_pick_locked(grpc_lb_policy* pol,
-                           grpc_lb_policy_pick_state* pick) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  pending_pick* pp = pending_pick_create(glb_policy, pick);
-  bool pick_done = false;
-  if (glb_policy->rr_policy != nullptr) {
-    const grpc_connectivity_state rr_connectivity_state =
-        grpc_lb_policy_check_connectivity_locked(glb_policy->rr_policy,
-                                                 nullptr);
-    // The glb_policy->rr_policy may have transitioned to SHUTDOWN but the
-    // callback registered to capture this event
-    // (on_rr_connectivity_changed_locked) may not have been invoked yet. We
-    // need to make sure we aren't trying to pick from a RR policy instance
-    // that's in shutdown.
-    if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
-      if (grpc_lb_glb_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "[grpclb %p] NOT picking from RR %p: RR conn state=%s",
-                glb_policy, glb_policy->rr_policy,
-                grpc_connectivity_state_name(rr_connectivity_state));
-      }
-      pending_pick_add(&glb_policy->pending_picks, pp);
-      pick_done = false;
-    } else {  // RR not in shutdown
-      if (grpc_lb_glb_trace.enabled()) {
-        gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
-                glb_policy->rr_policy);
-      }
-      pick_done =
-          pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
+    grpc_lb_addresses* addresses =
+        reinterpret_cast<grpc_lb_addresses*>(arg->value.pointer.p);
+    size_t num_grpclb_addrs = 0;
+    for (size_t i = 0; i < addresses->num_addresses; ++i) {
+      if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
     }
-  } else {  // glb_policy->rr_policy == NULL
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_DEBUG,
-              "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
-              glb_policy);
-    }
-    pending_pick_add(&glb_policy->pending_picks, pp);
-    if (!glb_policy->started_picking) {
-      start_picking_locked(glb_policy);
-    }
-    pick_done = false;
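+    // No balancer addresses means there is no LB server to talk to, so a
+    // grpclb policy cannot be created.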
+    if (num_grpclb_addrs == 0) return nullptr;
+    return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
   }
-  return pick_done;
-}
 
-static grpc_connectivity_state glb_check_connectivity_locked(
-    grpc_lb_policy* pol, grpc_error** connectivity_error) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  return grpc_connectivity_state_get(&glb_policy->state_tracker,
-                                     connectivity_error);
-}
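+  // Name under which this factory (and the grpclb policy) is registered.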
+  const char* name() const override { return "grpclb"; }
+};
 
-static void glb_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
-                                grpc_closure* on_ack) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  if (glb_policy->rr_policy) {
-    grpc_lb_policy_ping_one_locked(glb_policy->rr_policy, on_initiate, on_ack);
-  } else {
-    pending_ping_add(&glb_policy->pending_pings, on_initiate, on_ack);
-    if (!glb_policy->started_picking) {
-      start_picking_locked(glb_policy);
-    }
-  }
-}
+}  // namespace
 
-static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
-                                              grpc_connectivity_state* current,
-                                              grpc_closure* notify) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(pol);
-  grpc_connectivity_state_notify_on_state_change(&glb_policy->state_tracker,
-                                                 current, notify);
-}
+}  // namespace grpc_core
 
-static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
-  glb_policy->retry_timer_callback_pending = false;
-  if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE &&
-      glb_policy->lb_calld == nullptr) {
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
-    }
-    query_for_backends_locked(glb_policy);
-  }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
-}
+//
+// Plugin registration
+//
 
-static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) {
-  grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
-  if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
-            glb_policy);
-    grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
-    if (timeout > 0) {
-      gpr_log(GPR_DEBUG,
-              "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
-              glb_policy, timeout);
-    } else {
-      gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
-              glb_policy);
-    }
-  }
-  GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer");
-  GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
-                    lb_call_on_retry_timer_locked, glb_policy,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  glb_policy->retry_timer_callback_pending = true;
-  grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try,
-                  &glb_policy->lb_on_call_retry);
-}
-
-static void maybe_send_client_load_report_locked(void* arg, grpc_error* error);
-
-static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) {
-  const grpc_millis next_client_load_report_time =
-      grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval;
-  GRPC_CLOSURE_INIT(
-      &lb_calld->client_load_report_closure,
-      maybe_send_client_load_report_locked, lb_calld,
-      grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner));
-  grpc_timer_init(&lb_calld->client_load_report_timer,
-                  next_client_load_report_time,
-                  &lb_calld->client_load_report_closure);
-  lb_calld->client_load_report_timer_callback_pending = true;
-}
-
-static void client_load_report_done_locked(void* arg, grpc_error* error) {
-  glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
-  glb_lb_policy* glb_policy = lb_calld->glb_policy;
-  grpc_byte_buffer_destroy(lb_calld->send_message_payload);
-  lb_calld->send_message_payload = nullptr;
-  if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
-    glb_lb_call_data_unref(lb_calld, "client_load_report");
-    return;
-  }
-  schedule_next_client_load_report(lb_calld);
-}
-
-static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
-  grpc_grpclb_dropped_call_counts* drop_entries =
-      static_cast<grpc_grpclb_dropped_call_counts*>(
-          request->client_stats.calls_finished_with_drop.arg);
-  return request->client_stats.num_calls_started == 0 &&
-         request->client_stats.num_calls_finished == 0 &&
-         request->client_stats.num_calls_finished_with_client_failed_to_send ==
-             0 &&
-         request->client_stats.num_calls_finished_known_received == 0 &&
-         (drop_entries == nullptr || drop_entries->num_entries == 0);
-}
-
-static void send_client_load_report_locked(glb_lb_call_data* lb_calld) {
-  glb_lb_policy* glb_policy = lb_calld->glb_policy;
-  // Construct message payload.
-  GPR_ASSERT(lb_calld->send_message_payload == nullptr);
-  grpc_grpclb_request* request =
-      grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats);
-  // Skip client load report if the counters were all zero in the last
-  // report and they are still zero in this one.
-  if (load_report_counters_are_zero(request)) {
-    if (lb_calld->last_client_load_report_counters_were_zero) {
-      grpc_grpclb_request_destroy(request);
-      schedule_next_client_load_report(lb_calld);
-      return;
-    }
-    lb_calld->last_client_load_report_counters_were_zero = true;
-  } else {
-    lb_calld->last_client_load_report_counters_were_zero = false;
-  }
-  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
-  lb_calld->send_message_payload =
-      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
-  grpc_slice_unref_internal(request_payload_slice);
-  grpc_grpclb_request_destroy(request);
-  // Send the report.
-  grpc_op op;
-  memset(&op, 0, sizeof(op));
-  op.op = GRPC_OP_SEND_MESSAGE;
-  op.data.send_message.send_message = lb_calld->send_message_payload;
-  GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure,
-                    client_load_report_done_locked, lb_calld,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  grpc_call_error call_error = grpc_call_start_batch_and_execute(
-      lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure);
-  if (call_error != GRPC_CALL_OK) {
-    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
-    GPR_ASSERT(GRPC_CALL_OK == call_error);
-  }
-}
-
-static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) {
-  glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
-  glb_lb_policy* glb_policy = lb_calld->glb_policy;
-  lb_calld->client_load_report_timer_callback_pending = false;
-  if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
-    glb_lb_call_data_unref(lb_calld, "client_load_report");
-    return;
-  }
-  // If we've already sent the initial request, then we can go ahead and send
-  // the load report. Otherwise, we need to wait until the initial request has
-  // been sent to send this (see lb_on_sent_initial_request_locked()).
-  if (lb_calld->send_message_payload == nullptr) {
-    send_client_load_report_locked(lb_calld);
-  } else {
-    lb_calld->client_load_report_is_due = true;
-  }
-}
-
-static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
-static void lb_on_response_received_locked(void* arg, grpc_error* error);
-static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) {
-  GPR_ASSERT(!glb_policy->shutting_down);
-  // Init the LB call. Note that the LB call will progress every time there's
-  // activity in glb_policy->base.interested_parties, which is comprised of the
-  // polling entities from client_channel.
-  GPR_ASSERT(glb_policy->server_name != nullptr);
-  GPR_ASSERT(glb_policy->server_name[0] != '\0');
-  grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
-  grpc_millis deadline =
-      glb_policy->lb_call_timeout_ms == 0
-          ? GRPC_MILLIS_INF_FUTURE
-          : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
-  glb_lb_call_data* lb_calld =
-      static_cast<glb_lb_call_data*>(gpr_zalloc(sizeof(*lb_calld)));
-  lb_calld->lb_call = grpc_channel_create_pollset_set_call(
-      glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
-      glb_policy->base.interested_parties,
-      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
-      &host, deadline, nullptr);
-  grpc_slice_unref_internal(host);
-  // Init the LB call request payload.
-  grpc_grpclb_request* request =
-      grpc_grpclb_request_create(glb_policy->server_name);
-  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
-  lb_calld->send_message_payload =
-      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
-  grpc_slice_unref_internal(request_payload_slice);
-  grpc_grpclb_request_destroy(request);
-  // Init other data associated with the LB call.
-  lb_calld->glb_policy = glb_policy;
-  gpr_ref_init(&lb_calld->refs, 1);
-  grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv);
-  grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv);
-  GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request,
-                    lb_on_sent_initial_request_locked, lb_calld,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received,
-                    lb_on_response_received_locked, lb_calld,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received,
-                    lb_on_server_status_received_locked, lb_calld,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  // Hold a ref to the glb_policy.
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld");
-  return lb_calld;
-}
-
-/*
- * Auxiliary functions and LB client callbacks.
- */
-
-static void query_for_backends_locked(glb_lb_policy* glb_policy) {
-  GPR_ASSERT(glb_policy->lb_channel != nullptr);
-  if (glb_policy->shutting_down) return;
-  // Init the LB call data.
-  GPR_ASSERT(glb_policy->lb_calld == nullptr);
-  glb_policy->lb_calld = lb_call_data_create_locked(glb_policy);
-  if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, "
-            "lb_call: %p)",
-            glb_policy, glb_policy->lb_channel, glb_policy->lb_calld,
-            glb_policy->lb_calld->lb_call);
-  }
-  GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
-  // Create the ops.
-  grpc_call_error call_error;
-  grpc_op ops[3];
-  memset(ops, 0, sizeof(ops));
-  // Op: send initial metadata.
-  grpc_op* op = ops;
-  op->op = GRPC_OP_SEND_INITIAL_METADATA;
-  op->data.send_initial_metadata.count = 0;
-  op->flags = 0;
-  op->reserved = nullptr;
-  op++;
-  // Op: send request message.
-  GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr);
-  op->op = GRPC_OP_SEND_MESSAGE;
-  op->data.send_message.send_message =
-      glb_policy->lb_calld->send_message_payload;
-  op->flags = 0;
-  op->reserved = nullptr;
-  op++;
-  glb_lb_call_data_ref(glb_policy->lb_calld,
-                       "lb_on_sent_initial_request_locked");
-  call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
-      &glb_policy->lb_calld->lb_on_sent_initial_request);
-  GPR_ASSERT(GRPC_CALL_OK == call_error);
-  // Op: recv initial metadata.
-  op = ops;
-  op->op = GRPC_OP_RECV_INITIAL_METADATA;
-  op->data.recv_initial_metadata.recv_initial_metadata =
-      &glb_policy->lb_calld->lb_initial_metadata_recv;
-  op->flags = 0;
-  op->reserved = nullptr;
-  op++;
-  // Op: recv response.
-  op->op = GRPC_OP_RECV_MESSAGE;
-  op->data.recv_message.recv_message =
-      &glb_policy->lb_calld->recv_message_payload;
-  op->flags = 0;
-  op->reserved = nullptr;
-  op++;
-  glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked");
-  call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
-      &glb_policy->lb_calld->lb_on_response_received);
-  GPR_ASSERT(GRPC_CALL_OK == call_error);
-  // Op: recv server status.
-  op = ops;
-  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
-  op->data.recv_status_on_client.trailing_metadata =
-      &glb_policy->lb_calld->lb_trailing_metadata_recv;
-  op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status;
-  op->data.recv_status_on_client.status_details =
-      &glb_policy->lb_calld->lb_call_status_details;
-  op->flags = 0;
-  op->reserved = nullptr;
-  op++;
-  // This callback signals the end of the LB call, so it relies on the initial
-  // ref instead of a new ref. When it's invoked, it's the initial ref that is
-  // unreffed.
-  call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
-      &glb_policy->lb_calld->lb_on_server_status_received);
-  GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
-  glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
-  grpc_byte_buffer_destroy(lb_calld->send_message_payload);
-  lb_calld->send_message_payload = nullptr;
-  // If we attempted to send a client load report before the initial request was
-  // sent (and this lb_calld is still in use), send the load report now.
-  if (lb_calld->client_load_report_is_due &&
-      lb_calld == lb_calld->glb_policy->lb_calld) {
-    send_client_load_report_locked(lb_calld);
-    lb_calld->client_load_report_is_due = false;
-  }
-  glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked");
-}
-
-static void lb_on_response_received_locked(void* arg, grpc_error* error) {
-  glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
-  glb_lb_policy* glb_policy = lb_calld->glb_policy;
-  // Empty payload means the LB call was cancelled.
-  if (lb_calld != glb_policy->lb_calld ||
-      lb_calld->recv_message_payload == nullptr) {
-    glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked");
-    return;
-  }
-  grpc_op ops[2];
-  memset(ops, 0, sizeof(ops));
-  grpc_op* op = ops;
-  glb_policy->lb_call_backoff->Reset();
-  grpc_byte_buffer_reader bbr;
-  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload);
-  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
-  grpc_byte_buffer_reader_destroy(&bbr);
-  grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
-  lb_calld->recv_message_payload = nullptr;
-  grpc_grpclb_initial_response* initial_response;
-  grpc_grpclb_serverlist* serverlist;
-  if (!lb_calld->seen_initial_response &&
-      (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
-          nullptr) {
-    // Have NOT seen initial response, look for initial response.
-    if (initial_response->has_client_stats_report_interval) {
-      lb_calld->client_stats_report_interval = GPR_MAX(
-          GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
-                              &initial_response->client_stats_report_interval));
-      if (grpc_lb_glb_trace.enabled()) {
-        gpr_log(GPR_INFO,
-                "[grpclb %p] Received initial LB response message; "
-                "client load reporting interval = %" PRIdPTR " milliseconds",
-                glb_policy, lb_calld->client_stats_report_interval);
-      }
-    } else if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[grpclb %p] Received initial LB response message; client load "
-              "reporting NOT enabled",
-              glb_policy);
-    }
-    grpc_grpclb_initial_response_destroy(initial_response);
-    lb_calld->seen_initial_response = true;
-  } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
-                  response_slice)) != nullptr) {
-    // Have seen initial response, look for serverlist.
-    GPR_ASSERT(lb_calld->lb_call != nullptr);
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
-              glb_policy, serverlist->num_servers);
-      for (size_t i = 0; i < serverlist->num_servers; ++i) {
-        grpc_resolved_address addr;
-        parse_server(serverlist->servers[i], &addr);
-        char* ipport;
-        grpc_sockaddr_to_string(&ipport, &addr, false);
-        gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
-                glb_policy, i, ipport);
-        gpr_free(ipport);
-      }
-    }
-    /* update serverlist */
-    if (serverlist->num_servers > 0) {
-      // Start sending client load report only after we start using the
-      // serverlist returned from the current LB call.
-      if (lb_calld->client_stats_report_interval > 0 &&
-          lb_calld->client_stats == nullptr) {
-        lb_calld->client_stats = grpc_grpclb_client_stats_create();
-        glb_lb_call_data_ref(lb_calld, "client_load_report");
-        schedule_next_client_load_report(lb_calld);
-      }
-      if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
-        if (grpc_lb_glb_trace.enabled()) {
-          gpr_log(GPR_INFO,
-                  "[grpclb %p] Incoming server list identical to current, "
-                  "ignoring.",
-                  glb_policy);
-        }
-        grpc_grpclb_destroy_serverlist(serverlist);
-      } else { /* new serverlist */
-        if (glb_policy->serverlist != nullptr) {
-          /* dispose of the old serverlist */
-          grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
-        } else {
-          /* or dispose of the fallback */
-          grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
-          glb_policy->fallback_backend_addresses = nullptr;
-          if (glb_policy->fallback_timer_callback_pending) {
-            grpc_timer_cancel(&glb_policy->lb_fallback_timer);
-            glb_policy->fallback_timer_callback_pending = false;
-          }
-        }
-        /* and update the copy in the glb_lb_policy instance. This
-         * serverlist instance will be destroyed either upon the next
-         * update or in glb_destroy() */
-        glb_policy->serverlist = serverlist;
-        glb_policy->serverlist_index = 0;
-        rr_handover_locked(glb_policy);
-      }
-    } else {
-      if (grpc_lb_glb_trace.enabled()) {
-        gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
-                glb_policy);
-      }
-      grpc_grpclb_destroy_serverlist(serverlist);
-    }
-  } else {
-    // No valid initial response or serverlist found.
-    gpr_log(GPR_ERROR,
-            "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
-            glb_policy,
-            grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
-  }
-  grpc_slice_unref_internal(response_slice);
-  if (!glb_policy->shutting_down) {
-    // Keep listening for serverlist updates.
-    op->op = GRPC_OP_RECV_MESSAGE;
-    op->data.recv_message.recv_message = &lb_calld->recv_message_payload;
-    op->flags = 0;
-    op->reserved = nullptr;
-    op++;
-    // Reuse the "lb_on_response_received_locked" ref taken in
-    // query_for_backends_locked().
-    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
-        lb_calld->lb_call, ops, static_cast<size_t>(op - ops),
-        &lb_calld->lb_on_response_received);
-    GPR_ASSERT(GRPC_CALL_OK == call_error);
-  } else {
-    glb_lb_call_data_unref(lb_calld,
-                           "lb_on_response_received_locked+glb_shutdown");
-  }
-}
-
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
-  glb_lb_call_data* lb_calld = static_cast<glb_lb_call_data*>(arg);
-  glb_lb_policy* glb_policy = lb_calld->glb_policy;
-  GPR_ASSERT(lb_calld->lb_call != nullptr);
-  if (grpc_lb_glb_trace.enabled()) {
-    char* status_details =
-        grpc_slice_to_c_string(lb_calld->lb_call_status_details);
-    gpr_log(GPR_INFO,
-            "[grpclb %p] Status from LB server received. Status = %d, details "
-            "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
-            lb_calld->glb_policy, lb_calld->lb_call_status, status_details,
-            lb_calld, lb_calld->lb_call, grpc_error_string(error));
-    gpr_free(status_details);
-  }
-  grpc_lb_policy_try_reresolve(&glb_policy->base, &grpc_lb_glb_trace,
-                               GRPC_ERROR_NONE);
-  // If this lb_calld is still in use, this call ended because of a failure so
-  // we want to retry connecting. Otherwise, we have deliberately ended this
-  // call and no further action is required.
-  if (lb_calld == glb_policy->lb_calld) {
-    glb_policy->lb_calld = nullptr;
-    if (lb_calld->client_load_report_timer_callback_pending) {
-      grpc_timer_cancel(&lb_calld->client_load_report_timer);
-    }
-    GPR_ASSERT(!glb_policy->shutting_down);
-    if (lb_calld->seen_initial_response) {
-      // If we lose connection to the LB server, reset the backoff and restart
-      // the LB call immediately.
-      glb_policy->lb_call_backoff->Reset();
-      query_for_backends_locked(glb_policy);
-    } else {
-      // If this LB call fails to establish any connection to the LB server,
-      // retry later.
-      start_lb_call_retry_timer_locked(glb_policy);
-    }
-  }
-  glb_lb_call_data_unref(lb_calld, "lb_call_ended");
-}
-
-static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
-  glb_policy->fallback_timer_callback_pending = false;
-  /* If we receive a serverlist after the timer fires but before this callback
-   * actually runs, don't fall back. */
-  if (glb_policy->serverlist == nullptr && !glb_policy->shutting_down &&
-      error == GRPC_ERROR_NONE) {
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "[grpclb %p] Falling back to backends from resolver",
-              glb_policy);
-    }
-    GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
-    rr_handover_locked(glb_policy);
-  }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
-}
-
-static void fallback_update_locked(glb_lb_policy* glb_policy,
-                                   const grpc_lb_addresses* addresses) {
-  GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
-  grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
-  glb_policy->fallback_backend_addresses =
-      extract_backend_addresses_locked(addresses);
-  if (glb_policy->lb_fallback_timeout_ms > 0 &&
-      glb_policy->rr_policy != nullptr) {
-    rr_handover_locked(glb_policy);
-  }
-}
-
-static void glb_update_locked(grpc_lb_policy* policy,
-                              const grpc_lb_policy_args* args) {
-  glb_lb_policy* glb_policy = reinterpret_cast<glb_lb_policy*>(policy);
-  const grpc_arg* arg =
-      grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
-  if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
-    if (glb_policy->lb_channel == nullptr) {
-      // If we don't have a current channel to the LB, go into TRANSIENT
-      // FAILURE.
-      grpc_connectivity_state_set(
-          &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
-          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
-          "glb_update_missing");
-    } else {
-      // otherwise, keep using the current LB channel (ignore this update).
-      gpr_log(
-          GPR_ERROR,
-          "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
-          glb_policy);
-    }
-    return;
-  }
-  const grpc_lb_addresses* addresses =
-      static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
-  // If a non-empty serverlist hasn't been received from the balancer,
-  // propagate the update to fallback_backend_addresses.
-  if (glb_policy->serverlist == nullptr) {
-    fallback_update_locked(glb_policy, addresses);
-  }
-  GPR_ASSERT(glb_policy->lb_channel != nullptr);
-  // Propagate updates to the LB channel (pick_first) through the fake
-  // resolver.
-  grpc_channel_args* lb_channel_args = build_lb_channel_args(
-      addresses, glb_policy->response_generator.get(), args->args);
-  glb_policy->response_generator->SetResponse(lb_channel_args);
-  grpc_channel_args_destroy(lb_channel_args);
-  // Start watching the LB channel's connectivity state, if not
-  // already doing so.
-  if (!glb_policy->watching_lb_channel) {
-    glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
-        glb_policy->lb_channel, true /* try to connect */);
-    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
-        grpc_channel_get_channel_stack(glb_policy->lb_channel));
-    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
-    glb_policy->watching_lb_channel = true;
-    GRPC_LB_POLICY_REF(&glb_policy->base, "watch_lb_channel_connectivity");
-    grpc_client_channel_watch_connectivity_state(
-        client_channel_elem,
-        grpc_polling_entity_create_from_pollset_set(
-            glb_policy->base.interested_parties),
-        &glb_policy->lb_channel_connectivity,
-        &glb_policy->lb_channel_on_connectivity_changed, nullptr);
-  }
-}
-
-// Invoked as part of the update process. It continues watching the LB channel
-// until it shuts down or becomes READY. It's invoked even if the LB channel
-// stayed READY throughout the update (for example if the update is identical).
-static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
-                                                      grpc_error* error) {
-  glb_lb_policy* glb_policy = static_cast<glb_lb_policy*>(arg);
-  if (glb_policy->shutting_down) goto done;
-  // Re-initialize the lb_call. This should also take care of updating the
-  // embedded RR policy. Note that the current RR policy, if any, will stay in
-  // effect until an update from the new lb_call is received.
-  switch (glb_policy->lb_channel_connectivity) {
-    case GRPC_CHANNEL_CONNECTING:
-    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
-      // Keep watching the LB channel.
-      grpc_channel_element* client_channel_elem =
-          grpc_channel_stack_last_element(
-              grpc_channel_get_channel_stack(glb_policy->lb_channel));
-      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
-      grpc_client_channel_watch_connectivity_state(
-          client_channel_elem,
-          grpc_polling_entity_create_from_pollset_set(
-              glb_policy->base.interested_parties),
-          &glb_policy->lb_channel_connectivity,
-          &glb_policy->lb_channel_on_connectivity_changed, nullptr);
-      break;
-    }
-      // The LB channel may be IDLE because it was shut down before the update.
-      // Restart the LB call to kick the LB channel into gear.
-    case GRPC_CHANNEL_IDLE:
-    case GRPC_CHANNEL_READY:
-      if (glb_policy->lb_calld != nullptr) {
-        lb_call_data_shutdown(glb_policy);
-      }
-      if (glb_policy->started_picking) {
-        if (glb_policy->retry_timer_callback_pending) {
-          grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
-        }
-        glb_policy->lb_call_backoff->Reset();
-        query_for_backends_locked(glb_policy);
-      }
-      // Fall through.
-    case GRPC_CHANNEL_SHUTDOWN:
-    done:
-      glb_policy->watching_lb_channel = false;
-      GRPC_LB_POLICY_UNREF(&glb_policy->base,
-                           "watch_lb_channel_connectivity_cb_shutdown");
-  }
-}
-
-/* Code wiring the policy with the rest of the core */
-static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
-    glb_destroy,
-    glb_shutdown_locked,
-    glb_pick_locked,
-    glb_cancel_pick_locked,
-    glb_cancel_picks_locked,
-    glb_ping_one_locked,
-    glb_exit_idle_locked,
-    glb_check_connectivity_locked,
-    glb_notify_on_state_change_locked,
-    glb_update_locked};
-
-static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
-                                  grpc_lb_policy_args* args) {
-  /* Count the number of gRPC-LB addresses. There must be at least one. */
-  const grpc_arg* arg =
-      grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
-  if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
-    return nullptr;
-  }
-  grpc_lb_addresses* addresses =
-      static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
-  size_t num_grpclb_addrs = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
-  }
-  if (num_grpclb_addrs == 0) return nullptr;
-
-  glb_lb_policy* glb_policy =
-      static_cast<glb_lb_policy*>(gpr_zalloc(sizeof(*glb_policy)));
-
-  /* Get server name. */
-  arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
-  const char* server_uri = grpc_channel_arg_get_string(arg);
-  GPR_ASSERT(server_uri != nullptr);
-  grpc_uri* uri = grpc_uri_parse(server_uri, true);
-  GPR_ASSERT(uri->path[0] != '\0');
-  glb_policy->server_name =
-      gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
-  if (grpc_lb_glb_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "[grpclb %p] Will use '%s' as the server name for LB request.",
-            glb_policy, glb_policy->server_name);
-  }
-  grpc_uri_destroy(uri);
-
-  glb_policy->cc_factory = args->client_channel_factory;
-  GPR_ASSERT(glb_policy->cc_factory != nullptr);
-
-  arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
-  glb_policy->lb_call_timeout_ms =
-      grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
-
-  arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
-  glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
-      arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
-
-  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
-  // since we use this to trigger the client_load_reporting filter.
-  grpc_arg new_arg = grpc_channel_arg_string_create(
-      (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
-  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
-  glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
-      args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
-
-  /* Extract the backend addresses (may be empty) from the resolver for
-   * fallback. */
-  glb_policy->fallback_backend_addresses =
-      extract_backend_addresses_locked(addresses);
-
-  /* Create a client channel over them to communicate with a LB service */
-  glb_policy->response_generator =
-      grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
-  grpc_channel_args* lb_channel_args = build_lb_channel_args(
-      addresses, glb_policy->response_generator.get(), args->args);
-  char* uri_str;
-  gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
-  glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
-      uri_str, args->client_channel_factory, lb_channel_args);
-
-  /* Propagate initial resolution */
-  glb_policy->response_generator->SetResponse(lb_channel_args);
-  grpc_channel_args_destroy(lb_channel_args);
-  gpr_free(uri_str);
-  if (glb_policy->lb_channel == nullptr) {
-    gpr_free((void*)glb_policy->server_name);
-    grpc_channel_args_destroy(glb_policy->args);
-    gpr_free(glb_policy);
-    return nullptr;
-  }
-  grpc_subchannel_index_ref();
-  GRPC_CLOSURE_INIT(&glb_policy->rr_on_connectivity_changed,
-                    rr_on_connectivity_changed_locked, glb_policy,
-                    grpc_combiner_scheduler(args->combiner));
-  GRPC_CLOSURE_INIT(&glb_policy->rr_on_reresolution_requested,
-                    rr_on_reresolution_requested_locked, glb_policy,
-                    grpc_combiner_scheduler(args->combiner));
-  GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
-                    glb_lb_channel_on_connectivity_changed_cb, glb_policy,
-                    grpc_combiner_scheduler(args->combiner));
-  grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
-  grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
-                               "grpclb");
-  // Init LB call backoff option.
-  grpc_core::BackOff::Options backoff_options;
-  backoff_options
-      .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
-      .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
-      .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
-      .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
-  glb_policy->lb_call_backoff.Init(backoff_options);
-  return &glb_policy->base;
-}
-
-static void glb_factory_ref(grpc_lb_policy_factory* factory) {}
-
-static void glb_factory_unref(grpc_lb_policy_factory* factory) {}
-
-static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
-    glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
-
-static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
-
-grpc_lb_policy_factory* grpc_glb_lb_factory_create() {
-  return &glb_lb_policy_factory;
-}
-
-/* Plugin registration */
+namespace {
 
 // Only add client_load_reporting filter if the grpclb LB policy is used.
-static bool maybe_add_client_load_reporting_filter(
-    grpc_channel_stack_builder* builder, void* arg) {
+bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
+                                            void* arg) {
   const grpc_channel_args* args =
       grpc_channel_stack_builder_get_channel_arguments(builder);
   const grpc_arg* channel_arg =
@@ -1938,14 +1867,18 @@
   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
       strcmp(channel_arg->value.string, "grpclb") == 0) {
     return grpc_channel_stack_builder_append_filter(
-        builder, static_cast<const grpc_channel_filter*>(arg), nullptr,
-        nullptr);
+        builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
   }
   return true;
 }
 
+}  // namespace
+
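+// Registers the grpclb LB policy factory and a client-subchannel channel-init
+// stage that adds the client_load_reporting filter whenever the "grpclb"
+// policy is in use.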
 void grpc_lb_policy_grpclb_init() {
-  grpc_register_lb_policy(grpc_glb_lb_factory_create());
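+  // Ownership of the factory is transferred to the registry.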
+  grpc_core::LoadBalancingPolicyRegistry::Builder::
+      RegisterLoadBalancingPolicyFactory(
+          grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
+              grpc_core::New<grpc_core::GrpcLbFactory>()));
   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                    maybe_add_client_load_reporting_filter,