Merge branch 'master' of github.com:grpc/grpc into lb_add_md
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 566d3d5..c718e9a 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -399,9 +399,11 @@
int r;
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
- initial_metadata, initial_metadata_flags,
- connected_subchannel, on_ready);
+ const grpc_lb_policy_pick_args inputs = {calld->pollent, initial_metadata,
+ initial_metadata_flags,
+ &calld->lb_token_mdelem};
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
+ on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
GPR_TIMER_END("cc_pick_subchannel", 0);
return r;
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 8b980b2..71170f5 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -100,13 +100,10 @@
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ return policy->vtable->pick(exec_ctx, policy, pick_args, target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index a2f5446..6f133a2 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -53,23 +53,37 @@
grpc_pollset_set *interested_parties;
};
+/** Extra arguments for an LB pick */
+typedef struct grpc_lb_policy_pick_args {
+ /** Parties interested in the pick's progress */
+ grpc_polling_entity *pollent;
+ /** Initial metadata associated with the picking call. */
+ grpc_metadata_batch *initial_metadata;
+ /** See \a GRPC_INITIAL_METADATA_* in grpc_types.h */
+ uint32_t initial_metadata_flags;
+ /** Storage for LB token in \a initial_metadata, or NULL if not used */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+} grpc_lb_policy_pick_args;
+
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
-
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
- /** implement grpc_lb_policy_pick */
+ /** \see grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target, grpc_closure *on_complete);
+
+ /** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
+
+ /** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
+ /** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
@@ -83,8 +97,7 @@
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy. Calling with a NULL \a
- state cancels the subscription.
- */
+ state cancels the subscription. */
void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
grpc_connectivity_state *state,
@@ -124,26 +137,31 @@
void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
-/** Given initial metadata in \a initial_metadata, find an appropriate
- target for this rpc, and 'return' it by calling \a on_complete after setting
- \a target.
- Picking can be asynchronous. Any IO should be done under \a pollent. */
+/** Find an appropriate target for this call, based on \a pick_args.
+ Upon completion \a on_complete will be called, with \a *target set to an
+ appropriate connected subchannel if the pick was successful or NULL
+ otherwise.
+ Picking can be asynchronous. Any IO should be done under \a
+ pick_args->pollent. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete);
+/** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping)
+ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_closure *closure);
+/** Cancel picks for \a target.
+ The \a on_complete callback of the pending picks will be invoked with \a
+ *target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
-/** Cancel all pending picks which have:
- (initial_metadata_flags & initial_metadata_flags_mask) ==
- initial_metadata_flags_eq */
+/** Cancel all pending picks for which their \a initial_metadata_flags (as given
+ in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
+ when AND'd with \a initial_metadata_flags_mask */
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index da1de35..cfe6f1a 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -47,8 +47,16 @@
const grpc_lb_policy_factory_vtable *vtable;
};
+typedef struct grpc_lb_policy_address_token {
+ uint8_t *token;
+ size_t token_size;
+} grpc_lb_policy_address_token;
+
typedef struct grpc_lb_policy_args {
grpc_resolved_addresses *addresses;
+ /* It not NULL, array of load balancing tokens associated with \a addresses,
+ * on a 1:1 correspondence. Some indices may be NULL for missing tokens. */
+ grpc_lb_policy_address_token *tokens;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 8d2deb0..40a0681 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -81,6 +81,8 @@
grpc_closure next_step;
grpc_call_stack *owning_call;
+
+ grpc_linked_mdelem lb_token_mdelem;
} grpc_subchannel_call_holder;
void grpc_subchannel_call_holder_init(
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index af913d8..c3294b7 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -76,9 +76,9 @@
* operations in progress over the old RR instance. This is done by
* decreasing the reference count on the old policy. The moment no more
* references are held on the old RR policy, it'll be destroyed and \a
- * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
- * At this point we can transition to a new RR instance safely, which is done
- * once again via \a rr_handover().
+ * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
+ * state. At this point we can transition to a new RR instance safely, which
+ * is done once again via \a rr_handover().
*
*
* Once a RR policy instance is in place (and getting updated as described),
@@ -96,6 +96,8 @@
* - Implement LB service forwarding (point 2c. in the doc's diagram).
*/
+#include <errno.h>
+
#include <string.h>
#include <grpc/byte_buffer_reader.h>
@@ -109,6 +111,7 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
@@ -164,6 +167,9 @@
/* the initial metadata for the pick. See grpc_lb_policy_pick() */
grpc_metadata_batch *initial_metadata;
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
/* bitmask passed to pick() and used for selective cancelling. See
* grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
@@ -180,19 +186,19 @@
wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
-static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+static void add_pending_pick(pending_pick **root,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pending_pick *pp = gpr_malloc(sizeof(*pp));
memset(pp, 0, sizeof(pending_pick));
memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
pp->next = *root;
- pp->pollent = pollent;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata = initial_metadata;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
&pp->wrapped_on_complete_arg);
@@ -279,58 +285,84 @@
glb_lb_policy *glb_policy;
};
+static bool process_serverlist(const grpc_grpclb_server *server,
+ struct sockaddr_storage *sa, size_t *sa_len) {
+ if (server->port >> 16 != 0) {
+ gpr_log(GPR_ERROR, "Invalid port '%d'.", server->port);
+ return false;
+ }
+ const uint16_t netorder_port = htons((uint16_t)server->port);
+ /* the addresses are given in binary format (a in(6)_addr struct) in
+ * server->ip_address.bytes. */
+ const grpc_grpclb_ip_address *ip = &server->ip_address;
+ *sa_len = 0;
+ if (ip->size == 4) {
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
+ *sa_len = sizeof(struct sockaddr_in);
+ memset(addr4, 0, *sa_len);
+ addr4->sin_family = AF_INET;
+ memcpy(&addr4->sin_addr, ip->bytes, ip->size);
+ addr4->sin_port = netorder_port;
+ } else if (ip->size == 6) {
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
+ *sa_len = sizeof(struct sockaddr_in6);
+ memset(addr6, 0, *sa_len);
+ addr6->sin6_family = AF_INET;
+ memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
+ addr6->sin6_port = netorder_port;
+ } else {
+ gpr_log(GPR_ERROR, "Expected IP to be 4 or 16 bytes. Got %d.", ip->size);
+ return false;
+ }
+ GPR_ASSERT(*sa_len > 0);
+ return true;
+}
+
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
const grpc_grpclb_serverlist *serverlist,
glb_lb_policy *glb_policy) {
- /* TODO(dgq): support mixed ip version */
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
- char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
- for (size_t i = 0; i < serverlist->num_servers; ++i) {
- gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
- serverlist->servers[i]->port);
- }
-
- size_t uri_path_len;
- char *concat_ipports = gpr_strjoin_sep(
- (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
grpc_lb_policy_args args;
+ memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
+ args.tokens = gpr_malloc(sizeof(grpc_lb_policy_address_token) *
+ serverlist->num_servers);
args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- args.addresses->naddrs = serverlist->num_servers;
args.addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
- size_t out_addrs_idx = 0;
+ gpr_malloc(sizeof(grpc_resolved_address) * serverlist->num_servers);
+ size_t addr_idx = 0;
for (size_t i = 0; i < serverlist->num_servers; ++i) {
- grpc_uri uri;
- struct sockaddr_storage sa;
- size_t sa_len;
- uri.path = host_ports[i];
- if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
- memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
- args.addresses->addrs[out_addrs_idx].len = sa_len;
- ++out_addrs_idx;
- } else {
- gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
- host_ports[i]);
+ const grpc_grpclb_server *server = serverlist->servers[i];
+ grpc_resolved_address *raddr = &args.addresses->addrs[addr_idx];
+ if (!process_serverlist(server, (struct sockaddr_storage *)raddr->addr,
+ &raddr->len)) {
+ gpr_log(GPR_INFO,
+ "Problem processing server at index %zu of received serverlist, "
+ "ignoring.",
+ i);
+ continue;
}
+ ++addr_idx;
+ args.tokens[i].token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1;
+ args.tokens[i].token = gpr_malloc(args.tokens[i].token_size);
+ memcpy(args.tokens[i].token, server->load_balance_token,
+ args.tokens[i].token_size);
}
+ args.addresses->naddrs = addr_idx;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- gpr_free(concat_ipports);
- for (size_t i = 0; i < serverlist->num_servers; i++) {
- gpr_free(host_ports[i]);
- }
- gpr_free(host_ports);
gpr_free(args.addresses->addrs);
gpr_free(args.addresses);
+ gpr_free(args.tokens);
return rr;
}
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
grpc_error *error) {
- GRPC_ERROR_REF(error);
+ GPR_ASSERT(glb_policy->serverlist != NULL &&
+ glb_policy->serverlist->num_servers > 0);
glb_policy->rr_policy =
create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
@@ -345,8 +377,8 @@
exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
&glb_policy->rr_connectivity->on_change);
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- glb_policy->rr_connectivity->state, error,
- "rr_handover");
+ glb_policy->rr_connectivity->state,
+ GRPC_ERROR_REF(error), "rr_handover");
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
/* flush pending ops */
@@ -359,9 +391,11 @@
gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
(intptr_t)glb_policy->rr_policy);
}
- grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
- pp->initial_metadata, pp->initial_metadata_flags,
- pp->target, &pp->wrapped_on_complete);
+ const grpc_lb_policy_pick_args pick_args = {
+ pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
+ pp->lb_token_mdelem_storage};
+ grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
+ &pp->wrapped_on_complete);
pp->wrapped_on_complete_arg.owning_pending_node = pp;
}
@@ -378,13 +412,13 @@
&pping->wrapped_notify);
pping->wrapped_notify_arg.owning_pending_node = pping;
}
- GRPC_ERROR_UNREF(error);
}
-static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+
if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
if (glb_policy->serverlist != NULL) {
/* a RR policy is shutting down but there's a serverlist available ->
@@ -398,8 +432,8 @@
if (error == GRPC_ERROR_NONE) {
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_conn_data->state, error,
- "rr_connectivity_changed");
+ rr_conn_data->state, GRPC_ERROR_REF(error),
+ "glb_rr_connectivity_changed");
/* resubscribe */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
@@ -408,7 +442,6 @@
gpr_free(rr_conn_data);
}
}
- GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -463,7 +496,7 @@
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
- grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
+ grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
glb_policy->rr_connectivity = rr_connectivity;
@@ -546,7 +579,6 @@
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -576,7 +608,6 @@
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
- gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -603,12 +634,22 @@
}
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
+
+ if (pick_args->lb_token_mdelem_storage == NULL) {
+ /* TODO(dgq): should this be an assert? If storage is NULL, something has
+ * gone very wrong at the client channel filter */
+ gpr_log(GPR_ERROR,
+ "No mdelem storage for the LB token. Load reporting won't work "
+ "without it. Failing");
+ *target = NULL;
+ grpc_exec_ctx_sched(exec_ctx, on_complete, GRPC_ERROR_NONE, NULL);
+ return 1;
+ }
+
gpr_mu_lock(&glb_policy->mu);
int r;
@@ -623,8 +664,8 @@
glb_policy->wc_arg.wrapped_closure = on_complete;
grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
&glb_policy->wc_arg);
- r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
- initial_metadata, initial_metadata_flags, target,
+
+ r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
&glb_policy->wrapped_on_complete);
if (r != 0) {
/* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
@@ -639,10 +680,10 @@
GRPC_ERROR_NONE, NULL);
}
} else {
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
glb_policy->base.interested_parties);
- add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
- initial_metadata_flags, target, on_complete);
+ add_pending_pick(&glb_policy->pending_picks, pick_args, target,
+ on_complete);
if (!glb_policy->started_picking) {
start_picking(exec_ctx, glb_policy);
@@ -702,9 +743,6 @@
/* called once initial metadata's been sent */
grpc_closure md_sent;
- /* called once initial metadata's been received */
- grpc_closure md_rcvd;
-
/* called once the LoadBalanceRequest has been sent to the LB server. See
* src/proto/grpc/.../load_balancer.proto */
grpc_closure req_sent;
@@ -741,7 +779,6 @@
} lb_client_data;
static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
@@ -756,7 +793,6 @@
gpr_mu_init(&lb_client->mu);
grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
- grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
@@ -855,23 +891,6 @@
grpc_op ops[1];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
- op->flags = 0;
- op->reserved = NULL;
- op++;
- grpc_call_error call_error = grpc_call_start_batch_and_execute(
- exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
- &lb_client->md_rcvd);
- GPR_ASSERT(GRPC_CALL_OK == call_error);
-}
-
-static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- lb_client_data *lb_client = arg;
- GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
- memset(ops, 0, sizeof(ops));
- grpc_op *op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = lb_client->request_payload;
@@ -886,11 +905,18 @@
static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
lb_client_data *lb_client = arg;
+ GPR_ASSERT(lb_client->lb_call);
- grpc_op ops[1];
+ grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &lb_client->response_payload;
op->flags = 0;
@@ -909,8 +935,7 @@
grpc_op *op = ops;
if (lb_client->response_payload != NULL) {
/* Received data from the LB server. Look inside
- * lb_client->response_payload, for
- * a serverlist. */
+ * lb_client->response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
@@ -947,7 +972,7 @@
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist (see
- * rr_connectivity_changed) */
+ * glb_rr_connectivity_changed) */
GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
"serverlist_received");
}
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index f4720a1..a888100 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -57,6 +57,7 @@
if (dec_arg->first_pass) { /* count how many server do we have */
grpc_grpclb_server server;
if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->num_servers++;
@@ -69,6 +70,7 @@
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream));
return false;
}
dec_arg->servers[dec_arg->decoding_idx++] = server;
@@ -118,6 +120,7 @@
grpc_grpclb_response res;
memset(&res, 0, sizeof(grpc_grpclb_response));
if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
grpc_grpclb_initial_response *initial_res =
@@ -145,6 +148,7 @@
arg.first_pass = true;
status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
@@ -152,6 +156,7 @@
status =
pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res);
if (!status) {
+ gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream));
return NULL;
}
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 9726c87..c1e73d0 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -45,6 +45,7 @@
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
+typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address;
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef grpc_lb_v1_Server grpc_grpclb_server;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 52e11c4..2676714 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -31,10 +31,11 @@
*
*/
/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -72,7 +73,7 @@
};
const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
- PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
@@ -84,7 +85,7 @@
};
const pb_field_t grpc_lb_v1_Server_fields[5] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+ PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
@@ -116,3 +117,4 @@
#endif
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index 46fe588..832c851 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -31,11 +31,12 @@
*
*/
/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev */
-#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
-#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#ifndef PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
+#define PB_GRPC_LB_V1_LOAD_BALANCER_PB_H_INCLUDED
#include "third_party/nanopb/pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -52,6 +53,7 @@
int64_t client_rpc_errors;
bool has_dropped_requests;
int64_t dropped_requests;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
typedef struct _grpc_lb_v1_Duration {
@@ -59,22 +61,26 @@
int64_t seconds;
bool has_nanos;
int32_t nanos;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Duration) */
} grpc_lb_v1_Duration;
typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
bool has_name;
char name[128];
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceRequest) */
} grpc_lb_v1_InitialLoadBalanceRequest;
+typedef PB_BYTES_ARRAY_T(16) grpc_lb_v1_Server_ip_address_t;
typedef struct _grpc_lb_v1_Server {
bool has_ip_address;
- char ip_address[46];
+ grpc_lb_v1_Server_ip_address_t ip_address;
bool has_port;
int32_t port;
bool has_load_balance_token;
- char load_balance_token[64];
+ char load_balance_token[65];
bool has_drop_request;
bool drop_request;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
@@ -82,6 +88,7 @@
char load_balancer_delegate[64];
bool has_client_stats_report_interval;
grpc_lb_v1_Duration client_stats_report_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_InitialLoadBalanceResponse) */
} grpc_lb_v1_InitialLoadBalanceResponse;
typedef struct _grpc_lb_v1_LoadBalanceRequest {
@@ -89,12 +96,14 @@
grpc_lb_v1_InitialLoadBalanceRequest initial_request;
bool has_client_stats;
grpc_lb_v1_ClientStats client_stats;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceRequest) */
} grpc_lb_v1_LoadBalanceRequest;
typedef struct _grpc_lb_v1_ServerList {
pb_callback_t servers;
bool has_expiration_interval;
grpc_lb_v1_Duration expiration_interval;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ServerList) */
} grpc_lb_v1_ServerList;
typedef struct _grpc_lb_v1_LoadBalanceResponse {
@@ -102,6 +111,7 @@
grpc_lb_v1_InitialLoadBalanceResponse initial_response;
bool has_server_list;
grpc_lb_v1_ServerList server_list;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_LoadBalanceResponse) */
} grpc_lb_v1_LoadBalanceResponse;
/* Default values for struct fields */
@@ -114,7 +124,7 @@
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
@@ -122,7 +132,7 @@
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
#define grpc_lb_v1_ClientStats_total_requests_tag 1
@@ -135,7 +145,7 @@
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
#define grpc_lb_v1_Server_drop_request_tag 4
-#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
@@ -161,7 +171,8 @@
#define grpc_lb_v1_ClientStats_size 33
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
-#define grpc_lb_v1_Server_size 127
+/* grpc_lb_v1_ServerList_size depends on runtime parameters */
+#define grpc_lb_v1_Server_size 98
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@@ -174,5 +185,6 @@
#ifdef __cplusplus
} /* extern "C" */
#endif
+/* @@protoc_insertion_point(eof) */
#endif
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 9decf70..e1277b3 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -199,9 +199,7 @@
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -225,13 +223,13 @@
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7bcf608..8fda405 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -66,6 +66,7 @@
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/static_metadata.h"
typedef struct round_robin_lb_policy round_robin_lb_policy;
@@ -76,15 +77,33 @@
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
+
+ /* polling entity for the pick()'s async notification */
grpc_polling_entity *pollent;
+
+ /* the initial metadata for the pick. See grpc_lb_policy_pick() */
+ grpc_metadata_batch *initial_metadata;
+
+ /* storage for the lb token initial metadata mdelem */
+ grpc_linked_mdelem *lb_token_mdelem_storage;
+
+ /* bitmask passed to pick() and used for selective cancelling. See
+ * grpc_lb_policy_cancel_picks() */
uint32_t initial_metadata_flags;
+
+ /* output argument where to store the pick()ed connected subchannel, or NULL
+ * upon error. */
grpc_connected_subchannel **target;
+
+ /* to be invoked once the pick() has completed (regardless of success) */
grpc_closure *on_complete;
} pending_pick;
/** List of subchannels in a connectivity READY state */
typedef struct ready_list {
grpc_subchannel *subchannel;
+ /* references namesake entry in subchannel_data */
+ grpc_lb_policy_address_token *lb_token;
struct ready_list *next;
struct ready_list *prev;
} ready_list;
@@ -102,12 +121,19 @@
ready_list *ready_list_node;
/** last observed connectivity */
grpc_connectivity_state connectivity_state;
+ /** the subchannel's target LB token */
+ grpc_lb_policy_address_token *lb_token;
} subchannel_data;
struct round_robin_lb_policy {
/** base policy: must be first */
grpc_lb_policy base;
+ /** total number of addresses received at creation time */
+ size_t num_addresses;
+ /** load balancing tokens, one per incoming address */
+ grpc_lb_policy_address_token *lb_tokens;
+
/** all our subchannels */
size_t num_subchannels;
subchannel_data **subchannels;
@@ -166,16 +192,19 @@
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
- p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
+ (void *)p->ready_list_last_pick,
+ (void *)p->ready_list_last_pick->subchannel);
}
}
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *sc) {
+ subchannel_data *sd) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = sc;
+ memset(new_elem, 0, sizeof(ready_list));
+ new_elem->subchannel = sd->subchannel;
+ new_elem->lb_token = sd->lb_token;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -189,7 +218,8 @@
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (Conn. SC %p)",
+ (void *)new_elem, (void *)sd->subchannel);
}
return new_elem;
}
@@ -217,7 +247,7 @@
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
- node->subchannel);
+ (void *)node->subchannel);
}
node->next = NULL;
@@ -251,6 +281,13 @@
gpr_free(elem);
elem = tmp;
}
+
+ if (p->lb_tokens != NULL) {
+ for (i = 0; i < p->num_addresses; i++) {
+ gpr_free(p->lb_tokens[i].token);
+ }
+ gpr_free(p->lb_tokens);
+ }
gpr_free(p);
}
@@ -337,7 +374,7 @@
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, (void *)p,
p->num_subchannels);
}
@@ -360,10 +397,25 @@
gpr_mu_unlock(&p->mu);
}
+/* add lb_token of selected subchannel (address) to the call's initial
+ * metadata */
+static void initial_metadata_add_lb_token(
+ grpc_metadata_batch *initial_metadata,
+ grpc_linked_mdelem *lb_token_mdelem_storage,
+ grpc_lb_policy_address_token *lb_token) {
+ if (lb_token != NULL && lb_token->token_size > 0) {
+ GPR_ASSERT(lb_token->token != NULL);
+ grpc_mdstr *lb_token_mdstr =
+ grpc_mdstr_from_buffer(lb_token->token, lb_token->token_size);
+ grpc_metadata_batch_add_tail(
+ initial_metadata, lb_token_mdelem_storage,
+ grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LOAD_REPORTING_INITIAL,
+ lb_token_mdstr));
+ }
+}
+
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_polling_entity *pollent,
- grpc_metadata_batch *initial_metadata,
- uint32_t initial_metadata_flags,
+ const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -371,28 +423,35 @@
ready_list *selected;
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
+ /* readily available, report right away */
gpr_mu_unlock(&p->mu);
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
+ initial_metadata_add_lb_token(pick_args->initial_metadata,
+ pick_args->lb_token_mdelem_storage,
+ selected->lb_token);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
- selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
+ (void *)*target, (void *)selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
return 1;
} else {
+ /* no pick currently available. Save for later in list of pending picks */
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollent = pollent;
+ pp->pollent = pick_args->pollent;
pp->target = target;
pp->on_complete = on_complete;
- pp->initial_metadata_flags = initial_metadata_flags;
+ pp->initial_metadata = pick_args->initial_metadata;
+ pp->initial_metadata_flags = pick_args->initial_metadata_flags;
+ pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
return 0;
@@ -421,7 +480,7 @@
"connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
- sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
+ sd->ready_list_node = add_connected_sc_locked(p, sd);
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
@@ -433,12 +492,16 @@
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
+
+ initial_metadata_add_lb_token(pp->initial_metadata,
+ pp->lb_token_mdelem_storage,
+ selected->lb_token);
*pp->target =
grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ (void *)selected->subchannel, (void *)selected);
}
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
@@ -574,13 +637,21 @@
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels =
- gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ p->num_addresses = args->addresses->naddrs;
+ if (args->tokens != NULL) {
+ /* we need to copy because args contents aren't owned */
+ p->lb_tokens =
+ gpr_malloc(sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+ memcpy(p->lb_tokens, args->tokens,
+ sizeof(grpc_lb_policy_address_token) * p->num_addresses);
+ }
+
+ p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ for (size_t i = 0; i < p->num_addresses; i++) {
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
@@ -595,12 +666,16 @@
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
+ if (p->lb_tokens != NULL) {
+ sd->lb_token = &p->lb_tokens[i];
+ }
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
}
}
if (subchannel_idx == 0) {
+ /* couldn't create any subchannel. Bail out */
gpr_free(p->subchannels);
gpr_free(p);
return NULL;
diff --git a/src/proto/grpc/lb/v1/load_balancer.options b/src/proto/grpc/lb/v1/load_balancer.options
index d903669..a9398d5 100644
--- a/src/proto/grpc/lb/v1/load_balancer.options
+++ b/src/proto/grpc/lb/v1/load_balancer.options
@@ -1,6 +1,6 @@
grpc.lb.v1.InitialLoadBalanceRequest.name max_size:128
grpc.lb.v1.InitialLoadBalanceResponse.client_config max_size:64
grpc.lb.v1.InitialLoadBalanceResponse.load_balancer_delegate max_size:64
-grpc.lb.v1.Server.ip_address max_size:46
-grpc.lb.v1.Server.load_balance_token max_size:64
+grpc.lb.v1.Server.ip_address max_size:16
+grpc.lb.v1.Server.load_balance_token max_size:65
load_balancer.proto no_unions:true
diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index 1bcad0b..b4a33f3 100644
--- a/src/proto/grpc/lb/v1/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -32,7 +32,6 @@
package grpc.lb.v1;
message Duration {
-
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive.
int64 seconds = 1;
@@ -93,16 +92,11 @@
}
message InitialLoadBalanceResponse {
- oneof initial_response_type {
- // TODO(zhangkun83): ClientConfig not yet defined
- //ClientConfig client_config = 1;
-
- // This is an application layer redirect that indicates the client should
- // use the specified server for load balancing. When this field is set in
- // the response, the client should open a separate connection to the
- // load_balancer_delegate and call the BalanceLoad method.
- string load_balancer_delegate = 2;
- }
+ // This is an application layer redirect that indicates the client should use
+ // the specified server for load balancing. When this field is non-empty in
+ // the response, the client should open a separate connection to the
+ // load_balancer_delegate and call the BalanceLoad method.
+ string load_balancer_delegate = 1;
// This interval defines how often the client should send the client stats
// to the load balancer. Stats should only be reported when the duration is
@@ -125,14 +119,17 @@
}
message Server {
- // A resolved address and port for the server. The IP address string may
+ // A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
- string ip_address = 1;
+ bytes ip_address = 1;
+
+ // A resolved port number for the server.
int32 port = 2;
- // An opaque token that is passed from the client to the server in metadata.
- // The server may expect this token to indicate that the request from the
- // client was load balanced.
+ // An opaque but printable token given to the frontend for each pick. All
+ // frontend requests for that pick must include the token in its initial
+ // metadata. The token is used by the backend to verify the request and to
+ // allow the backend to report load to the gRPC LB system.
string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index 33de1ee..e67189c 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -31,10 +31,12 @@
*
*/
+#include <grpc++/impl/codegen/config.h>
#include <gtest/gtest.h>
-#include <string>
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/proto/grpc/lb/v1/load_balancer.pb.h" // C++ version
namespace grpc {
@@ -45,8 +47,28 @@
class GrpclbTest : public ::testing::Test {};
+grpc::string Ip4ToPackedString(const char* ip_str) {
+ struct in_addr ip4;
+ GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
+ return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
+}
+
+grpc::string PackedStringToIp(const grpc_grpclb_ip_address& pb_ip) {
+ char ip_str[46] = {0};
+ int af = -1;
+ if (pb_ip.size == 4) {
+ af = AF_INET;
+ } else if (pb_ip.size == 16) {
+ af = AF_INET6;
+ } else {
+ abort();
+ }
+ GPR_ASSERT(inet_ntop(af, pb_ip.bytes, ip_str, 46) != NULL);
+ return ip_str;
+}
+
TEST_F(GrpclbTest, CreateRequest) {
- const std::string service_name = "AServiceName";
+ const grpc::string service_name = "AServiceName";
LoadBalanceRequest request;
grpc_grpclb_request* c_req = grpc_grpclb_request_create(service_name.c_str());
gpr_slice slice = grpc_grpclb_request_encode(c_req);
@@ -65,7 +87,7 @@
initial_response->mutable_client_stats_report_interval();
client_stats_report_interval->set_seconds(123);
client_stats_report_interval->set_nanos(456);
- const std::string encoded_response = response.SerializeAsString();
+ const grpc::string encoded_response = response.SerializeAsString();
gpr_slice encoded_slice =
gpr_slice_from_copied_string(encoded_response.c_str());
@@ -82,29 +104,31 @@
LoadBalanceResponse response;
auto* serverlist = response.mutable_server_list();
auto* server = serverlist->add_servers();
- server->set_ip_address("127.0.0.1");
+ server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
server->set_drop_request(true);
server = response.mutable_server_list()->add_servers();
- server->set_ip_address("10.0.0.1");
+ server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
server->set_drop_request(false);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
- const std::string encoded_response = response.SerializeAsString();
- gpr_slice encoded_slice =
- gpr_slice_from_copied_string(encoded_response.c_str());
+ const grpc::string encoded_response = response.SerializeAsString();
+ const gpr_slice encoded_slice = gpr_slice_from_copied_buffer(
+ encoded_response.data(), encoded_response.size());
grpc_grpclb_serverlist* c_serverlist =
grpc_grpclb_response_parse_serverlist(encoded_slice);
ASSERT_EQ(c_serverlist->num_servers, 2ul);
EXPECT_TRUE(c_serverlist->servers[0]->has_ip_address);
- EXPECT_TRUE(strcmp(c_serverlist->servers[0]->ip_address, "127.0.0.1") == 0);
+ EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
+ "127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
EXPECT_TRUE(c_serverlist->servers[0]->drop_request);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
- EXPECT_TRUE(strcmp(c_serverlist->servers[1]->ip_address, "10.0.0.1") == 0);
+
+ EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
EXPECT_FALSE(c_serverlist->servers[1]->drop_request);
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index b2fdce2..29b3bfe 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -37,8 +37,8 @@
#include <cstring>
#include <string>
-extern "C" {
#include <grpc/grpc.h>
+#include <grpc/impl/codegen/byte_buffer_reader.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
@@ -47,8 +47,12 @@
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
+#include <grpc++/impl/codegen/config.h>
+
+extern "C" {
#include "src/core/ext/client_config/client_channel.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/support/tmpfile.h"
#include "src/core/lib/surface/channel.h"
@@ -61,6 +65,7 @@
#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
#define NUM_BACKENDS 4
+#define PAYLOAD "hello you"
// TODO(dgq): Other scenarios in need of testing:
// - Send an empty serverlist update and verify that the client request blocks
@@ -105,8 +110,8 @@
int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
// server_list {
// servers {
- // ip_address: "127.0.0.1"
- // port: ...
+ // ip_address: <in_addr/6 bytes of an IP>
+ // port: <16 bit uint>
// load_balance_token: "token..."
// }
// ...
@@ -125,21 +130,21 @@
}
for (size_t i = 0; i < nports; i++) {
auto *server = serverlist->add_servers();
- server->set_ip_address(host);
+ // TODO(dgq): test ipv6
+ struct in_addr ip4;
+ GPR_ASSERT(inet_pton(AF_INET, host, &ip4) == 1);
+ server->set_ip_address(
+ grpc::string(reinterpret_cast<const char *>(&ip4), sizeof(ip4)));
server->set_port(ports[i]);
// The following long long int cast is meant to work around the
// disfunctional implementation of std::to_string in gcc 4.4, which doesn't
// have a version for int but does have one for long long int.
- server->set_load_balance_token("token" +
- std::to_string((long long int)ports[i]));
+ string token_data = "token" + std::to_string((long long int)ports[i]);
+ token_data.resize(64, '-');
+ server->set_load_balance_token(token_data);
}
-
- gpr_log(GPR_INFO, "generating response: %s",
- response.ShortDebugString().c_str());
-
- const gpr_slice response_slice =
- gpr_slice_from_copied_string(response.SerializeAsString().c_str());
- return response_slice;
+ const grpc::string &enc_resp = response.SerializeAsString();
+ return gpr_slice_from_copied_buffer(enc_resp.data(), enc_resp.size());
}
static void drain_cq(grpc_completion_queue *cq) {
@@ -181,6 +186,37 @@
cq_verify(cqv);
gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
+ // make sure we've received the initial metadata from the grpclb request.
+ GPR_ASSERT(request_metadata_recv.count > 0);
+ GPR_ASSERT(request_metadata_recv.metadata != NULL);
+
+ // receive request for backends
+ op = ops;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(202), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
+
+ // validate initial request.
+ grpc_byte_buffer_reader bbr;
+ grpc_byte_buffer_reader_init(&bbr, request_payload_recv);
+ gpr_slice request_payload_slice = grpc_byte_buffer_reader_readall(&bbr);
+ grpc::lb::v1::LoadBalanceRequest request;
+ request.ParseFromArray(GPR_SLICE_START_PTR(request_payload_slice),
+ GPR_SLICE_LENGTH(request_payload_slice));
+ GPR_ASSERT(request.has_initial_request());
+ GPR_ASSERT(request.initial_request().name() == "load.balanced.service.name");
+ gpr_slice_unref(request_payload_slice);
+ grpc_byte_buffer_reader_destroy(&bbr);
+ grpc_byte_buffer_destroy(request_payload_recv);
+
+ gpr_slice response_payload_slice;
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
@@ -196,21 +232,6 @@
GPR_ASSERT(GRPC_CALL_OK == error);
gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
- // receive request for backends
- op = ops;
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message = &request_payload_recv;
- op->flags = 0;
- op->reserved = NULL;
- op++;
- error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
- GPR_ASSERT(GRPC_CALL_OK == error);
- cq_expect_completion(cqv, tag(202), 1);
- cq_verify(cqv);
- gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
- // TODO(dgq): validate request.
- grpc_byte_buffer_destroy(request_payload_recv);
- gpr_slice response_payload_slice;
for (int i = 0; i < 2; i++) {
if (i == 0) {
// First half of the ports.
@@ -303,6 +324,16 @@
return;
}
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+
+ // The following long long int cast is meant to work around the
+ // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
+ // have a version for int but does have one for long long int.
+ string expected_token = "token" + std::to_string((long long int)sf->port);
+ expected_token.resize(64, '-');
+ GPR_ASSERT(contains_metadata(&request_metadata_recv,
+ "load-reporting-initial",
+ expected_token.c_str()));
+
gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
op = ops;
@@ -321,8 +352,7 @@
gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
bool exit = false;
- gpr_slice response_payload_slice =
- gpr_slice_from_copied_string("hello you");
+ gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD);
while (!exit) {
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
@@ -474,10 +504,9 @@
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
- peer = grpc_call_get_peer(c);
cq_expect_completion(cqv, tag(2), 1);
cq_verify(cqv);
- gpr_free(peer);
+ GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload_recv);
diff --git a/third_party/nanopb b/third_party/nanopb
index f8ac463..68a86e9 160000
--- a/third_party/nanopb
+++ b/third_party/nanopb
@@ -1 +1 @@
-Subproject commit f8ac463766281625ad710900479130c7fcb4d63b
+Subproject commit 68a86e96481e6bea987df8de47027847b30c325b
diff --git a/tools/codegen/core/gen_nano_proto.sh b/tools/codegen/core/gen_nano_proto.sh
index c880fc2..df107c2 100755
--- a/tools/codegen/core/gen_nano_proto.sh
+++ b/tools/codegen/core/gen_nano_proto.sh
@@ -123,7 +123,7 @@
# this should be the same version as the submodule we compile against
# ideally we'd update this as a template to ensure that
-pip install protobuf==3.0.0b2
+pip install protobuf==3.0.0
pushd "$(dirname $INPUT_PROTO)" > /dev/null
diff --git a/tools/run_tests/sanity/check_submodules.sh b/tools/run_tests/sanity/check_submodules.sh
index d2ab5b0..4dac74b 100755
--- a/tools/run_tests/sanity/check_submodules.sh
+++ b/tools/run_tests/sanity/check_submodules.sh
@@ -44,7 +44,7 @@
c880e42ba1c8032d4cdde2aba0541d8a9d9fa2e9 third_party/boringssl (version_for_cocoapods_2.0-100-gc880e42)
05b155ff59114735ec8cd089f669c4c3d8f59029 third_party/gflags (v2.1.0-45-g05b155f)
c99458533a9b4c743ed51537e25989ea55944908 third_party/googletest (release-1.7.0)
- f8ac463766281625ad710900479130c7fcb4d63b third_party/nanopb (nanopb-0.3.4-29-gf8ac463)
+ 68a86e96481e6bea987df8de47027847b30c325b third_party/nanopb (nanopb-0.3.6-6-g68a86e9)
e8ae137c96444ea313485ed1118c5e43b2099cf1 third_party/protobuf (v3.0.0-beta-4-74-ge8ae137)
50893291621658f355bc5b4d450a8d06a563053d third_party/zlib (v1.2.8)
bcad91771b7f0bff28a1cac1981d7ef2b9bcef3c third_party/thrift