Merge branch 'master' of github.com:grpc/grpc into lb_add_md
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 566d3d5..c718e9a 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -399,9 +399,11 @@
     int r;
     GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
     gpr_mu_unlock(&chand->mu);
-    r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
-                            initial_metadata, initial_metadata_flags,
-                            connected_subchannel, on_ready);
+    const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+                                             initial_metadata_flags,
+                                             &calld->lb_token_mdelem};
+    r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+                            on_ready);
     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
     GPR_TIMER_END("cc_pick_subchannel", 0);
     return r;
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 8b980b2..71170f5 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,13 +100,10 @@
 }
 
 int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                        grpc_polling_entity *pollent,
-                        grpc_metadata_batch *initial_metadata,
-                        uint32_t initial_metadata_flags,
+                        const grpc_lb_policy_pick_args *pick_args,
                         grpc_connected_subchannel **target,
                         grpc_closure *on_complete) {
-  return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
-                              initial_metadata_flags, target, on_complete);
+  return policy->vtable->pick(exec_ctx, policy, pick_args, target, on_complete);
 }
 
 void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a2f5446..6f133a2 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -53,23 +53,37 @@
   grpc_pollset_set *interested_parties;
 };
 
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+  /** Party interested in the pick's progress; pick IO is done under it */
+  grpc_polling_entity *pollent;
+  /** Initial metadata associated with the picking call. */
+  grpc_metadata_batch *initial_metadata;
+  /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+  uint32_t initial_metadata_flags;
+  /** Storage for LB token in \a initial_metadata, or NULL if not used */
+  grpc_linked_mdelem *lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
+
 struct grpc_lb_policy_vtable {
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
   void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
 
-  /** implement grpc_lb_policy_pick */
+  /** \see grpc_lb_policy_pick */
   int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-              grpc_polling_entity *pollent,
-              grpc_metadata_batch *initial_metadata,
-              uint32_t initial_metadata_flags,
+              const grpc_lb_policy_pick_args *pick_args,
               grpc_connected_subchannel **target, grpc_closure *on_complete);
+
+  /** \see grpc_lb_policy_cancel_pick */
   void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                       grpc_connected_subchannel **target);
+
+  /** \see grpc_lb_policy_cancel_picks */
   void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                        uint32_t initial_metadata_flags_mask,
                        uint32_t initial_metadata_flags_eq);
 
+  /** \see grpc_lb_policy_ping_one */
   void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                    grpc_closure *closure);
 
@@ -83,8 +97,7 @@
 
   /** call notify when the connectivity state of a channel changes from *state.
       Updates *state with the new state of the policy. Calling with a NULL \a
-      state cancels the subscription.
-      */
+      state cancels the subscription.  */
   void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
                                  grpc_lb_policy *policy,
                                  grpc_connectivity_state *state,
@@ -124,26 +137,31 @@
 void grpc_lb_policy_init(grpc_lb_policy *policy,
                          const grpc_lb_policy_vtable *vtable);
 
-/** Given initial metadata in \a initial_metadata, find an appropriate
-    target for this rpc, and 'return' it by calling \a on_complete after setting
-    \a target.
-    Picking can be asynchronous. Any IO should be done under \a pollent. */
+/** Find an appropriate target for this call, based on \a pick_args.
+    Upon completion \a on_complete will be called, with \a *target set to an
+    appropriate connected subchannel if the pick was successful or NULL
+    otherwise.
+    Picking can be asynchronous. Any IO should be done under \a
+    pick_args->pollent. */
 int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
-                        grpc_polling_entity *pollent,
-                        grpc_metadata_batch *initial_metadata,
-                        uint32_t initial_metadata_flags,
+                        const grpc_lb_policy_pick_args *pick_args,
                         grpc_connected_subchannel **target,
                         grpc_closure *on_complete);
 
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+    against one of the connected subchannels managed by \a policy. */
 void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                              grpc_closure *closure);
 
+/** Cancel picks for \a target.
+    The \a on_complete callback of the pending picks will be invoked with \a
+    *target set to NULL. */
 void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
                                 grpc_connected_subchannel **target);
 
-/** Cancel all pending picks which have:
-    (initial_metadata_flags & initial_metadata_flags_mask) ==
-         initial_metadata_flags_eq */
+/** Cancel all pending picks whose \a initial_metadata_flags (as given in the
+    call to \a grpc_lb_policy_pick), when AND'd with \a
+    initial_metadata_flags_mask, equal \a initial_metadata_flags_eq */
 void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
                                  grpc_lb_policy *policy,
                                  uint32_t initial_metadata_flags_mask,
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index da1de35..cfe6f1a 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -47,8 +47,16 @@
   const grpc_lb_policy_factory_vtable *vtable;
 };
 
+typedef struct grpc_lb_policy_address_token {
+  uint8_t *token;    /* raw bytes of the load balancing token */
+  size_t token_size; /* number of bytes pointed to by \a token */
+} grpc_lb_policy_address_token;
+
 typedef struct grpc_lb_policy_args {
   grpc_resolved_addresses *addresses;
+  /* If not NULL, array of load balancing tokens associated with \a addresses,
+   * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */
+  grpc_lb_policy_address_token *tokens;
   grpc_client_channel_factory *client_channel_factory;
 } grpc_lb_policy_args;
 
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 8d2deb0..40a0681 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -81,6 +81,8 @@
   grpc_closure next_step;
 
   grpc_call_stack *owning_call;
+
+  grpc_linked_mdelem lb_token_mdelem;
 } grpc_subchannel_call_holder;
 
 void grpc_subchannel_call_holder_init(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index af913d8..c3294b7 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -76,9 +76,9 @@
  *    operations in progress over the old RR instance. This is done by
  *    decreasing the reference count on the old policy. The moment no more
  *    references are held on the old RR policy, it'll be destroyed and \a
- *    rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
- *    At this point we can transition to a new RR instance safely, which is done
- *    once again via \a rr_handover().
+ *    glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ *    state. At this point we can transition to a new RR instance safely, which
+ *    is done once again via \a rr_handover().
  *
  *
  * Once a RR policy instance is in place (and getting updated as described),
@@ -96,6 +96,8 @@
  * - Implement LB service forwarding (point 2c. in the doc's diagram).
  */
 
+#include <errno.h>
+
 #include <string.h>
 
 #include <grpc/byte_buffer_reader.h>
@@ -109,6 +111,7 @@
 #include "src/core/ext/client_config/parse_address.h"
 #include "src/core/ext/lb_policy/grpclb/grpclb.h"
 #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/call.h"
@@ -164,6 +167,9 @@
   /* the initial metadata for the pick. See grpc_lb_policy_pick() */
   grpc_metadata_batch *initial_metadata;
 
+  /* storage for the lb token initial metadata mdelem */
+  grpc_linked_mdelem *lb_token_mdelem_storage;
+
   /* bitmask passed to pick() and used for selective cancelling. See
    * grpc_lb_policy_cancel_picks() */
   uint32_t initial_metadata_flags;
@@ -180,19 +186,19 @@
   wrapped_rr_closure_arg wrapped_on_complete_arg;
 } pending_pick;
 
-static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
-                             grpc_metadata_batch *initial_metadata,
-                             uint32_t initial_metadata_flags,
+static void add_pending_pick(pending_pick **root,
+                             const grpc_lb_policy_pick_args *pick_args,
                              grpc_connected_subchannel **target,
                              grpc_closure *on_complete) {
   pending_pick *pp = gpr_malloc(sizeof(*pp));
   memset(pp, 0, sizeof(pending_pick));
   memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
   pp->next = *root;
-  pp->pollent = pollent;
+  pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+  pp->pollent = pick_args->pollent;
   pp->target = target;
-  pp->initial_metadata = initial_metadata;
-  pp->initial_metadata_flags = initial_metadata_flags;
+  pp->initial_metadata = pick_args->initial_metadata;
+  pp->initial_metadata_flags = pick_args->initial_metadata_flags;
   pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
   grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
                     &pp->wrapped_on_complete_arg);
@@ -279,58 +285,84 @@
   glb_lb_policy *glb_policy;
 };
 
+/* Convert \a server (binary IP + port) to a sockaddr; false if malformed. */
+static bool process_serverlist(const grpc_grpclb_server *server,
+                               struct sockaddr_storage *sa, size_t *sa_len) {
+  if (server->port >> 16 != 0) {
+    gpr_log(GPR_ERROR, "Invalid port '%d'.", server->port);
+    return false;
+  }
+  const uint16_t netorder_port = htons((uint16_t)server->port);
+  /* ip_address.bytes holds a raw in_addr (4 bytes) or in6_addr (16 bytes). */
+  const grpc_grpclb_ip_address *ip = &server->ip_address;
+  *sa_len = 0;
+  if (ip->size == 4) {
+    struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
+    *sa_len = sizeof(struct sockaddr_in);
+    memset(addr4, 0, *sa_len);
+    addr4->sin_family = AF_INET;
+    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+    addr4->sin_port = netorder_port;
+  } else if (ip->size == 16) { /* an in6_addr is 16 bytes, not 6 */
+    struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
+    *sa_len = sizeof(struct sockaddr_in6);
+    memset(addr6, 0, *sa_len);
+    addr6->sin6_family = AF_INET6;
+    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+    addr6->sin6_port = netorder_port;
+  } else {
+    gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes. Got %d.", ip->size);
+    return false;
+  }
+  GPR_ASSERT(*sa_len > 0);
+  return true;
+}
+
 static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
                                  const grpc_grpclb_serverlist *serverlist,
                                  glb_lb_policy *glb_policy) {
-  /* TODO(dgq): support mixed ip version */
   GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
-  char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
-  for (size_t i = 0; i < serverlist->num_servers; ++i) {
-    gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
-                       serverlist->servers[i]->port);
-  }
-
-  size_t uri_path_len;
-  char *concat_ipports = gpr_strjoin_sep(
-      (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
 
   grpc_lb_policy_args args;
+  memset(&args, 0, sizeof(args));
   args.client_channel_factory = glb_policy->cc_factory;
+  args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) *
+                           serverlist->num_servers);
   args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
-  args.addresses->naddrs = serverlist->num_servers;
   args.addresses->addrs =
-      gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
-  size_t out_addrs_idx = 0;
+      gpr_malloc(sizeof(grpc_resolved_address) * serverlist->num_servers);
+  size_t addr_idx = 0;
   for (size_t i = 0; i < serverlist->num_servers; ++i) {
-    grpc_uri uri;
-    struct sockaddr_storage sa;
-    size_t sa_len;
-    uri.path = host_ports[i];
-    if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
-      memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
-      args.addresses->addrs[out_addrs_idx].len = sa_len;
-      ++out_addrs_idx;
-    } else {
-      gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
-              host_ports[i]);
+    const grpc_grpclb_server *server = serverlist->servers[i];
+    grpc_resolved_address *raddr = &args.addresses->addrs[addr_idx];
+    if (!process_serverlist(server, (struct sockaddr_storage *)raddr->addr,
+                            &raddr->len)) {
+      gpr_log(GPR_INFO,
+              "Problem processing server at index %zu of received serverlist, "
+              "ignoring.",
+              i);
+      continue;
     }
+    /* Index by addr_idx so tokens stay 1:1 with the compacted addresses. */
+    grpc_lb_policy_address_token *tok = &args.tokens[addr_idx++];
+    tok->token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+    tok->token = gpr_malloc(tok->token_size);
+    memcpy(tok->token, server->load_balance_token, tok->token_size);
   }
+  args.addresses->naddrs = addr_idx;
 
   grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
 
-  gpr_free(concat_ipports);
-  for (size_t i = 0; i < serverlist->num_servers; i++) {
-    gpr_free(host_ports[i]);
-  }
-  gpr_free(host_ports);
   gpr_free(args.addresses->addrs);
   gpr_free(args.addresses);
+  gpr_free(args.tokens);
   return rr;
 }
 
 static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
                         grpc_error *error) {
-  GRPC_ERROR_REF(error);
+  GPR_ASSERT(glb_policy->serverlist != NULL &&
+             glb_policy->serverlist->num_servers > 0);
   glb_policy->rr_policy =
       create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
 
@@ -345,8 +377,8 @@
       exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
       &glb_policy->rr_connectivity->on_change);
   grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
-                              glb_policy->rr_connectivity->state, error,
-                              "rr_handover");
+                              glb_policy->rr_connectivity->state,
+                              GRPC_ERROR_REF(error), "rr_handover");
   grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
 
   /* flush pending ops */
@@ -359,9 +391,11 @@
       gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
               (intptr_t)glb_policy->rr_policy);
     }
-    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
-                        pp->initial_metadata, pp->initial_metadata_flags,
-                        pp->target, &pp->wrapped_on_complete);
+    const grpc_lb_policy_pick_args pick_args = {
+        pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+        pp->lb_token_mdelem_storage};
+    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+                        &pp->wrapped_on_complete);
     pp->wrapped_on_complete_arg.owning_pending_node = pp;
   }
 
@@ -378,13 +412,13 @@
                             &pping->wrapped_notify);
     pping->wrapped_notify_arg.owning_pending_node = pping;
   }
-  GRPC_ERROR_UNREF(error);
 }
 
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
-                                    grpc_error *error) {
+static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error) {
   rr_connectivity_data *rr_conn_data = arg;
   glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+
   if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
     if (glb_policy->serverlist != NULL) {
       /* a RR policy is shutting down but there's a serverlist available ->
@@ -398,8 +432,8 @@
     if (error == GRPC_ERROR_NONE) {
       /* RR not shutting down. Mimic the RR's policy state */
       grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
-                                  rr_conn_data->state, error,
-                                  "rr_connectivity_changed");
+                                  rr_conn_data->state, GRPC_ERROR_REF(error),
+                                  "glb_rr_connectivity_changed");
       /* resubscribe */
       grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
                                             &rr_conn_data->state,
@@ -408,7 +442,6 @@
       gpr_free(rr_conn_data);
     }
   }
-  GRPC_ERROR_UNREF(error);
 }
 
 static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -463,7 +496,7 @@
   rr_connectivity_data *rr_connectivity =
       gpr_malloc(sizeof(rr_connectivity_data));
   memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
-  grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
+  grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
                     rr_connectivity);
   rr_connectivity->glb_policy = glb_policy;
   glb_policy->rr_connectivity = rr_connectivity;
@@ -546,7 +579,6 @@
       *target = NULL;
       grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
                           GRPC_ERROR_CANCELLED, NULL);
-      gpr_free(pp);
     } else {
       pp->next = glb_policy->pending_picks;
       glb_policy->pending_picks = pp;
@@ -576,7 +608,6 @@
           exec_ctx, pp->pollent, glb_policy->base.interested_parties);
       grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
                           GRPC_ERROR_CANCELLED, NULL);
-      gpr_free(pp);
     } else {
       pp->next = glb_policy->pending_picks;
       glb_policy->pending_picks = pp;
@@ -603,12 +634,22 @@
 }
 
 static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                    grpc_polling_entity *pollent,
-                    grpc_metadata_batch *initial_metadata,
-                    uint32_t initial_metadata_flags,
+                    const grpc_lb_policy_pick_args *pick_args,
                     grpc_connected_subchannel **target,
                     grpc_closure *on_complete) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+  if (pick_args->lb_token_mdelem_storage == NULL) {
+    /* TODO(dgq): should this be an assert? If storage is NULL, something has
+     * gone very wrong at the client channel filter */
+    gpr_log(GPR_ERROR,
+            "No mdelem storage for the LB token. Load reporting won't work "
+            "without it. Failing");
+    *target = NULL;
+    grpc_exec_ctx_sched(exec_ctx, on_complete, GRPC_ERROR_NONE, NULL);
+    return 1;
+  }
+
   gpr_mu_lock(&glb_policy->mu);
   int r;
 
@@ -623,8 +664,8 @@
     glb_policy->wc_arg.wrapped_closure = on_complete;
     grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
                       &glb_policy->wc_arg);
-    r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
-                            initial_metadata, initial_metadata_flags, target,
+
+    r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
                             &glb_policy->wrapped_on_complete);
     if (r != 0) {
       /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
@@ -639,10 +680,10 @@
                           GRPC_ERROR_NONE, NULL);
     }
   } else {
-    grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+    grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
                                            glb_policy->base.interested_parties);
-    add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
-                     initial_metadata_flags, target, on_complete);
+    add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+                     on_complete);
 
     if (!glb_policy->started_picking) {
       start_picking(exec_ctx, glb_policy);
@@ -702,9 +743,6 @@
   /* called once initial metadata's been sent */
   grpc_closure md_sent;
 
-  /* called once initial metadata's been received */
-  grpc_closure md_rcvd;
-
   /* called once the LoadBalanceRequest has been sent to the LB server. See
    * src/proto/grpc/.../load_balancer.proto */
   grpc_closure req_sent;
@@ -741,7 +779,6 @@
 } lb_client_data;
 
 static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
 static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
@@ -756,7 +793,6 @@
   gpr_mu_init(&lb_client->mu);
   grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
 
-  grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
   grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
   grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
   grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
@@ -855,23 +891,6 @@
   grpc_op ops[1];
   memset(ops, 0, sizeof(ops));
   grpc_op *op = ops;
-  op->op = GRPC_OP_RECV_INITIAL_METADATA;
-  op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
-  op->flags = 0;
-  op->reserved = NULL;
-  op++;
-  grpc_call_error call_error = grpc_call_start_batch_and_execute(
-      exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
-      &lb_client->md_rcvd);
-  GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
-  lb_client_data *lb_client = arg;
-  GPR_ASSERT(lb_client->lb_call);
-  grpc_op ops[1];
-  memset(ops, 0, sizeof(ops));
-  grpc_op *op = ops;
 
   op->op = GRPC_OP_SEND_MESSAGE;
   op->data.send_message = lb_client->request_payload;
@@ -886,11 +905,18 @@
 
 static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
   lb_client_data *lb_client = arg;
+  GPR_ASSERT(lb_client->lb_call);
 
-  grpc_op ops[1];
+  grpc_op ops[2];
   memset(ops, 0, sizeof(ops));
   grpc_op *op = ops;
 
+  op->op = GRPC_OP_RECV_INITIAL_METADATA;
+  op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+
   op->op = GRPC_OP_RECV_MESSAGE;
   op->data.recv_message = &lb_client->response_payload;
   op->flags = 0;
@@ -909,8 +935,7 @@
   grpc_op *op = ops;
   if (lb_client->response_payload != NULL) {
     /* Received data from the LB server. Look inside
-     * lb_client->response_payload, for
-     * a serverlist. */
+     * lb_client->response_payload, for a serverlist. */
     grpc_byte_buffer_reader bbr;
     grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
     gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
@@ -947,7 +972,7 @@
         } else {
           /* unref the RR policy, eventually leading to its substitution with a
            * new one constructed from the received serverlist (see
-           * rr_connectivity_changed) */
+           * glb_rr_connectivity_changed) */
           GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
                                "serverlist_received");
         }
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index f4720a1..a888100 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -57,6 +57,7 @@
   if (dec_arg->first_pass) { /* count how many server do we have */
     grpc_grpclb_server server;
     if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+      gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
       return false;
     }
     dec_arg->num_servers++;
@@ -69,6 +70,7 @@
           gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
     }
     if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+      gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
       return false;
     }
     dec_arg->servers[dec_arg->decoding_idx++] = server;
@@ -118,6 +120,7 @@
   grpc_grpclb_response res;
   memset(&res, 0, sizeof(grpc_grpclb_response));
   if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
+    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
     return NULL;
   }
   grpc_grpclb_initial_response *initial_res =
@@ -145,6 +148,7 @@
   arg.first_pass = true;
   status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
   if (!status) {
+    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
     return NULL;
   }
 
@@ -152,6 +156,7 @@
   status =
       pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
   if (!status) {
+    gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream_at_start));
     return NULL;
   }
 
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 9726c87..c1e73d0 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -45,6 +45,7 @@
 
 #define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
 
+typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address;
 typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
 typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
 typedef grpc_lb_v1_Server grpc_grpclb_server;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 52e11c4..2676714 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -31,10 +31,11 @@
  *
  */
 /* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
 
 #include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
 
+/* @@protoc_insertion_point(includes) */
 #if PB_PROTO_HEADER_VERSION != 30
 #error Regenerate this file with the current version of nanopb generator.
 #endif
@@ -72,7 +73,7 @@
 };
 
 const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
-    PB_FIELD(  2, STRING  , OPTIONAL, STATIC  , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+    PB_FIELD(  1, STRING  , OPTIONAL, STATIC  , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
     PB_FIELD(  3, MESSAGE , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
     PB_LAST_FIELD
 };
@@ -84,7 +85,7 @@
 };
 
 const pb_field_t grpc_lb_v1_Server_fields[5] = {
-    PB_FIELD(  1, STRING  , OPTIONAL, STATIC  , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+    PB_FIELD(  1, BYTES   , OPTIONAL, STATIC  , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
     PB_FIELD(  2, INT32   , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
     PB_FIELD(  3, STRING  , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
     PB_FIELD(  4, BOOL    , OPTIONAL, STATIC  , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
@@ -116,3 +117,4 @@
 #endif
 
 
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 46fe588..832c851 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -31,11 +31,12 @@
  *
  */
 /* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
 
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
+#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
 #include "third_party/nanopb/pb.h"
+/* @@protoc_insertion_point(includes) */
 #if PB_PROTO_HEADER_VERSION != 30
 #error Regenerate this file with the current version of nanopb generator.
 #endif
@@ -52,6 +53,7 @@
     int64_t client_rpc_errors;
     bool has_dropped_requests;
     int64_t dropped_requests;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
 } grpc_lb_v1_ClientStats;
 
 typedef struct _grpc_lb_v1_Duration {
@@ -59,22 +61,26 @@
     int64_t seconds;
     bool has_nanos;
     int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */
 } grpc_lb_v1_Duration;
 
 typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
     bool has_name;
     char name[128];
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */
 } grpc_lb_v1_InitialLoadBalanceRequest;
 
+typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t;
 typedef struct _grpc_lb_v1_Server {
     bool has_ip_address;
-    char ip_address[46];
+    grpc_lb_v1_Server_ip_address_t ip_address;
     bool has_port;
     int32_t port;
     bool has_load_balance_token;
-    char load_balance_token[64];
+    char load_balance_token[65];
     bool has_drop_request;
     bool drop_request;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
 } grpc_lb_v1_Server;
 
 typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
@@ -82,6 +88,7 @@
     char load_balancer_delegate[64];
     bool has_client_stats_report_interval;
     grpc_lb_v1_Duration client_stats_report_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
 } grpc_lb_v1_InitialLoadBalanceResponse;
 
 typedef struct _grpc_lb_v1_LoadBalanceRequest {
@@ -89,12 +96,14 @@
     grpc_lb_v1_InitialLoadBalanceRequest initial_request;
     bool has_client_stats;
     grpc_lb_v1_ClientStats client_stats;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
 } grpc_lb_v1_LoadBalanceRequest;
 
 typedef struct _grpc_lb_v1_ServerList {
     pb_callback_t servers;
     bool has_expiration_interval;
     grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
 } grpc_lb_v1_ServerList;
 
 typedef struct _grpc_lb_v1_LoadBalanceResponse {
@@ -102,6 +111,7 @@
     grpc_lb_v1_InitialLoadBalanceResponse initial_response;
     bool has_server_list;
     grpc_lb_v1_ServerList server_list;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */
 } grpc_lb_v1_LoadBalanceResponse;
 
 /* Default values for struct fields */
@@ -114,7 +124,7 @@
 #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
 #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
 #define grpc_lb_v1_ServerList_init_default       {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default           {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default           {false, {0, {0}}, false, 0, false, "", false, 0}
 #define grpc_lb_v1_Duration_init_zero            {false, 0, false, 0}
 #define grpc_lb_v1_LoadBalanceRequest_init_zero  {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
 #define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
@@ -122,7 +132,7 @@
 #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
 #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
 #define grpc_lb_v1_ServerList_init_zero          {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero              {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero              {false, {0, {0}}, false, 0, false, "", false, 0}
 
 /* Field tags (for use in manual encoding/decoding) */
 #define grpc_lb_v1_ClientStats_total_requests_tag 1
@@ -135,7 +145,7 @@
 #define grpc_lb_v1_Server_port_tag               2
 #define grpc_lb_v1_Server_load_balance_token_tag 3
 #define grpc_lb_v1_Server_drop_request_tag       4
-#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
 #define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
 #define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
 #define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
@@ -161,7 +171,8 @@
 #define grpc_lb_v1_ClientStats_size              33
 #define grpc_lb_v1_LoadBalanceResponse_size      (98 + grpc_lb_v1_ServerList_size)
 #define grpc_lb_v1_InitialLoadBalanceResponse_size 90
-#define grpc_lb_v1_Server_size                   127
+/* grpc_lb_v1_ServerList_size depends on runtime parameters */
+#define grpc_lb_v1_Server_size 98
 
 /* Message IDs (where set with "msgid" option) */
 #ifdef PB_MSGID
@@ -174,5 +185,6 @@
 #ifdef __cplusplus
 } /* extern "C" */
 #endif
+/* @@protoc_insertion_point(eof) */
 
 #endif
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70..e1277b3 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -199,9 +199,7 @@
 }
 
 static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                   grpc_polling_entity *pollent,
-                   grpc_metadata_batch *initial_metadata,
-                   uint32_t initial_metadata_flags,
+                   const grpc_lb_policy_pick_args *pick_args,
                    grpc_connected_subchannel **target,
                    grpc_closure *on_complete) {
   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -225,13 +223,13 @@
     if (!p->started_picking) {
       start_picking(exec_ctx, p);
     }
-    grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+    grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
                                            p->base.interested_parties);
     pp = gpr_malloc(sizeof(*pp));
     pp->next = p->pending_picks;
-    pp->pollent = pollent;
+    pp->pollent = pick_args->pollent;
     pp->target = target;
-    pp->initial_metadata_flags = initial_metadata_flags;
+    pp->initial_metadata_flags = pick_args->initial_metadata_flags;
     pp->on_complete = on_complete;
     p->pending_picks = pp;
     gpr_mu_unlock(&p->mu);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608..8fda405 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
 #include "src/core/ext/client_config/lb_policy_registry.h"
 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
 
 typedef struct round_robin_lb_policy round_robin_lb_policy;
 
@@ -76,15 +77,33 @@
  * Once a pick is available, \a target is updated and \a on_complete called. */
 typedef struct pending_pick {
   struct pending_pick *next;
+
+  /* polling entity for the pick()'s async notification */
   grpc_polling_entity *pollent;
+
+  /* the initial metadata for the pick. See grpc_lb_policy_pick() */
+  grpc_metadata_batch *initial_metadata;
+
+  /* storage for the lb token initial metadata mdelem */
+  grpc_linked_mdelem *lb_token_mdelem_storage;
+
+  /* bitmask passed to pick() and used for selective cancelling. See
+   * grpc_lb_policy_cancel_picks() */
   uint32_t initial_metadata_flags;
+
+  /* output argument where to store the pick()ed connected subchannel, or NULL
+   * upon error. */
   grpc_connected_subchannel **target;
+
+  /* to be invoked once the pick() has completed (regardless of success) */
   grpc_closure *on_complete;
 } pending_pick;
 
 /** List of subchannels in a connectivity READY state */
 typedef struct ready_list {
   grpc_subchannel *subchannel;
+  /* references namesake entry in subchannel_data */
+  grpc_lb_policy_address_token *lb_token;
   struct ready_list *next;
   struct ready_list *prev;
 } ready_list;
@@ -102,12 +121,19 @@
   ready_list *ready_list_node;
   /** last observed connectivity */
   grpc_connectivity_state connectivity_state;
+  /** the subchannel's target LB token */
+  grpc_lb_policy_address_token *lb_token;
 } subchannel_data;
 
 struct round_robin_lb_policy {
   /** base policy: must be first */
   grpc_lb_policy base;
 
+  /** total number of addresses received at creation time */
+  size_t num_addresses;
+  /** load balancing tokens, one per incoming address */
+  grpc_lb_policy_address_token *lb_tokens;
+
   /** all our subchannels */
   size_t num_subchannels;
   subchannel_data **subchannels;
@@ -166,16 +192,19 @@
 
   if (grpc_lb_round_robin_trace) {
     gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
-            p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+            (void *)p->ready_list_last_pick,
+            (void *)p->ready_list_last_pick->subchannel);
   }
 }
 
 /** Prepends (relative to the root at p->ready_list) the connected subchannel \a
  * csc to the list of ready subchannels. */
 static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
-                                           grpc_subchannel *sc) {
+                                           subchannel_data *sd) {
   ready_list *new_elem = gpr_malloc(sizeof(ready_list));
-  new_elem->subchannel = sc;
+  memset(new_elem, 0, sizeof(ready_list));
+  new_elem->subchannel = sd->subchannel;
+  new_elem->lb_token = sd->lb_token;
   if (p->ready_list.prev == NULL) {
     /* first element */
     new_elem->next = &p->ready_list;
@@ -189,7 +218,8 @@
     p->ready_list.prev = new_elem;
   }
   if (grpc_lb_round_robin_trace) {
-    gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+    gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+            (void *)new_elem, (void *)sd->subchannel);
   }
   return new_elem;
 }
@@ -217,7 +247,7 @@
 
   if (grpc_lb_round_robin_trace) {
     gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
-            node->subchannel);
+            (void *)node->subchannel);
   }
 
   node->next = NULL;
@@ -251,6 +281,13 @@
     gpr_free(elem);
     elem = tmp;
   }
+
+  if (p->lb_tokens != NULL) {
+    for (i = 0; i < p->num_addresses; i++) {
+      gpr_free(p->lb_tokens[i].token);
+    }
+    gpr_free(p->lb_tokens);
+  }
   gpr_free(p);
 }
 
@@ -337,7 +374,7 @@
   p->started_picking = 1;
 
   if (grpc_lb_round_robin_trace) {
-    gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+    gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
             p->num_subchannels);
   }
 
@@ -360,10 +397,25 @@
   gpr_mu_unlock(&p->mu);
 }
 
+/* Appends \a lb_token to the tail of \a initial_metadata, keyed by
+ * GRPC_MDSTR_LOAD_REPORTING_INITIAL. No-op if the token is NULL or empty. */
+static void initial_metadata_add_lb_token(
+    grpc_metadata_batch *initial_metadata,
+    grpc_linked_mdelem *lb_token_mdelem_storage,
+    grpc_lb_policy_address_token *lb_token) {
+  if (lb_token != NULL && lb_token->token_size > 0) {
+    GPR_ASSERT(lb_token->token != NULL);
+    grpc_mdstr *lb_token_mdstr =
+        grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size);
+    grpc_metadata_batch_add_tail(
+        initial_metadata, lb_token_mdelem_storage,
+        grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL,
+                                          lb_token_mdstr));
+  }
+}
+
 static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
-                   grpc_polling_entity *pollent,
-                   grpc_metadata_batch *initial_metadata,
-                   uint32_t initial_metadata_flags,
+                   const grpc_lb_policy_pick_args *pick_args,
                    grpc_connected_subchannel **target,
                    grpc_closure *on_complete) {
   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -371,28 +423,35 @@
   ready_list *selected;
   gpr_mu_lock(&p->mu);
   if ((selected = peek_next_connected_locked(p))) {
+    /* readily available, report right away */
     gpr_mu_unlock(&p->mu);
     *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+    initial_metadata_add_lb_token(pick_args->initial_metadata,
+                                  pick_args->lb_token_mdelem_storage,
+                                  selected->lb_token);
     if (grpc_lb_round_robin_trace) {
       gpr_log(GPR_DEBUG,
-              "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
-              selected);
+              "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+              (void *)*target, (void *)selected);
     }
     /* only advance the last picked pointer if the selection was used */
     advance_last_picked_locked(p);
     return 1;
   } else {
+    /* no pick currently available. Save for later in list of pending picks */
     if (!p->started_picking) {
       start_picking(exec_ctx, p);
     }
-    grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+    grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
                                            p->base.interested_parties);
     pp = gpr_malloc(sizeof(*pp));
     pp->next = p->pending_picks;
-    pp->pollent = pollent;
+    pp->pollent = pick_args->pollent;
     pp->target = target;
     pp->on_complete = on_complete;
-    pp->initial_metadata_flags = initial_metadata_flags;
+    pp->initial_metadata = pick_args->initial_metadata;
+    pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+    pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
     p->pending_picks = pp;
     gpr_mu_unlock(&p->mu);
     return 0;
@@ -421,7 +480,7 @@
                                     "connecting_ready");
         /* add the newly connected subchannel to the list of connected ones.
          * Note that it goes to the "end of the line". */
-        sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+        sd->ready_list_node = add_connected_sc_locked(p, sd);
         /* at this point we know there's at least one suitable subchannel. Go
          * ahead and pick one and notify the pending suitors in
          * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -433,12 +492,16 @@
         }
         while ((pp = p->pending_picks)) {
           p->pending_picks = pp->next;
+
+          initial_metadata_add_lb_token(pp->initial_metadata,
+                                        pp->lb_token_mdelem_storage,
+                                        selected->lb_token);
           *pp->target =
               grpc_subchannel_get_connected_subchannel(selected->subchannel);
           if (grpc_lb_round_robin_trace) {
             gpr_log(GPR_DEBUG,
                     "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
-                    selected->subchannel, selected);
+                    (void *)selected->subchannel, (void *)selected);
           }
           grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
                                                    p->base.interested_parties);
@@ -574,13 +637,21 @@
   round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
   memset(p, 0, sizeof(*p));
 
-  p->subchannels =
-      gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
-  memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+  p->num_addresses = args->addresses->naddrs;
+  if (args->tokens != NULL) {
+    /* we need to copy because args contents aren't owned */
+    p->lb_tokens =
+        gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+    memcpy(p->lb_tokens, args->tokens,
+           sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+  }
+
+  p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_addresses);
+  memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
 
   grpc_subchannel_args sc_args;
   size_t subchannel_idx = 0;
-  for (size_t i = 0; i < args->addresses->naddrs; i++) {
+  for (size_t i = 0; i < p->num_addresses; i++) {
     memset(&sc_args, 0, sizeof(grpc_subchannel_args));
     sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
     sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
@@ -595,12 +666,19 @@
       sd->policy = p;
       sd->index = subchannel_idx;
       sd->subchannel = subchannel;
+      if (p->lb_tokens != NULL) {
+        sd->lb_token = &p->lb_tokens[i];
+      }
       ++subchannel_idx;
       grpc_closure_init(&sd->connectivity_changed_closure,
                         rr_connectivity_changed, sd);
     }
   }
   if (subchannel_idx == 0) {
+    /* couldn't create any subchannel. Bail out, releasing our copy of the LB
+     * token array as well. NOTE(review): the token strings it points to are
+     * left untouched here; confirm who owns them when creation fails. */
+    gpr_free(p->lb_tokens);
     gpr_free(p->subchannels);
     gpr_free(p);
     return NULL;
diff --git a/src/proto/grpc/lb/v1/load_balancer.options b/src/proto/grpc/lb/v1/load_balancer.options
index d903669..a9398d5 100644
--- a/src/proto/grpc/lb/v1/load_balancer.options
+++ b/src/proto/grpc/lb/v1/load_balancer.options
@@ -1,6 +1,6 @@
 grpc.lb.v1.InitialLoadBalanceRequest.name max_size:128
 grpc.lb.v1.InitialLoadBalanceResponse.client_config max_size:64
 grpc.lb.v1.InitialLoadBalanceResponse.load_balancer_delegate max_size:64
-grpc.lb.v1.Server.ip_address max_size:46
-grpc.lb.v1.Server.load_balance_token max_size:64
+grpc.lb.v1.Server.ip_address max_size:16
+grpc.lb.v1.Server.load_balance_token max_size:65
 load_balancer.proto no_unions:true
diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index 1bcad0b..b4a33f3 100644
--- a/src/proto/grpc/lb/v1/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -32,7 +32,6 @@
 package grpc.lb.v1;
 
 message Duration {
-
   // Signed seconds of the span of time. Must be from -315,576,000,000
   // to +315,576,000,000 inclusive.
   int64 seconds = 1;
@@ -93,16 +92,11 @@
 }
 
 message InitialLoadBalanceResponse {
-  oneof initial_response_type {
-    // TODO(zhangkun83): ClientConfig not yet defined
-    //ClientConfig client_config = 1;
-
-    // This is an application layer redirect that indicates the client should
-    // use the specified server for load balancing. When this field is set in
-    // the response, the client should open a separate connection to the
-    // load_balancer_delegate and call the BalanceLoad method.
-    string load_balancer_delegate = 2;
-  }
+  // This is an application layer redirect that indicates the client should use
+  // the specified server for load balancing. When this field is non-empty in
+  // the response, the client should open a separate connection to the
+  // load_balancer_delegate and call the BalanceLoad method.
+  string load_balancer_delegate = 1;
 
   // This interval defines how often the client should send the client stats
   // to the load balancer. Stats should only be reported when the duration is
@@ -125,14 +119,17 @@
 }
 
 message Server {
-  // A resolved address and port for the server. The IP address string may
+  // A resolved address for the server, serialized in network-byte-order. It may
   // either be an IPv4 or IPv6 address.
-  string ip_address = 1;
+  bytes ip_address = 1;
+
+  // A resolved port number for the server.
   int32 port = 2;
 
-  // An opaque token that is passed from the client to the server in metadata.
-  // The server may expect this token to indicate that the request from the
-  // client was load balanced.
+  // An opaque but printable token given to the frontend for each pick. All
+  // frontend requests for that pick must include the token in their initial
+  // metadata. The token is used by the backend to verify the request and to
+  // allow the backend to report load to the gRPC LB system.
   string load_balance_token = 3;
 
   // Indicates whether this particular request should be dropped by the client
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index 33de1ee..e67189c 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -31,10 +31,12 @@
  *
  */
 
+#include <grpc++/impl/codegen/config.h>
 #include <gtest/gtest.h>
-#include <string>
 
 #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/proto/grpc/lb/v1/load_balancer.pb.h"  // C++ version
 
 namespace grpc {
@@ -45,8 +47,28 @@
 
 class GrpclbTest : public ::testing::Test {};
 
+grpc::string Ip4ToPackedString(const char* ip_str) {
+  struct in_addr ip4;
+  GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
+  return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
+}
+
+grpc::string PackedStringToIp(const grpc_grpclb_ip_address& pb_ip) {
+  char ip_str[46] = {0};
+  int af = -1;
+  if (pb_ip.size == 4) {
+    af = AF_INET;
+  } else if (pb_ip.size == 16) {
+    af = AF_INET6;
+  } else {
+    abort();
+  }
+  GPR_ASSERT(inet_ntop(af, pb_ip.bytes, ip_str, 46) != NULL);
+  return ip_str;
+}
+
 TEST_F(GrpclbTest, CreateRequest) {
-  const std::string service_name = "AServiceName";
+  const grpc::string service_name = "AServiceName";
   LoadBalanceRequest request;
   grpc_grpclb_request* c_req = grpc_grpclb_request_create(service_name.c_str());
   gpr_slice slice = grpc_grpclb_request_encode(c_req);
@@ -65,7 +87,7 @@
       initial_response->mutable_client_stats_report_interval();
   client_stats_report_interval->set_seconds(123);
   client_stats_report_interval->set_nanos(456);
-  const std::string encoded_response = response.SerializeAsString();
+  const grpc::string encoded_response = response.SerializeAsString();
   gpr_slice encoded_slice =
       gpr_slice_from_copied_string(encoded_response.c_str());
 
@@ -82,29 +104,31 @@
   LoadBalanceResponse response;
   auto* serverlist = response.mutable_server_list();
   auto* server = serverlist->add_servers();
-  server->set_ip_address("127.0.0.1");
+  server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
   server->set_port(12345);
   server->set_drop_request(true);
   server = response.mutable_server_list()->add_servers();
-  server->set_ip_address("10.0.0.1");
+  server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
   server->set_port(54321);
   server->set_drop_request(false);
   auto* expiration_interval = serverlist->mutable_expiration_interval();
   expiration_interval->set_seconds(888);
   expiration_interval->set_nanos(999);
 
-  const std::string encoded_response = response.SerializeAsString();
-  gpr_slice encoded_slice =
-      gpr_slice_from_copied_string(encoded_response.c_str());
+  const grpc::string encoded_response = response.SerializeAsString();
+  const gpr_slice encoded_slice = gpr_slice_from_copied_buffer(
+      encoded_response.data(), encoded_response.size());
   grpc_grpclb_serverlist* c_serverlist =
       grpc_grpclb_response_parse_serverlist(encoded_slice);
   ASSERT_EQ(c_serverlist->num_servers, 2ul);
   EXPECT_TRUE(c_serverlist->servers[0]->has_ip_address);
-  EXPECT_TRUE(strcmp(c_serverlist->servers[0]->ip_address, "127.0.0.1") == 0);
+  EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
+            "127.0.0.1");
   EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
   EXPECT_TRUE(c_serverlist->servers[0]->drop_request);
   EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
-  EXPECT_TRUE(strcmp(c_serverlist->servers[1]->ip_address, "10.0.0.1") == 0);
+
+  EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
   EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
   EXPECT_FALSE(c_serverlist->servers[1]->drop_request);
 
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index b2fdce2..29b3bfe 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -37,8 +37,8 @@
 #include <cstring>
 #include <string>
 
-extern "C" {
 #include <grpc/grpc.h>
+#include <grpc/impl/codegen/byte_buffer_reader.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/host_port.h>
 #include <grpc/support/log.h>
@@ -47,8 +47,12 @@
 #include <grpc/support/thd.h>
 #include <grpc/support/time.h>
 
+#include <grpc++/impl/codegen/config.h>
+
+extern "C" {
 #include "src/core/ext/client_config/client_channel.h"
 #include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/support/tmpfile.h"
 #include "src/core/lib/surface/channel.h"
@@ -61,6 +65,7 @@
 #include "src/proto/grpc/lb/v1/load_balancer.pb.h"
 
 #define NUM_BACKENDS 4
+#define PAYLOAD "hello you"
 
 // TODO(dgq): Other scenarios in need of testing:
 // - Send an empty serverlist update and verify that the client request blocks
@@ -105,8 +110,8 @@
     int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
   // server_list {
   //   servers {
-  //     ip_address: "127.0.0.1"
-  //     port: ...
+  //     ip_address: <raw in_addr (IPv4) or in6_addr (IPv6) bytes>
+  //     port: <16 bit uint>
   //     load_balance_token: "token..."
   //   }
   //   ...
@@ -125,21 +130,21 @@
   }
   for (size_t i = 0; i < nports; i++) {
     auto *server = serverlist->add_servers();
-    server->set_ip_address(host);
+    // TODO(dgq): test ipv6
+    struct in_addr ip4;
+    GPR_ASSERT(inet_pton(AF_INET, host, &ip4) == 1);
+    server->set_ip_address(
+        grpc::string(reinterpret_cast<const char *>(&ip4), sizeof(ip4)));
     server->set_port(ports[i]);
     // The following long long int cast is meant to work around the
     // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
     // have a version for int but does have one for long long int.
-    server->set_load_balance_token("token" +
-                                   std::to_string((long long int)ports[i]));
+    string token_data = "token" + std::to_string((long long int)ports[i]);
+    token_data.resize(64, '-');
+    server->set_load_balance_token(token_data);
   }
-
-  gpr_log(GPR_INFO, "generating response: %s",
-          response.ShortDebugString().c_str());
-
-  const gpr_slice response_slice =
-      gpr_slice_from_copied_string(response.SerializeAsString().c_str());
-  return response_slice;
+  const grpc::string &enc_resp = response.SerializeAsString();
+  return gpr_slice_from_copied_buffer(enc_resp.data(), enc_resp.size());
 }
 
 static void drain_cq(grpc_completion_queue *cq) {
@@ -181,6 +186,37 @@
   cq_verify(cqv);
   gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
 
+  // make sure we've received the initial metadata from the grpclb request.
+  GPR_ASSERT(request_metadata_recv.count > 0);
+  GPR_ASSERT(request_metadata_recv.metadata != NULL);
+
+  // receive request for backends
+  op = ops;
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message = &request_payload_recv;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
+  error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  cq_expect_completion(cqv, tag(202), 1);
+  cq_verify(cqv);
+  gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
+
+  // validate initial request.
+  grpc_byte_buffer_reader bbr;
+  grpc_byte_buffer_reader_init(&bbr, request_payload_recv);
+  gpr_slice request_payload_slice = grpc_byte_buffer_reader_readall(&bbr);
+  grpc::lb::v1::LoadBalanceRequest request;
+  request.ParseFromArray(GPR_SLICE_START_PTR(request_payload_slice),
+                         GPR_SLICE_LENGTH(request_payload_slice));
+  GPR_ASSERT(request.has_initial_request());
+  GPR_ASSERT(request.initial_request().name() == "load.balanced.service.name");
+  gpr_slice_unref(request_payload_slice);
+  grpc_byte_buffer_reader_destroy(&bbr);
+  grpc_byte_buffer_destroy(request_payload_recv);
+
+  gpr_slice response_payload_slice;
   op = ops;
   op->op = GRPC_OP_SEND_INITIAL_METADATA;
   op->data.send_initial_metadata.count = 0;
@@ -196,21 +232,6 @@
   GPR_ASSERT(GRPC_CALL_OK == error);
   gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
 
-  // receive request for backends
-  op = ops;
-  op->op = GRPC_OP_RECV_MESSAGE;
-  op->data.recv_message = &request_payload_recv;
-  op->flags = 0;
-  op->reserved = NULL;
-  op++;
-  error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
-  GPR_ASSERT(GRPC_CALL_OK == error);
-  cq_expect_completion(cqv, tag(202), 1);
-  cq_verify(cqv);
-  gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
-  // TODO(dgq): validate request.
-  grpc_byte_buffer_destroy(request_payload_recv);
-  gpr_slice response_payload_slice;
   for (int i = 0; i < 2; i++) {
     if (i == 0) {
       // First half of the ports.
@@ -303,6 +324,16 @@
       return;
     }
     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+
+    // The following long long int cast is meant to work around the
+    // dysfunctional implementation of std::to_string in gcc 4.4, which doesn't
+    // have a version for int but does have one for long long int.
+    string expected_token = "token" + std::to_string((long long int)sf->port);
+    expected_token.resize(64, '-');
+    GPR_ASSERT(contains_metadata(&request_metadata_recv,
+                                 "load-reporting-initial",
+                                 expected_token.c_str()));
+
     gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
 
     op = ops;
@@ -321,8 +352,7 @@
     gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
 
     bool exit = false;
-    gpr_slice response_payload_slice =
-        gpr_slice_from_copied_string("hello you");
+    gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD);
     while (!exit) {
       op = ops;
       op->op = GRPC_OP_RECV_MESSAGE;
@@ -474,10 +504,9 @@
     error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
     GPR_ASSERT(GRPC_CALL_OK == error);
 
-    peer = grpc_call_get_peer(c);
     cq_expect_completion(cqv, tag(2), 1);
     cq_verify(cqv);
-    gpr_free(peer);
+    GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
 
     grpc_byte_buffer_destroy(request_payload);
     grpc_byte_buffer_destroy(response_payload_recv);
diff --git a/third_party/nanopb b/third_party/nanopb
index f8ac463..68a86e9 160000
--- a/third_party/nanopb
+++ b/third_party/nanopb
@@ -1 +1 @@
-Subproject commit f8ac463766281625ad710900479130c7fcb4d63b
+Subproject commit 68a86e96481e6bea987df8de47027847b30c325b
diff --git a/tools/codegen/core/gen_nano_proto.sh b/tools/codegen/core/gen_nano_proto.sh
index c880fc2..df107c2 100755
--- a/tools/codegen/core/gen_nano_proto.sh
+++ b/tools/codegen/core/gen_nano_proto.sh
@@ -123,7 +123,7 @@
 
 # this should be the same version as the submodule we compile against
 # ideally we'd update this as a template to ensure that
-pip install protobuf==3.0.0b2
+pip install protobuf==3.0.0
 
 pushd "$(dirname $INPUT_PROTO)" > /dev/null
 
diff --git a/tools/run_tests/sanity/check_submodules.sh b/tools/run_tests/sanity/check_submodules.sh
index d2ab5b0..4dac74b 100755
--- a/tools/run_tests/sanity/check_submodules.sh
+++ b/tools/run_tests/sanity/check_submodules.sh
@@ -44,7 +44,7 @@
  c880e42ba1c8032d4cdde2aba0541d8a9d9fa2e9 third_party/boringssl (version_for_cocoapods_2.0-100-gc880e42)
  05b155ff59114735ec8cd089f669c4c3d8f59029 third_party/gflags (v2.1.0-45-g05b155f)
  c99458533a9b4c743ed51537e25989ea55944908 third_party/googletest (release-1.7.0)
- f8ac463766281625ad710900479130c7fcb4d63b third_party/nanopb (nanopb-0.3.4-29-gf8ac463)
+ 68a86e96481e6bea987df8de47027847b30c325b third_party/nanopb (nanopb-0.3.6-6-g68a86e9)
  e8ae137c96444ea313485ed1118c5e43b2099cf1 third_party/protobuf (v3.0.0-beta-4-74-ge8ae137)
  50893291621658f355bc5b4d450a8d06a563053d third_party/zlib (v1.2.8)
  bcad91771b7f0bff28a1cac1981d7ef2b9bcef3c third_party/thrift