blob: 7f3c4613c66a79be898956894a1ad41cf7c2b4a3 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
46 * idle state, \a query_for_backends() is called. It creates an instance of \a
David Garcia Quintas43339842016-07-18 12:56:09 -070047 * lb_client_data, an internal struct meant to contain the data associated with
48 * the internal communication with the LB server. This instance is created via
49 * \a lb_client_data_create(). There, the call over lb_channel to pick-first
50 * from {a1..an} is created, the \a LoadBalancingRequest message is assembled
51 * and all necessary callbacks for the progress of the internal call configured.
52 *
53 * Back in \a query_for_backends(), the internal *streaming* call to the LB
54 * server (whichever address from {a1..an} pick-first chose) is kicked off.
55 * It'll progress over the callbacks configured in \a lb_client_data_create()
56 * (see the field docstrings of \a lb_client_data for more details).
57 *
58 * If the call fails with UNIMPLEMENTED, the original call will also fail.
59 * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB
60 * server, which contradicts the LB bit being set. If the internal call times
61 * out, the usual behavior of pick-first applies, continuing to pick from the
62 * list {a1..an}.
63 *
David Garcia Quintas65318262016-07-29 13:43:38 -070064 * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An
David Garcia Quintas43339842016-07-18 12:56:09 -070065 * invalid one results in the termination of the streaming call. A new streaming
66 * call should be created if possible, failing the original call otherwise.
67 * For a valid \a LoadBalancingResponse, the server list of actual backends is
68 * extracted. A Round Robin policy will be created from this list. There are two
69 * possible scenarios:
70 *
71 * 1. This is the first server list received. There was no previous instance of
72 * the Round Robin policy. \a rr_handover() will instantiate the RR policy
73 * and perform all the pending operations over it.
74 * 2. There's already a RR policy instance active. We need to introduce the new
75 * one build from the new serverlist, but taking care not to disrupt the
76 * operations in progress over the old RR instance. This is done by
77 * decreasing the reference count on the old policy. The moment no more
78 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070079 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
80 * state. At this point we can transition to a new RR instance safely, which
81 * is done once again via \a rr_handover().
David Garcia Quintas43339842016-07-18 12:56:09 -070082 *
83 *
84 * Once a RR policy instance is in place (and getting updated as described),
85 * calls to for a pick, a ping or a cancellation will be serviced right away by
86 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintasd4a756b2016-07-19 11:35:15 -070087 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
David Garcia Quintas43339842016-07-18 12:56:09 -070088 * is received, etc), pick/ping requests are added to a list of pending
89 * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
90 * the RR policy instance becomes available.
91 *
92 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
93 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070094
95/* TODO(dgq):
96 * - Implement LB service forwarding (point 2c. in the doc's diagram).
97 */
98
murgatroid997871f732016-09-23 13:49:05 -070099#include "src/core/lib/iomgr/sockaddr.h"
100
David Garcia Quintas8a81aa12016-08-22 15:06:49 -0700101#include <errno.h>
murgatroid999030c812016-09-16 13:25:08 -0700102
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700103#include <string.h>
104
105#include <grpc/byte_buffer_reader.h>
106#include <grpc/grpc.h>
107#include <grpc/support/alloc.h>
108#include <grpc/support/host_port.h>
109#include <grpc/support/string_util.h>
110
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700111#include "src/core/ext/client_config/client_channel_factory.h"
112#include "src/core/ext/client_config/lb_policy_registry.h"
113#include "src/core/ext/client_config/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700114#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700115#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
116#include "src/core/lib/iomgr/sockaddr_utils.h"
117#include "src/core/lib/support/string.h"
118#include "src/core/lib/surface/call.h"
119#include "src/core/lib/surface/channel.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700120#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700121
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700122int grpc_lb_glb_trace = 0;
123
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700124static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
125 size_t num_addresses) {
126 /* free "resolved" addresses memblock */
127 gpr_free(lb_addresses->resolved_address);
128 for (size_t i = 0; i < num_addresses; ++i) {
129 if (lb_addresses[i].user_data != NULL) {
130 GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
131 }
132 }
133 gpr_free(lb_addresses);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700134}
135
136/* add lb_token of selected subchannel (address) to the call's initial
137 * metadata */
138static void initial_metadata_add_lb_token(
139 grpc_metadata_batch *initial_metadata,
140 grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
141 GPR_ASSERT(lb_token_mdelem_storage != NULL);
142 GPR_ASSERT(lb_token != NULL);
143 grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
144 lb_token);
145}
146
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700147typedef struct wrapped_rr_closure_arg {
David Garcia Quintas43339842016-07-18 12:56:09 -0700148 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
149 * calls against the internal RR instance, respectively. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700150 grpc_closure *wrapped_closure;
David Garcia Quintas43339842016-07-18 12:56:09 -0700151
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700152 /* the pick's initial metadata, kept in order to append the LB token for the
153 * pick */
154 grpc_metadata_batch *initial_metadata;
155
156 /* the picked target, used to determine which LB token to add to the pick's
157 * initial metadata */
158 grpc_connected_subchannel **target;
159
160 /* the LB token associated with the pick */
161 grpc_mdelem *lb_token;
162
163 /* storage for the lb token initial metadata mdelem */
164 grpc_linked_mdelem *lb_token_mdelem_storage;
165
David Garcia Quintas43339842016-07-18 12:56:09 -0700166 /* The RR instance related to the closure */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700167 grpc_lb_policy *rr_policy;
David Garcia Quintas43339842016-07-18 12:56:09 -0700168
169 /* when not NULL, represents a pending_{pick,ping} node to be freed upon
170 * closure execution */
171 void *owning_pending_node; /* to be freed if not NULL */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700172} wrapped_rr_closure_arg;
173
174/* The \a on_complete closure passed as part of the pick requires keeping a
175 * reference to its associated round robin instance. We wrap this closure in
176 * order to unref the round robin instance upon its invocation */
177static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700178 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700179 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas43339842016-07-18 12:56:09 -0700180 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700181 if (grpc_lb_glb_trace) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700182 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
David Garcia Quintas43339842016-07-18 12:56:09 -0700183 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700184 }
David Garcia Quintas43339842016-07-18 12:56:09 -0700185 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700186
187 /* if target is NULL, no pick has been made by the RR policy (eg, all
188 * addresses failed to connect). There won't be any user_data/token
189 * available */
190 if (wc_arg->target != NULL) {
191 initial_metadata_add_lb_token(wc_arg->initial_metadata,
192 wc_arg->lb_token_mdelem_storage,
193 GRPC_MDELEM_REF(wc_arg->lb_token));
194 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700195 }
David Garcia Quintas5a876162016-07-18 13:08:42 -0700196 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700197
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700198 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
199 NULL);
David Garcia Quintas43339842016-07-18 12:56:09 -0700200 gpr_free(wc_arg->owning_pending_node);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700201}
202
David Garcia Quintasea11d162016-07-14 17:27:28 -0700203/* Linked list of pending pick requests. It stores all information needed to
204 * eventually call (Round Robin's) pick() on them. They mainly stay pending
205 * waiting for the RR policy to be created/updated.
206 *
207 * One particularity is the wrapping of the user-provided \a on_complete closure
208 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
209 * order to correctly unref the RR policy instance upon completion of the pick.
210 * See \a wrapped_rr_closure for details. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700211typedef struct pending_pick {
212 struct pending_pick *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700213
214 /* polling entity for the pick()'s async notification */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700215 grpc_polling_entity *pollent;
David Garcia Quintas43339842016-07-18 12:56:09 -0700216
217 /* the initial metadata for the pick. See grpc_lb_policy_pick() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700218 grpc_metadata_batch *initial_metadata;
David Garcia Quintas43339842016-07-18 12:56:09 -0700219
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700220 /* storage for the lb token initial metadata mdelem */
221 grpc_linked_mdelem *lb_token_mdelem_storage;
222
David Garcia Quintas43339842016-07-18 12:56:09 -0700223 /* bitmask passed to pick() and used for selective cancelling. See
224 * grpc_lb_policy_cancel_picks() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700225 uint32_t initial_metadata_flags;
David Garcia Quintas43339842016-07-18 12:56:09 -0700226
227 /* output argument where to store the pick()ed connected subchannel, or NULL
228 * upon error. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700229 grpc_connected_subchannel **target;
David Garcia Quintas43339842016-07-18 12:56:09 -0700230
231 /* a closure wrapping the original on_complete one to be invoked once the
232 * pick() has completed (regardless of success) */
233 grpc_closure wrapped_on_complete;
234
235 /* args for wrapped_on_complete */
236 wrapped_rr_closure_arg wrapped_on_complete_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700237} pending_pick;
238
David Garcia Quintas8aace512016-08-15 14:55:12 -0700239static void add_pending_pick(pending_pick **root,
240 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700241 grpc_connected_subchannel **target,
242 grpc_closure *on_complete) {
243 pending_pick *pp = gpr_malloc(sizeof(*pp));
244 memset(pp, 0, sizeof(pending_pick));
245 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
246 pp->next = *root;
David Garcia Quintas8aace512016-08-15 14:55:12 -0700247 pp->pollent = pick_args->pollent;
David Garcia Quintas65318262016-07-29 13:43:38 -0700248 pp->target = target;
David Garcia Quintas8aace512016-08-15 14:55:12 -0700249 pp->initial_metadata = pick_args->initial_metadata;
250 pp->initial_metadata_flags = pick_args->initial_metadata_flags;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700251 pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
David Garcia Quintas65318262016-07-29 13:43:38 -0700252 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700253 pp->wrapped_on_complete_arg.target = target;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700254 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
255 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
256 pick_args->lb_token_mdelem_storage;
David Garcia Quintas65318262016-07-29 13:43:38 -0700257 grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
258 &pp->wrapped_on_complete_arg);
259 *root = pp;
260}
261
David Garcia Quintasea11d162016-07-14 17:27:28 -0700262/* Same as the \a pending_pick struct but for ping operations */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700263typedef struct pending_ping {
264 struct pending_ping *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700265
266 /* a closure wrapping the original on_complete one to be invoked once the
267 * ping() has completed (regardless of success) */
268 grpc_closure wrapped_notify;
269
270 /* args for wrapped_notify */
271 wrapped_rr_closure_arg wrapped_notify_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700272} pending_ping;
273
David Garcia Quintas65318262016-07-29 13:43:38 -0700274static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
275 pending_ping *pping = gpr_malloc(sizeof(*pping));
276 memset(pping, 0, sizeof(pending_ping));
277 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
278 pping->next = *root;
279 grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
280 &pping->wrapped_notify_arg);
281 pping->wrapped_notify_arg.wrapped_closure = notify;
282 *root = pping;
283}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700284
David Garcia Quintas8d489112016-07-29 15:20:42 -0700285/*
286 * glb_lb_policy
287 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700288typedef struct rr_connectivity_data rr_connectivity_data;
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700289struct lb_client_data;
David Garcia Quintas65318262016-07-29 13:43:38 -0700290static const grpc_lb_policy_vtable glb_lb_policy_vtable;
291typedef struct glb_lb_policy {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700292 /** base policy: must be first */
293 grpc_lb_policy base;
294
295 /** mutex protecting remaining members */
296 gpr_mu mu;
297
298 grpc_client_channel_factory *cc_factory;
299
300 /** for communicating with the LB server */
David Garcia Quintasea11d162016-07-14 17:27:28 -0700301 grpc_channel *lb_channel;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700302
303 /** the RR policy to use of the backend servers returned by the LB server */
304 grpc_lb_policy *rr_policy;
305
306 bool started_picking;
307
308 /** our connectivity state tracker */
309 grpc_connectivity_state_tracker state_tracker;
310
David Garcia Quintasea11d162016-07-14 17:27:28 -0700311 /** stores the deserialized response from the LB. May be NULL until one such
312 * response has arrived. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700313 grpc_grpclb_serverlist *serverlist;
314
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700315 /** total number of valid addresses received in \a serverlist */
316 size_t num_ok_serverlist_addresses;
317
318 /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
319 grpc_lb_address *lb_addresses;
320
David Garcia Quintasea11d162016-07-14 17:27:28 -0700321 /** list of picks that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700322 pending_pick *pending_picks;
323
David Garcia Quintasea11d162016-07-14 17:27:28 -0700324 /** list of pings that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700325 pending_ping *pending_pings;
326
David Garcia Quintasea11d162016-07-14 17:27:28 -0700327 /** client data associated with the LB server communication */
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700328 struct lb_client_data *lb_client;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700329
330 /** for tracking of the RR connectivity */
331 rr_connectivity_data *rr_connectivity;
David Garcia Quintas43339842016-07-18 12:56:09 -0700332
David Garcia Quintas41bef452016-07-28 19:19:58 -0700333 /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
334 * available RR picks */
David Garcia Quintas43339842016-07-18 12:56:09 -0700335 grpc_closure wrapped_on_complete;
336
337 /* arguments for the wrapped_on_complete closure */
338 wrapped_rr_closure_arg wc_arg;
David Garcia Quintas65318262016-07-29 13:43:38 -0700339} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700340
David Garcia Quintas65318262016-07-29 13:43:38 -0700341/* Keeps track and reacts to changes in connectivity of the RR instance */
342struct rr_connectivity_data {
343 grpc_closure on_change;
344 grpc_connectivity_state state;
345 glb_lb_policy *glb_policy;
346};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700347
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700348static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
349 bool log) {
350 const grpc_grpclb_ip_address *ip = &server->ip_address;
351 if (server->port >> 16 != 0) {
352 if (log) {
353 gpr_log(GPR_ERROR,
354 "Invalid port '%d' at index %zu of serverlist. Ignoring.",
355 server->port, idx);
356 }
357 return false;
358 }
359
360 if (ip->size != 4 && ip->size != 16) {
361 if (log) {
362 gpr_log(GPR_ERROR,
363 "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
364 "serverlist. Ignoring",
365 ip->size, idx);
366 }
367 return false;
368 }
369 return true;
370}
371
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700372/* populate \a addresses according to \a serverlist. Returns the number of
373 * addresses successfully parsed and added to \a addresses */
374static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
375 grpc_lb_address **lb_addresses) {
376 size_t num_valid = 0;
377 /* first pass: count how many are valid in order to allocate the necessary
378 * memory in a single block */
379 for (size_t i = 0; i < serverlist->num_servers; ++i) {
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700380 if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700381 }
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700382 if (num_valid == 0) {
383 return 0;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700384 }
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700385
386 /* allocate the memory block for the "resolved" addresses. */
387 grpc_resolved_address *r_addrs_memblock =
388 gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
389 memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
390 grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
391 memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
392
393 /* second pass: actually populate the addresses and LB tokens (aka user data
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700394 * to the outside world) to be read by the RR policy during its creation.
395 * Given that the validity tests are very cheap, they are performed again
396 * instead of marking the valid ones during the first pass, as this would
397 * incurr in an allocation due to the arbitrary number of server */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700398 size_t addr_idx = 0;
399 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
400 GPR_ASSERT(addr_idx < num_valid);
401 const grpc_grpclb_server *server = serverlist->servers[sl_idx];
402 if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
403 grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700404
405 /* address processing */
406 const uint16_t netorder_port = htons((uint16_t)server->port);
407 /* the addresses are given in binary format (a in(6)_addr struct) in
408 * server->ip_address.bytes. */
409 const grpc_grpclb_ip_address *ip = &server->ip_address;
410
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700411 lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700412 struct sockaddr_storage *sa =
413 (struct sockaddr_storage *)lb_addr->resolved_address->addr;
414 size_t *sa_len = &lb_addr->resolved_address->len;
415 *sa_len = 0;
416 if (ip->size == 4) {
417 struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
418 *sa_len = sizeof(struct sockaddr_in);
419 memset(addr4, 0, *sa_len);
420 addr4->sin_family = AF_INET;
421 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
422 addr4->sin_port = netorder_port;
423 } else if (ip->size == 16) {
424 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
425 *sa_len = sizeof(struct sockaddr_in6);
426 memset(addr6, 0, *sa_len);
427 addr6->sin6_family = AF_INET;
428 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
429 addr6->sin6_port = netorder_port;
430 }
431 GPR_ASSERT(*sa_len > 0);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700432
433 /* lb token processing */
434 if (server->has_load_balance_token) {
435 const size_t lb_token_size =
436 GPR_ARRAY_SIZE(server->load_balance_token) - 1;
437 grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
438 (uint8_t *)server->load_balance_token, lb_token_size);
439 lb_addr->user_data = grpc_mdelem_from_metadata_strings(
440 GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
441 } else {
442 gpr_log(GPR_ERROR,
443 "Missing LB token for backend address '%s'. The empty token will "
444 "be used instead",
murgatroid997871f732016-09-23 13:49:05 -0700445 grpc_sockaddr_to_uri(lb_addr->resolved_address));
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700446 lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
447 }
448 ++addr_idx;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700449 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700450 GPR_ASSERT(addr_idx == num_valid);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700451 *lb_addresses = lb_addrs;
452 return num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700453}
454
David Garcia Quintas65318262016-07-29 13:43:38 -0700455static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
456 const grpc_grpclb_serverlist *serverlist,
457 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700458 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700459
460 grpc_lb_policy_args args;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700461 memset(&args, 0, sizeof(args));
David Garcia Quintas65318262016-07-29 13:43:38 -0700462 args.client_channel_factory = glb_policy->cc_factory;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700463 const size_t num_ok_addresses =
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700464 process_serverlist(serverlist, &args.addresses);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700465 args.num_addresses = num_ok_addresses;
David Garcia Quintas65318262016-07-29 13:43:38 -0700466
467 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
468
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700469 if (glb_policy->lb_addresses != NULL) {
470 /* dispose of the previous version */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700471 lb_addrs_destroy(glb_policy->lb_addresses,
472 glb_policy->num_ok_serverlist_addresses);
David Garcia Quintas65318262016-07-29 13:43:38 -0700473 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700474 glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
475 glb_policy->lb_addresses = args.addresses;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700476
David Garcia Quintas65318262016-07-29 13:43:38 -0700477 return rr;
478}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700479
David Garcia Quintas41bef452016-07-28 19:19:58 -0700480static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
David Garcia Quintas65318262016-07-29 13:43:38 -0700481 grpc_error *error) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700482 GPR_ASSERT(glb_policy->serverlist != NULL &&
483 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700484 glb_policy->rr_policy =
485 create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
486
487 if (grpc_lb_glb_trace) {
488 gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
489 (intptr_t)glb_policy->rr_policy);
490 }
491 GPR_ASSERT(glb_policy->rr_policy != NULL);
492 glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
493 exec_ctx, glb_policy->rr_policy, &error);
494 grpc_lb_policy_notify_on_state_change(
495 exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
496 &glb_policy->rr_connectivity->on_change);
497 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700498 glb_policy->rr_connectivity->state,
499 GRPC_ERROR_REF(error), "rr_handover");
David Garcia Quintas65318262016-07-29 13:43:38 -0700500 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
501
502 /* flush pending ops */
503 pending_pick *pp;
504 while ((pp = glb_policy->pending_picks)) {
505 glb_policy->pending_picks = pp->next;
506 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
507 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
508 if (grpc_lb_glb_trace) {
509 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
510 (intptr_t)glb_policy->rr_policy);
511 }
David Garcia Quintas8aace512016-08-15 14:55:12 -0700512 const grpc_lb_policy_pick_args pick_args = {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700513 pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
514 pp->lb_token_mdelem_storage};
David Garcia Quintas8aace512016-08-15 14:55:12 -0700515 grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700516 (void **)&pp->wrapped_on_complete_arg.lb_token,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700517 &pp->wrapped_on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700518 pp->wrapped_on_complete_arg.owning_pending_node = pp;
519 }
520
521 pending_ping *pping;
522 while ((pping = glb_policy->pending_pings)) {
523 glb_policy->pending_pings = pping->next;
524 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
525 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
526 if (grpc_lb_glb_trace) {
527 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
528 (intptr_t)glb_policy->rr_policy);
529 }
530 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
531 &pping->wrapped_notify);
532 pping->wrapped_notify_arg.owning_pending_node = pping;
533 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700534}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700535
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700536static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
537 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700538 rr_connectivity_data *rr_conn_data = arg;
539 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700540
David Garcia Quintas41bef452016-07-28 19:19:58 -0700541 if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
542 if (glb_policy->serverlist != NULL) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700543 /* a RR policy is shutting down but there's a serverlist available ->
544 * perform a handover */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700545 rr_handover(exec_ctx, glb_policy, error);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700546 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700547 /* shutting down and no new serverlist available. Bail out. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700548 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700549 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700550 } else {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700551 if (error == GRPC_ERROR_NONE) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700552 /* RR not shutting down. Mimic the RR's policy state */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700553 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700554 rr_conn_data->state, GRPC_ERROR_REF(error),
555 "glb_rr_connectivity_changed");
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700556 /* resubscribe */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700557 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
558 &rr_conn_data->state,
559 &rr_conn_data->on_change);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700560 } else { /* error */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700561 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700562 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700563 }
564}
565
David Garcia Quintas65318262016-07-29 13:43:38 -0700566static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
567 grpc_lb_policy_factory *factory,
568 grpc_lb_policy_args *args) {
569 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
570 memset(glb_policy, 0, sizeof(*glb_policy));
571
572 /* All input addresses in args->addresses come from a resolver that claims
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700573 * they are LB services. It's the resolver's responsibility to make sure
574 * this
David Garcia Quintas65318262016-07-29 13:43:38 -0700575 * policy is only instantiated and used in that case.
576 *
577 * Create a client channel over them to communicate with a LB service */
578 glb_policy->cc_factory = args->client_channel_factory;
579 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700580 if (args->num_addresses == 0) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700581 return NULL;
582 }
583
David Garcia Quintas5ebb7af2016-09-15 10:02:16 -0700584 if (args->addresses[0].user_data != NULL) {
585 gpr_log(GPR_ERROR,
586 "This LB policy doesn't support user data. It will be ignored");
587 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700588
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700589 /* construct a target from the addresses in args, given in the form
David Garcia Quintas65318262016-07-29 13:43:38 -0700590 * ipvX://ip1:port1,ip2:port2,...
591 * TODO(dgq): support mixed ip version */
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700592 char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
murgatroid997871f732016-09-23 13:49:05 -0700593 addr_strs[0] = grpc_sockaddr_to_uri(args->addresses[0].resolved_address);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700594 for (size_t i = 1; i < args->num_addresses; i++) {
David Garcia Quintas5ebb7af2016-09-15 10:02:16 -0700595 if (args->addresses[i].user_data != NULL) {
596 gpr_log(GPR_ERROR,
597 "This LB policy doesn't support user data. It will be ignored");
598 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700599
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700600 GPR_ASSERT(
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700601 grpc_sockaddr_to_string(
602 &addr_strs[i],
murgatroid997871f732016-09-23 13:49:05 -0700603 args->addresses[i].resolved_address,
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700604 true) == 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700605 }
606 size_t uri_path_len;
607 char *target_uri_str = gpr_strjoin_sep(
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700608 (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
David Garcia Quintas65318262016-07-29 13:43:38 -0700609
610 /* will pick using pick_first */
611 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
612 exec_ctx, glb_policy->cc_factory, target_uri_str,
613 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
614
615 gpr_free(target_uri_str);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700616 for (size_t i = 0; i < args->num_addresses; i++) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700617 gpr_free(addr_strs[i]);
618 }
619 gpr_free(addr_strs);
620
621 if (glb_policy->lb_channel == NULL) {
622 gpr_free(glb_policy);
623 return NULL;
624 }
625
626 rr_connectivity_data *rr_connectivity =
627 gpr_malloc(sizeof(rr_connectivity_data));
628 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700629 grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
David Garcia Quintas65318262016-07-29 13:43:38 -0700630 rr_connectivity);
631 rr_connectivity->glb_policy = glb_policy;
632 glb_policy->rr_connectivity = rr_connectivity;
633
634 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
635 gpr_mu_init(&glb_policy->mu);
636 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
637 "grpclb");
638 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700639}
640
David Garcia Quintas65318262016-07-29 13:43:38 -0700641static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
642 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
643 GPR_ASSERT(glb_policy->pending_picks == NULL);
644 GPR_ASSERT(glb_policy->pending_pings == NULL);
645 grpc_channel_destroy(glb_policy->lb_channel);
646 glb_policy->lb_channel = NULL;
647 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
648 if (glb_policy->serverlist != NULL) {
649 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
650 }
651 gpr_mu_destroy(&glb_policy->mu);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700652
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700653 lb_addrs_destroy(glb_policy->lb_addresses,
654 glb_policy->num_ok_serverlist_addresses);
David Garcia Quintas65318262016-07-29 13:43:38 -0700655 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700656}
657
David Garcia Quintas740759e2016-08-01 14:49:49 -0700658static void lb_client_data_destroy(struct lb_client_data *lb_client);
David Garcia Quintas65318262016-07-29 13:43:38 -0700659static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
660 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
661 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700662
David Garcia Quintas65318262016-07-29 13:43:38 -0700663 pending_pick *pp = glb_policy->pending_picks;
664 glb_policy->pending_picks = NULL;
665 pending_ping *pping = glb_policy->pending_pings;
666 glb_policy->pending_pings = NULL;
667 gpr_mu_unlock(&glb_policy->mu);
668
669 while (pp != NULL) {
670 pending_pick *next = pp->next;
671 *pp->target = NULL;
672 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
673 NULL);
674 gpr_free(pp);
675 pp = next;
676 }
677
678 while (pping != NULL) {
679 pending_ping *next = pping->next;
680 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
681 NULL);
682 pping = next;
683 }
684
685 if (glb_policy->rr_policy) {
686 /* unsubscribe */
687 grpc_lb_policy_notify_on_state_change(
688 exec_ctx, glb_policy->rr_policy, NULL,
689 &glb_policy->rr_connectivity->on_change);
690 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
691 }
692
David Garcia Quintas740759e2016-08-01 14:49:49 -0700693 lb_client_data_destroy(glb_policy->lb_client);
694 glb_policy->lb_client = NULL;
695
David Garcia Quintas65318262016-07-29 13:43:38 -0700696 grpc_connectivity_state_set(
697 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
698 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
699}
700
701static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
702 grpc_connected_subchannel **target) {
703 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
704 gpr_mu_lock(&glb_policy->mu);
705 pending_pick *pp = glb_policy->pending_picks;
706 glb_policy->pending_picks = NULL;
707 while (pp != NULL) {
708 pending_pick *next = pp->next;
709 if (pp->target == target) {
710 grpc_polling_entity_del_from_pollset_set(
711 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
712 *target = NULL;
713 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
714 GRPC_ERROR_CANCELLED, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700715 } else {
716 pp->next = glb_policy->pending_picks;
717 glb_policy->pending_picks = pp;
718 }
719 pp = next;
720 }
721 gpr_mu_unlock(&glb_policy->mu);
722}
723
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700724static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
David Garcia Quintas65318262016-07-29 13:43:38 -0700725static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
726 uint32_t initial_metadata_flags_mask,
727 uint32_t initial_metadata_flags_eq) {
728 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
729 gpr_mu_lock(&glb_policy->mu);
730 if (glb_policy->lb_client != NULL) {
731 /* cancel the call to the load balancer service, if any */
732 grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL);
733 }
734 pending_pick *pp = glb_policy->pending_picks;
735 glb_policy->pending_picks = NULL;
736 while (pp != NULL) {
737 pending_pick *next = pp->next;
738 if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
739 initial_metadata_flags_eq) {
740 grpc_polling_entity_del_from_pollset_set(
741 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
742 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
743 GRPC_ERROR_CANCELLED, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700744 } else {
745 pp->next = glb_policy->pending_picks;
746 glb_policy->pending_picks = pp;
747 }
748 pp = next;
749 }
750 gpr_mu_unlock(&glb_policy->mu);
751}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700752
David Garcia Quintas65318262016-07-29 13:43:38 -0700753static void query_for_backends(grpc_exec_ctx *exec_ctx,
754 glb_lb_policy *glb_policy);
755static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
756 glb_policy->started_picking = true;
757 query_for_backends(exec_ctx, glb_policy);
758}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700759
David Garcia Quintas65318262016-07-29 13:43:38 -0700760static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
761 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
762 gpr_mu_lock(&glb_policy->mu);
763 if (!glb_policy->started_picking) {
764 start_picking(exec_ctx, glb_policy);
765 }
766 gpr_mu_unlock(&glb_policy->mu);
767}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700768
David Garcia Quintas65318262016-07-29 13:43:38 -0700769static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700770 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700771 grpc_connected_subchannel **target, void **user_data,
David Garcia Quintas65318262016-07-29 13:43:38 -0700772 grpc_closure *on_complete) {
773 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700774
775 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700776 *target = NULL;
David Garcia Quintas6cc44fc2016-09-12 23:04:35 -0700777 grpc_exec_ctx_sched(
778 exec_ctx, on_complete,
779 GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
780 "won't work without it. Failing"),
781 NULL);
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700782 return 1;
783 }
784
David Garcia Quintas65318262016-07-29 13:43:38 -0700785 gpr_mu_lock(&glb_policy->mu);
786 int r;
787
788 if (glb_policy->rr_policy != NULL) {
789 if (grpc_lb_glb_trace) {
790 gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "",
791 (intptr_t)glb_policy->rr_policy);
792 }
793 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
794 memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
795 glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700796 glb_policy->wc_arg.target = target;
David Garcia Quintas65318262016-07-29 13:43:38 -0700797 glb_policy->wc_arg.wrapped_closure = on_complete;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700798 glb_policy->wc_arg.lb_token_mdelem_storage =
799 pick_args->lb_token_mdelem_storage;
800 glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
801 glb_policy->wc_arg.owning_pending_node = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -0700802 grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
803 &glb_policy->wc_arg);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700804
805 r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700806 (void **)&glb_policy->wc_arg.lb_token,
David Garcia Quintas65318262016-07-29 13:43:38 -0700807 &glb_policy->wrapped_on_complete);
808 if (r != 0) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700809 /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
David Garcia Quintas65318262016-07-29 13:43:38 -0700810 if (grpc_lb_glb_trace) {
811 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
812 (intptr_t)glb_policy->wc_arg.rr_policy);
813 }
814 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700815
816 /* add the load reporting initial metadata */
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700817 initial_metadata_add_lb_token(
818 pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
David Garcia Quintas8424fdc2016-09-15 08:35:59 -0700819 GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
David Garcia Quintas65318262016-07-29 13:43:38 -0700820 }
821 } else {
David Garcia Quintas8aace512016-08-15 14:55:12 -0700822 grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
David Garcia Quintas65318262016-07-29 13:43:38 -0700823 glb_policy->base.interested_parties);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700824 add_pending_pick(&glb_policy->pending_picks, pick_args, target,
825 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700826
827 if (!glb_policy->started_picking) {
828 start_picking(exec_ctx, glb_policy);
829 }
830 r = 0;
831 }
832 gpr_mu_unlock(&glb_policy->mu);
833 return r;
834}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700835
David Garcia Quintas65318262016-07-29 13:43:38 -0700836static grpc_connectivity_state glb_check_connectivity(
837 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
838 grpc_error **connectivity_error) {
839 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
840 grpc_connectivity_state st;
841 gpr_mu_lock(&glb_policy->mu);
842 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
843 connectivity_error);
844 gpr_mu_unlock(&glb_policy->mu);
845 return st;
846}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700847
David Garcia Quintas65318262016-07-29 13:43:38 -0700848static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
849 grpc_closure *closure) {
850 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
851 gpr_mu_lock(&glb_policy->mu);
852 if (glb_policy->rr_policy) {
853 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
854 } else {
855 add_pending_ping(&glb_policy->pending_pings, closure);
856 if (!glb_policy->started_picking) {
857 start_picking(exec_ctx, glb_policy);
858 }
859 }
860 gpr_mu_unlock(&glb_policy->mu);
861}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700862
David Garcia Quintas65318262016-07-29 13:43:38 -0700863static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
864 grpc_lb_policy *pol,
865 grpc_connectivity_state *current,
866 grpc_closure *notify) {
867 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
868 gpr_mu_lock(&glb_policy->mu);
869 grpc_connectivity_state_notify_on_state_change(
870 exec_ctx, &glb_policy->state_tracker, current, notify);
871
872 gpr_mu_unlock(&glb_policy->mu);
873}
874
David Garcia Quintas8d489112016-07-29 15:20:42 -0700875/*
876 * lb_client_data
877 *
878 * Used internally for the client call to the LB */
David Garcia Quintas65318262016-07-29 13:43:38 -0700879typedef struct lb_client_data {
880 gpr_mu mu;
881
882 /* called once initial metadata's been sent */
883 grpc_closure md_sent;
884
David Garcia Quintas65318262016-07-29 13:43:38 -0700885 /* called once the LoadBalanceRequest has been sent to the LB server. See
886 * src/proto/grpc/.../load_balancer.proto */
887 grpc_closure req_sent;
888
889 /* A response from the LB server has been received (or error). Process it */
890 grpc_closure res_rcvd;
891
892 /* After the client has sent a close to the LB server */
893 grpc_closure close_sent;
894
895 /* ... and the status from the LB server has been received */
896 grpc_closure srv_status_rcvd;
897
898 grpc_call *lb_call; /* streaming call to the LB server, */
899 gpr_timespec deadline; /* for the streaming call to the LB server */
900
901 grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
902 grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */
903
904 /* what's being sent to the LB server. Note that its value may vary if the LB
905 * server indicates a redirect. */
906 grpc_byte_buffer *request_payload;
907
908 /* response from the LB server, if any. Processed in res_recv_cb() */
909 grpc_byte_buffer *response_payload;
910
911 /* the call's status and status detailset in srv_status_rcvd_cb() */
912 grpc_status_code status;
913 char *status_details;
914 size_t status_details_capacity;
915
916 /* pointer back to the enclosing policy */
917 glb_lb_policy *glb_policy;
918} lb_client_data;
919
920static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700921static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700922static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
923static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
924 grpc_error *error);
925static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
926 grpc_error *error);
927
928static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
929 lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
930 memset(lb_client, 0, sizeof(lb_client_data));
931
932 gpr_mu_init(&lb_client->mu);
933 grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
934
David Garcia Quintas65318262016-07-29 13:43:38 -0700935 grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
936 grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
937 grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
938 grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
939
940 /* TODO(dgq): get the deadline from the client config instead of fabricating
941 * one here. */
942 lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
943 gpr_time_from_seconds(3, GPR_TIMESPAN));
944
David Garcia Quintas15eba132016-08-09 15:20:48 -0700945 /* Note the following LB call progresses every time there's activity in \a
946 * glb_policy->base.interested_parties, which is comprised of the polling
947 * entities passed to glb_pick(). */
David Garcia Quintas65318262016-07-29 13:43:38 -0700948 lb_client->lb_call = grpc_channel_create_pollset_set_call(
949 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
950 glb_policy->base.interested_parties, "/BalanceLoad",
951 NULL, /* FIXME(dgq): which "host" value to use? */
952 lb_client->deadline, NULL);
953
954 grpc_metadata_array_init(&lb_client->initial_metadata_recv);
955 grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
956
957 grpc_grpclb_request *request = grpc_grpclb_request_create(
958 "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
959 balanced service from the resolver */
960 gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
961 lb_client->request_payload =
962 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
963 gpr_slice_unref(request_payload_slice);
964 grpc_grpclb_request_destroy(request);
965
966 lb_client->status_details = NULL;
967 lb_client->status_details_capacity = 0;
968 lb_client->glb_policy = glb_policy;
969 return lb_client;
970}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700971
David Garcia Quintas65318262016-07-29 13:43:38 -0700972static void lb_client_data_destroy(lb_client_data *lb_client) {
David Garcia Quintas740759e2016-08-01 14:49:49 -0700973 grpc_call_destroy(lb_client->lb_call);
David Garcia Quintas65318262016-07-29 13:43:38 -0700974 grpc_metadata_array_destroy(&lb_client->initial_metadata_recv);
975 grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv);
976
977 grpc_byte_buffer_destroy(lb_client->request_payload);
978
979 gpr_free(lb_client->status_details);
980 gpr_mu_destroy(&lb_client->mu);
981 gpr_free(lb_client);
982}
983static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) {
984 return lb_client->lb_call;
985}
986
David Garcia Quintas8d489112016-07-29 15:20:42 -0700987/*
988 * Auxiliary functions and LB client callbacks.
989 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700990static void query_for_backends(grpc_exec_ctx *exec_ctx,
991 glb_lb_policy *glb_policy) {
992 GPR_ASSERT(glb_policy->lb_channel != NULL);
993
994 glb_policy->lb_client = lb_client_data_create(glb_policy);
995 grpc_call_error call_error;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700996 grpc_op ops[1];
997 memset(ops, 0, sizeof(ops));
998 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -0700999 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1000 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001001 op->flags = 0;
1002 op->reserved = NULL;
1003 op++;
David Garcia Quintas65318262016-07-29 13:43:38 -07001004 call_error = grpc_call_start_batch_and_execute(
1005 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
1006 &glb_policy->lb_client->md_sent);
1007 GPR_ASSERT(GRPC_CALL_OK == call_error);
1008
1009 op = ops;
1010 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1011 op->data.recv_status_on_client.trailing_metadata =
1012 &glb_policy->lb_client->trailing_metadata_recv;
1013 op->data.recv_status_on_client.status = &glb_policy->lb_client->status;
1014 op->data.recv_status_on_client.status_details =
1015 &glb_policy->lb_client->status_details;
1016 op->data.recv_status_on_client.status_details_capacity =
1017 &glb_policy->lb_client->status_details_capacity;
1018 op->flags = 0;
1019 op->reserved = NULL;
1020 op++;
1021 call_error = grpc_call_start_batch_and_execute(
1022 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
1023 &glb_policy->lb_client->srv_status_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001024 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001025}
1026
David Garcia Quintas4166cb02016-07-29 14:33:15 -07001027static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
1028 lb_client_data *lb_client = arg;
1029 GPR_ASSERT(lb_client->lb_call);
1030 grpc_op ops[1];
1031 memset(ops, 0, sizeof(ops));
1032 grpc_op *op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001033
1034 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001035 op->data.send_message = lb_client->request_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001036 op->flags = 0;
1037 op->reserved = NULL;
1038 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001039 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1040 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1041 &lb_client->req_sent);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001042 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001043}
1044
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001045static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001046 lb_client_data *lb_client = arg;
David Garcia Quintas601bb122016-08-18 15:03:59 -07001047 GPR_ASSERT(lb_client->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001048
David Garcia Quintas601bb122016-08-18 15:03:59 -07001049 grpc_op ops[2];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001050 memset(ops, 0, sizeof(ops));
1051 grpc_op *op = ops;
1052
David Garcia Quintas601bb122016-08-18 15:03:59 -07001053 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1054 op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
1055 op->flags = 0;
1056 op->reserved = NULL;
1057 op++;
1058
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001059 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001060 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001061 op->flags = 0;
1062 op->reserved = NULL;
1063 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001064 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1065 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1066 &lb_client->res_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001067 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001068}
1069
David Garcia Quintas65318262016-07-29 13:43:38 -07001070static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001071 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001072 grpc_op ops[2];
1073 memset(ops, 0, sizeof(ops));
1074 grpc_op *op = ops;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001075 if (lb_client->response_payload != NULL) {
1076 /* Received data from the LB server. Look inside
David Garcia Quintas601bb122016-08-18 15:03:59 -07001077 * lb_client->response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001078 grpc_byte_buffer_reader bbr;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001079 grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001080 gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001081 grpc_byte_buffer_destroy(lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001082 grpc_grpclb_serverlist *serverlist =
1083 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001084 if (serverlist != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001085 gpr_slice_unref(response_slice);
1086 if (grpc_lb_glb_trace) {
1087 gpr_log(GPR_INFO, "Serverlist with %zu servers received",
1088 serverlist->num_servers);
1089 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001090
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001091 /* update serverlist */
1092 if (serverlist->num_servers > 0) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001093 if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
1094 serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001095 if (grpc_lb_glb_trace) {
1096 gpr_log(GPR_INFO,
1097 "Incoming server list identical to current, ignoring.");
1098 }
1099 } else { /* new serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001100 if (lb_client->glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001101 /* dispose of the old serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001102 grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001103 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001104 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001105 lb_client->glb_policy->serverlist = serverlist;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001106 }
David Garcia Quintas41bef452016-07-28 19:19:58 -07001107 if (lb_client->glb_policy->rr_policy == NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001108 /* initial "handover", in this case from a null RR policy, meaning
David Garcia Quintas43339842016-07-18 12:56:09 -07001109 * it'll just create the first RR policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001110 rr_handover(exec_ctx, lb_client->glb_policy, error);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001111 } else {
1112 /* unref the RR policy, eventually leading to its substitution with a
1113 * new one constructed from the received serverlist (see
David Garcia Quintas348cfdb2016-08-19 12:19:43 -07001114 * glb_rr_connectivity_changed) */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001115 GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
David Garcia Quintasea11d162016-07-14 17:27:28 -07001116 "serverlist_received");
1117 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001118 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001119 if (grpc_lb_glb_trace) {
1120 gpr_log(GPR_INFO,
1121 "Received empty server list. Picks will stay pending until a "
1122 "response with > 0 servers is received");
1123 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001124 }
1125
David Garcia Quintasea11d162016-07-14 17:27:28 -07001126 /* keep listening for serverlist updates */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001127 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001128 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001129 op->flags = 0;
1130 op->reserved = NULL;
1131 op++;
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001132 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -07001133 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1134 &lb_client->res_rcvd); /* loop */
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001135 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001136 return;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001137 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001138
1139 GPR_ASSERT(serverlist == NULL);
1140 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
1141 gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
1142 gpr_slice_unref(response_slice);
1143
1144 /* Disconnect from server returning invalid response. */
1145 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
1146 op->flags = 0;
1147 op->reserved = NULL;
1148 op++;
1149 grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -07001150 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1151 &lb_client->close_sent);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001152 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001153 }
1154 /* empty payload: call cancelled by server. Cleanups happening in
1155 * srv_status_rcvd_cb */
1156}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001157
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001158static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
1159 grpc_error *error) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001160 if (grpc_lb_glb_trace) {
1161 gpr_log(GPR_INFO,
1162 "Close from LB client sent. Waiting from server status now");
1163 }
1164}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001165
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001166static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001167 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001168 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001169 if (grpc_lb_glb_trace) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001170 gpr_log(GPR_INFO,
1171 "status from lb server received. Status = %d, Details = '%s', "
1172 "Capaticy "
1173 "= %zu",
David Garcia Quintas41bef452016-07-28 19:19:58 -07001174 lb_client->status, lb_client->status_details,
1175 lb_client->status_details_capacity);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001176 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -07001177 /* TODO(dgq): deal with stream termination properly (fire up another one?
David Garcia Quintas8424fdc2016-09-15 08:35:59 -07001178 * fail the original call?) */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001179}
1180
David Garcia Quintas8d489112016-07-29 15:20:42 -07001181/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001182static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1183 glb_destroy, glb_shutdown, glb_pick,
1184 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1185 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1186
1187static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1188
1189static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1190
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001191static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1192 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1193
1194static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1195
1196grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1197 return &glb_lb_policy_factory;
1198}
1199
1200/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001201void grpc_lb_policy_grpclb_init() {
1202 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1203 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1204}
1205
1206void grpc_lb_policy_grpclb_shutdown() {}