blob: 11504511163486f598002a6c363ffceccfb1a303 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
46 * idle state, \a query_for_backends() is called. It creates an instance of \a
David Garcia Quintas43339842016-07-18 12:56:09 -070047 * lb_client_data, an internal struct meant to contain the data associated with
48 * the internal communication with the LB server. This instance is created via
49 * \a lb_client_data_create(). There, the call over lb_channel to pick-first
50 * from {a1..an} is created, the \a LoadBalancingRequest message is assembled
51 * and all necessary callbacks for the progress of the internal call configured.
52 *
53 * Back in \a query_for_backends(), the internal *streaming* call to the LB
54 * server (whichever address from {a1..an} pick-first chose) is kicked off.
55 * It'll progress over the callbacks configured in \a lb_client_data_create()
56 * (see the field docstrings of \a lb_client_data for more details).
57 *
58 * If the call fails with UNIMPLEMENTED, the original call will also fail.
59 * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB
60 * server, which contradicts the LB bit being set. If the internal call times
61 * out, the usual behavior of pick-first applies, continuing to pick from the
62 * list {a1..an}.
63 *
 * Upon success, a \a LoadBalancingResponse is expected in \a res_recv_cb. An
David Garcia Quintas43339842016-07-18 12:56:09 -070065 * invalid one results in the termination of the streaming call. A new streaming
66 * call should be created if possible, failing the original call otherwise.
67 * For a valid \a LoadBalancingResponse, the server list of actual backends is
68 * extracted. A Round Robin policy will be created from this list. There are two
69 * possible scenarios:
70 *
71 * 1. This is the first server list received. There was no previous instance of
72 * the Round Robin policy. \a rr_handover() will instantiate the RR policy
73 * and perform all the pending operations over it.
74 * 2. There's already a RR policy instance active. We need to introduce the new
 *    one built from the new serverlist, but taking care not to disrupt the
76 * operations in progress over the old RR instance. This is done by
77 * decreasing the reference count on the old policy. The moment no more
78 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070079 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
80 * state. At this point we can transition to a new RR instance safely, which
81 * is done once again via \a rr_handover().
David Garcia Quintas43339842016-07-18 12:56:09 -070082 *
83 *
84 * Once a RR policy instance is in place (and getting updated as described),
 * calls for a pick, a ping or a cancellation will be serviced right away by
86 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintasd4a756b2016-07-19 11:35:15 -070087 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
David Garcia Quintas43339842016-07-18 12:56:09 -070088 * is received, etc), pick/ping requests are added to a list of pending
89 * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
90 * the RR policy instance becomes available.
91 *
92 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
93 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070094
95/* TODO(dgq):
96 * - Implement LB service forwarding (point 2c. in the doc's diagram).
97 */
98
David Garcia Quintas8a81aa12016-08-22 15:06:49 -070099#include <errno.h>
100
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700101#include <string.h>
102
103#include <grpc/byte_buffer_reader.h>
104#include <grpc/grpc.h>
105#include <grpc/support/alloc.h>
106#include <grpc/support/host_port.h>
107#include <grpc/support/string_util.h>
108
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700109#include "src/core/ext/client_config/client_channel_factory.h"
110#include "src/core/ext/client_config/lb_policy_registry.h"
111#include "src/core/ext/client_config/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700112#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700113#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
David Garcia Quintasbc334912016-08-22 16:57:20 -0700114#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700115#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700116#include "src/core/lib/support/string.h"
117#include "src/core/lib/surface/call.h"
118#include "src/core/lib/surface/channel.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700119#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700120
/* Tracing switch for this file: when non-zero, the gpr_log(GPR_INFO, ...)
 * statements guarded by it are emitted. Defined with external linkage and
 * initialized to 0 (tracing off). */
int grpc_lb_glb_trace = 0;
122
/* Produces a copy of \a user_data by taking a new reference on it through
 * GRPC_MDELEM_REF. A NULL \a user_data yields NULL without touching any
 * refcount. */
static void *user_data_copy(void *user_data) {
  void *copy = NULL;
  if (user_data != NULL) {
    copy = GRPC_MDELEM_REF(user_data);
  }
  return copy;
}
127
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700128static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
129 size_t num_addresses) {
130 /* free "resolved" addresses memblock */
131 gpr_free(lb_addresses->resolved_address);
132 for (size_t i = 0; i < num_addresses; ++i) {
133 if (lb_addresses[i].user_data != NULL) {
134 GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
135 }
136 }
137 gpr_free(lb_addresses);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700138}
139
140/* add lb_token of selected subchannel (address) to the call's initial
141 * metadata */
142static void initial_metadata_add_lb_token(
143 grpc_metadata_batch *initial_metadata,
144 grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
145 GPR_ASSERT(lb_token_mdelem_storage != NULL);
146 GPR_ASSERT(lb_token != NULL);
147 grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
148 lb_token);
149}
150
/* Argument bundle handed to \a wrapped_rr_closure. It is embedded in
 * \a pending_pick and \a pending_ping nodes and in \a glb_lb_policy itself.
 * Instances created for pings are zero-filled except for \a wrapped_closure,
 * so every pick-related member below is NULL for them. */
typedef struct wrapped_rr_closure_arg {
  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
   * calls against the internal RR instance, respectively. Must not be NULL by
   * the time \a wrapped_rr_closure runs. */
  grpc_closure *wrapped_closure;

  /* the pick's initial metadata, kept in order to append the LB token for the
   * pick */
  grpc_metadata_batch *initial_metadata;

  /* the picked target, used to determine which LB token to add to the pick's
   * initial metadata.
   * NOTE(review): neither written by add_pending_pick() nor read by
   * wrapped_rr_closure() in the part of the file reviewed here -- confirm it
   * is populated where needed. */
  grpc_connected_subchannel **target;

  /* the LB token associated with the pick. Filled in by the RR policy through
   * the user_data output argument of grpc_lb_policy_pick() */
  grpc_mdelem *lb_token;

  /* storage for the lb token initial metadata mdelem */
  grpc_linked_mdelem *lb_token_mdelem_storage;

  /* The RR instance related to the closure. When not NULL, one reference on it
   * is released when the wrapped closure runs */
  grpc_lb_policy *rr_policy;

  /* when not NULL, represents a pending_{pick,ping} node to be freed upon
   * closure execution. Since this struct is embedded in that node, nothing in
   * it may be touched after the free. */
  void *owning_pending_node; /* to be freed if not NULL */
} wrapped_rr_closure_arg;
177
178/* The \a on_complete closure passed as part of the pick requires keeping a
179 * reference to its associated round robin instance. We wrap this closure in
180 * order to unref the round robin instance upon its invocation */
181static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700182 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700183 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas43339842016-07-18 12:56:09 -0700184 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700185 if (grpc_lb_glb_trace) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700186 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
David Garcia Quintas43339842016-07-18 12:56:09 -0700187 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700188 }
David Garcia Quintas43339842016-07-18 12:56:09 -0700189 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700190 }
David Garcia Quintas5a876162016-07-18 13:08:42 -0700191 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700192
193 initial_metadata_add_lb_token(wc_arg->initial_metadata,
194 wc_arg->lb_token_mdelem_storage,
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700195 user_data_copy(wc_arg->lb_token));
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700196
David Garcia Quintas5a876162016-07-18 13:08:42 -0700197 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
David Garcia Quintas43339842016-07-18 12:56:09 -0700198 gpr_free(wc_arg->owning_pending_node);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700199}
200
/* Linked list of pending pick requests. It stores all information needed to
 * eventually call (Round Robin's) pick() on them. They mainly stay pending
 * waiting for the RR policy to be created/updated.
 *
 * One particularity is the wrapping of the user-provided \a on_complete closure
 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
 * order to correctly unref the RR policy instance upon completion of the pick.
 * See \a wrapped_rr_closure for details.
 *
 * Nodes are heap allocated by \a add_pending_pick and freed by
 * \a wrapped_rr_closure once \a rr_handover has registered them as the
 * closure's owning node. */
typedef struct pending_pick {
  /* next node in the list, or NULL */
  struct pending_pick *next;

  /* polling entity for the pick()'s async notification */
  grpc_polling_entity *pollent;

  /* the initial metadata for the pick. See grpc_lb_policy_pick() */
  grpc_metadata_batch *initial_metadata;

  /* storage for the lb token initial metadata mdelem */
  grpc_linked_mdelem *lb_token_mdelem_storage;

  /* bitmask passed to pick() and used for selective cancelling. See
   * grpc_lb_policy_cancel_picks() */
  uint32_t initial_metadata_flags;

  /* output argument where to store the pick()ed connected subchannel, or NULL
   * upon error. */
  grpc_connected_subchannel **target;

  /* a closure wrapping the original on_complete one to be invoked once the
   * pick() has completed (regardless of success) */
  grpc_closure wrapped_on_complete;

  /* args for wrapped_on_complete */
  wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
236
David Garcia Quintas8aace512016-08-15 14:55:12 -0700237static void add_pending_pick(pending_pick **root,
238 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700239 grpc_connected_subchannel **target,
240 grpc_closure *on_complete) {
241 pending_pick *pp = gpr_malloc(sizeof(*pp));
242 memset(pp, 0, sizeof(pending_pick));
243 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
244 pp->next = *root;
David Garcia Quintas8aace512016-08-15 14:55:12 -0700245 pp->pollent = pick_args->pollent;
David Garcia Quintas65318262016-07-29 13:43:38 -0700246 pp->target = target;
David Garcia Quintas8aace512016-08-15 14:55:12 -0700247 pp->initial_metadata = pick_args->initial_metadata;
248 pp->initial_metadata_flags = pick_args->initial_metadata_flags;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700249 pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
David Garcia Quintas65318262016-07-29 13:43:38 -0700250 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700251 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
252 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
253 pick_args->lb_token_mdelem_storage;
David Garcia Quintas65318262016-07-29 13:43:38 -0700254 grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
255 &pp->wrapped_on_complete_arg);
256 *root = pp;
257}
258
/* Same as the \a pending_pick struct but for ping operations: a singly linked
 * list node holding the wrapped notification closure for a ping that could not
 * be forwarded to a RR policy yet. */
typedef struct pending_ping {
  /* next node in the list, or NULL */
  struct pending_ping *next;

  /* a closure wrapping the original on_complete one to be invoked once the
   * ping() has completed (regardless of success) */
  grpc_closure wrapped_notify;

  /* args for wrapped_notify. Only its \a wrapped_closure is set when the node
   * is created; the pick-specific members stay NULL */
  wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
270
David Garcia Quintas65318262016-07-29 13:43:38 -0700271static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
272 pending_ping *pping = gpr_malloc(sizeof(*pping));
273 memset(pping, 0, sizeof(pending_ping));
274 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
275 pping->next = *root;
276 grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
277 &pping->wrapped_notify_arg);
278 pping->wrapped_notify_arg.wrapped_closure = notify;
279 *root = pping;
280}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700281
David Garcia Quintas8d489112016-07-29 15:20:42 -0700282/*
283 * glb_lb_policy
284 */
/* forward declarations: both types are only referenced through pointers in
 * glb_lb_policy, and the vtable is defined further down in the file */
typedef struct rr_connectivity_data rr_connectivity_data;
struct lb_client_data;
static const grpc_lb_policy_vtable glb_lb_policy_vtable;

/* State of a gRPC LB policy instance: the channel to the LB server, the most
 * recent serverlist received from it, the Round Robin policy built from that
 * serverlist and the picks/pings queued while no RR policy is available. */
typedef struct glb_lb_policy {
  /** base policy: must be first */
  grpc_lb_policy base;

  /** mutex protecting remaining members */
  gpr_mu mu;

  /** factory received at creation time; also handed to the RR policy */
  grpc_client_channel_factory *cc_factory;

  /** for communicating with the LB server */
  grpc_channel *lb_channel;

  /** the RR policy to use for the backend servers returned by the LB server */
  grpc_lb_policy *rr_policy;

  /* NOTE(review): not accessed in the part of the file reviewed here;
   * presumably set once the policy has begun picking -- confirm. */
  bool started_picking;

  /** our connectivity state tracker */
  grpc_connectivity_state_tracker state_tracker;

  /** stores the deserialized response from the LB. May be NULL until one such
   * response has arrived. */
  grpc_grpclb_serverlist *serverlist;

  /** total number of valid addresses received in \a serverlist */
  size_t num_ok_serverlist_addresses;

  /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them.
   * Replaced (and the previous array destroyed) each time a RR policy is
   * created. */
  grpc_lb_address *lb_addresses;

  /** list of picks that are waiting on RR's policy connectivity */
  pending_pick *pending_picks;

  /** list of pings that are waiting on RR's policy connectivity */
  pending_ping *pending_pings;

  /** client data associated with the LB server communication */
  struct lb_client_data *lb_client;

  /** for tracking of the RR connectivity */
  rr_connectivity_data *rr_connectivity;

  /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
   * available RR picks */
  grpc_closure wrapped_on_complete;

  /* arguments for the wrapped_on_complete closure */
  wrapped_rr_closure_arg wc_arg;
} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700337
/* Keeps track and reacts to changes in connectivity of the RR instance. An
 * instance of this struct is the argument received by
 * \a glb_rr_connectivity_changed, which also frees it when it stops
 * tracking. */
struct rr_connectivity_data {
  /* closure registered with grpc_lb_policy_notify_on_state_change() on the RR
   * policy */
  grpc_closure on_change;
  /* last known connectivity state of the RR policy; written by the RR policy
   * through the pointer given at subscription time */
  grpc_connectivity_state state;
  /* the gRPC LB policy owning the tracked RR policy */
  glb_lb_policy *glb_policy;
};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700344
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700345static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
346 bool log) {
347 const grpc_grpclb_ip_address *ip = &server->ip_address;
348 if (server->port >> 16 != 0) {
349 if (log) {
350 gpr_log(GPR_ERROR,
351 "Invalid port '%d' at index %zu of serverlist. Ignoring.",
352 server->port, idx);
353 }
354 return false;
355 }
356
357 if (ip->size != 4 && ip->size != 16) {
358 if (log) {
359 gpr_log(GPR_ERROR,
360 "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
361 "serverlist. Ignoring",
362 ip->size, idx);
363 }
364 return false;
365 }
366 return true;
367}
368
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700369/* populate \a addresses according to \a serverlist. Returns the number of
370 * addresses successfully parsed and added to \a addresses */
371static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
372 grpc_lb_address **lb_addresses) {
373 size_t num_valid = 0;
374 /* first pass: count how many are valid in order to allocate the necessary
375 * memory in a single block */
376 for (size_t i = 0; i < serverlist->num_servers; ++i) {
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700377 if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700378 }
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700379 if (num_valid == 0) {
380 return 0;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700381 }
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700382
383 /* allocate the memory block for the "resolved" addresses. */
384 grpc_resolved_address *r_addrs_memblock =
385 gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
386 memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
387 grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
388 memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
389
390 /* second pass: actually populate the addresses and LB tokens (aka user data
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700391 * to the outside world) to be read by the RR policy during its creation.
392 * Given that the validity tests are very cheap, they are performed again
393 * instead of marking the valid ones during the first pass, as this would
394 * incurr in an allocation due to the arbitrary number of server */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700395 size_t addr_idx = 0;
396 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
397 GPR_ASSERT(addr_idx < num_valid);
398 const grpc_grpclb_server *server = serverlist->servers[sl_idx];
399 if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
400 grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700401
402 /* address processing */
403 const uint16_t netorder_port = htons((uint16_t)server->port);
404 /* the addresses are given in binary format (a in(6)_addr struct) in
405 * server->ip_address.bytes. */
406 const grpc_grpclb_ip_address *ip = &server->ip_address;
407
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700408 lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700409 struct sockaddr_storage *sa =
410 (struct sockaddr_storage *)lb_addr->resolved_address->addr;
411 size_t *sa_len = &lb_addr->resolved_address->len;
412 *sa_len = 0;
413 if (ip->size == 4) {
414 struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
415 *sa_len = sizeof(struct sockaddr_in);
416 memset(addr4, 0, *sa_len);
417 addr4->sin_family = AF_INET;
418 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
419 addr4->sin_port = netorder_port;
420 } else if (ip->size == 16) {
421 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
422 *sa_len = sizeof(struct sockaddr_in6);
423 memset(addr6, 0, *sa_len);
424 addr6->sin6_family = AF_INET;
425 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
426 addr6->sin6_port = netorder_port;
427 }
428 GPR_ASSERT(*sa_len > 0);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700429
430 /* lb token processing */
431 if (server->has_load_balance_token) {
432 const size_t lb_token_size =
433 GPR_ARRAY_SIZE(server->load_balance_token) - 1;
434 grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
435 (uint8_t *)server->load_balance_token, lb_token_size);
436 lb_addr->user_data = grpc_mdelem_from_metadata_strings(
437 GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
438 } else {
439 gpr_log(GPR_ERROR,
440 "Missing LB token for backend address '%s'. The empty token will "
441 "be used instead",
442 grpc_sockaddr_to_uri((struct sockaddr *)sa));
443 lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
444 }
445 ++addr_idx;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700446 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700447 GPR_ASSERT(addr_idx == num_valid);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700448 *lb_addresses = lb_addrs;
449 return num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700450}
451
David Garcia Quintas65318262016-07-29 13:43:38 -0700452static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
453 const grpc_grpclb_serverlist *serverlist,
454 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700455 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700456
457 grpc_lb_policy_args args;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700458 memset(&args, 0, sizeof(args));
David Garcia Quintas65318262016-07-29 13:43:38 -0700459 args.client_channel_factory = glb_policy->cc_factory;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700460 const size_t num_ok_addresses =
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700461 process_serverlist(serverlist, &args.addresses);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700462 args.num_addresses = num_ok_addresses;
David Garcia Quintas65318262016-07-29 13:43:38 -0700463
464 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700465
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700466 if (glb_policy->lb_addresses != NULL) {
467 /* dispose of the previous version */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700468 lb_addrs_destroy(glb_policy->lb_addresses,
469 glb_policy->num_ok_serverlist_addresses);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700470 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700471 glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
472 glb_policy->lb_addresses = args.addresses;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700473
David Garcia Quintas65318262016-07-29 13:43:38 -0700474 return rr;
475}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700476
David Garcia Quintas41bef452016-07-28 19:19:58 -0700477static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
David Garcia Quintas65318262016-07-29 13:43:38 -0700478 grpc_error *error) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700479 GPR_ASSERT(glb_policy->serverlist != NULL &&
480 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700481 glb_policy->rr_policy =
482 create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
483
484 if (grpc_lb_glb_trace) {
485 gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
486 (intptr_t)glb_policy->rr_policy);
487 }
488 GPR_ASSERT(glb_policy->rr_policy != NULL);
489 glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
490 exec_ctx, glb_policy->rr_policy, &error);
491 grpc_lb_policy_notify_on_state_change(
492 exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
493 &glb_policy->rr_connectivity->on_change);
494 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700495 glb_policy->rr_connectivity->state,
496 GRPC_ERROR_REF(error), "rr_handover");
David Garcia Quintas65318262016-07-29 13:43:38 -0700497 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
498
499 /* flush pending ops */
500 pending_pick *pp;
501 while ((pp = glb_policy->pending_picks)) {
502 glb_policy->pending_picks = pp->next;
503 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
504 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
505 if (grpc_lb_glb_trace) {
506 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
507 (intptr_t)glb_policy->rr_policy);
508 }
David Garcia Quintas8aace512016-08-15 14:55:12 -0700509 const grpc_lb_policy_pick_args pick_args = {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700510 pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
511 pp->lb_token_mdelem_storage};
David Garcia Quintas8aace512016-08-15 14:55:12 -0700512 grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700513 (void **)&pp->wrapped_on_complete_arg.lb_token,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700514 &pp->wrapped_on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700515 pp->wrapped_on_complete_arg.owning_pending_node = pp;
516 }
517
518 pending_ping *pping;
519 while ((pping = glb_policy->pending_pings)) {
520 glb_policy->pending_pings = pping->next;
521 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
522 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
523 if (grpc_lb_glb_trace) {
524 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
525 (intptr_t)glb_policy->rr_policy);
526 }
527 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
528 &pping->wrapped_notify);
529 pping->wrapped_notify_arg.owning_pending_node = pping;
530 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700531}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700532
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700533static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
534 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700535 rr_connectivity_data *rr_conn_data = arg;
536 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700537
David Garcia Quintas41bef452016-07-28 19:19:58 -0700538 if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
539 if (glb_policy->serverlist != NULL) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700540 /* a RR policy is shutting down but there's a serverlist available ->
541 * perform a handover */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700542 rr_handover(exec_ctx, glb_policy, error);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700543 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700544 /* shutting down and no new serverlist available. Bail out. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700545 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700546 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700547 } else {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700548 if (error == GRPC_ERROR_NONE) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700549 /* RR not shutting down. Mimic the RR's policy state */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700550 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700551 rr_conn_data->state, GRPC_ERROR_REF(error),
552 "glb_rr_connectivity_changed");
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700553 /* resubscribe */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700554 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
555 &rr_conn_data->state,
556 &rr_conn_data->on_change);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700557 } else { /* error */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700558 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700559 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700560 }
561}
562
David Garcia Quintas65318262016-07-29 13:43:38 -0700563static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
564 grpc_lb_policy_factory *factory,
565 grpc_lb_policy_args *args) {
566 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
567 memset(glb_policy, 0, sizeof(*glb_policy));
568
569 /* All input addresses in args->addresses come from a resolver that claims
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700570 * they are LB services. It's the resolver's responsibility to make sure
571 * this
David Garcia Quintas65318262016-07-29 13:43:38 -0700572 * policy is only instantiated and used in that case.
573 *
574 * Create a client channel over them to communicate with a LB service */
575 glb_policy->cc_factory = args->client_channel_factory;
576 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700577 if (args->num_addresses == 0) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700578 return NULL;
579 }
580
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700581 /* this LB policy doesn't support \a user_data */
582 GPR_ASSERT(args->addresses[0].user_data == NULL);
583
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700584 /* construct a target from the addresses in args, given in the form
David Garcia Quintas65318262016-07-29 13:43:38 -0700585 * ipvX://ip1:port1,ip2:port2,...
586 * TODO(dgq): support mixed ip version */
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700587 char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
588 addr_strs[0] = grpc_sockaddr_to_uri(
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700589 (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700590 for (size_t i = 1; i < args->num_addresses; i++) {
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700591 /* this LB policy doesn't support \a user_data */
592 GPR_ASSERT(args->addresses[i].user_data == NULL);
593
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700594 GPR_ASSERT(
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700595 grpc_sockaddr_to_string(
596 &addr_strs[i],
597 (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
598 true) == 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700599 }
600 size_t uri_path_len;
601 char *target_uri_str = gpr_strjoin_sep(
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700602 (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
David Garcia Quintas65318262016-07-29 13:43:38 -0700603
604 /* will pick using pick_first */
605 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
606 exec_ctx, glb_policy->cc_factory, target_uri_str,
607 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
608
609 gpr_free(target_uri_str);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700610 for (size_t i = 0; i < args->num_addresses; i++) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700611 gpr_free(addr_strs[i]);
612 }
613 gpr_free(addr_strs);
614
615 if (glb_policy->lb_channel == NULL) {
616 gpr_free(glb_policy);
617 return NULL;
618 }
619
620 rr_connectivity_data *rr_connectivity =
621 gpr_malloc(sizeof(rr_connectivity_data));
622 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700623 grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
David Garcia Quintas65318262016-07-29 13:43:38 -0700624 rr_connectivity);
625 rr_connectivity->glb_policy = glb_policy;
626 glb_policy->rr_connectivity = rr_connectivity;
627
628 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
629 gpr_mu_init(&glb_policy->mu);
630 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
631 "grpclb");
632 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700633}
634
David Garcia Quintas65318262016-07-29 13:43:38 -0700635static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
636 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
637 GPR_ASSERT(glb_policy->pending_picks == NULL);
638 GPR_ASSERT(glb_policy->pending_pings == NULL);
639 grpc_channel_destroy(glb_policy->lb_channel);
640 glb_policy->lb_channel = NULL;
641 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
642 if (glb_policy->serverlist != NULL) {
643 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
644 }
645 gpr_mu_destroy(&glb_policy->mu);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700646
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700647 lb_addrs_destroy(glb_policy->lb_addresses,
648 glb_policy->num_ok_serverlist_addresses);
David Garcia Quintas65318262016-07-29 13:43:38 -0700649 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700650}
651
David Garcia Quintas740759e2016-08-01 14:49:49 -0700652static void lb_client_data_destroy(struct lb_client_data *lb_client);
David Garcia Quintas65318262016-07-29 13:43:38 -0700653static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
654 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
655 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700656
David Garcia Quintas65318262016-07-29 13:43:38 -0700657 pending_pick *pp = glb_policy->pending_picks;
658 glb_policy->pending_picks = NULL;
659 pending_ping *pping = glb_policy->pending_pings;
660 glb_policy->pending_pings = NULL;
661 gpr_mu_unlock(&glb_policy->mu);
662
663 while (pp != NULL) {
664 pending_pick *next = pp->next;
665 *pp->target = NULL;
666 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
667 NULL);
668 gpr_free(pp);
669 pp = next;
670 }
671
672 while (pping != NULL) {
673 pending_ping *next = pping->next;
674 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
675 NULL);
676 pping = next;
677 }
678
679 if (glb_policy->rr_policy) {
680 /* unsubscribe */
681 grpc_lb_policy_notify_on_state_change(
682 exec_ctx, glb_policy->rr_policy, NULL,
683 &glb_policy->rr_connectivity->on_change);
684 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
685 }
686
David Garcia Quintas740759e2016-08-01 14:49:49 -0700687 lb_client_data_destroy(glb_policy->lb_client);
688 glb_policy->lb_client = NULL;
689
David Garcia Quintas65318262016-07-29 13:43:38 -0700690 grpc_connectivity_state_set(
691 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
692 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
693}
694
695static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
696 grpc_connected_subchannel **target) {
697 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
698 gpr_mu_lock(&glb_policy->mu);
699 pending_pick *pp = glb_policy->pending_picks;
700 glb_policy->pending_picks = NULL;
701 while (pp != NULL) {
702 pending_pick *next = pp->next;
703 if (pp->target == target) {
704 grpc_polling_entity_del_from_pollset_set(
705 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
706 *target = NULL;
707 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
708 GRPC_ERROR_CANCELLED, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700709 } else {
710 pp->next = glb_policy->pending_picks;
711 glb_policy->pending_picks = pp;
712 }
713 pp = next;
714 }
715 gpr_mu_unlock(&glb_policy->mu);
716}
717
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700718static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
David Garcia Quintas65318262016-07-29 13:43:38 -0700719static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
720 uint32_t initial_metadata_flags_mask,
721 uint32_t initial_metadata_flags_eq) {
722 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
723 gpr_mu_lock(&glb_policy->mu);
724 if (glb_policy->lb_client != NULL) {
725 /* cancel the call to the load balancer service, if any */
726 grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL);
727 }
728 pending_pick *pp = glb_policy->pending_picks;
729 glb_policy->pending_picks = NULL;
730 while (pp != NULL) {
731 pending_pick *next = pp->next;
732 if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
733 initial_metadata_flags_eq) {
734 grpc_polling_entity_del_from_pollset_set(
735 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
736 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
737 GRPC_ERROR_CANCELLED, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700738 } else {
739 pp->next = glb_policy->pending_picks;
740 glb_policy->pending_picks = pp;
741 }
742 pp = next;
743 }
744 gpr_mu_unlock(&glb_policy->mu);
745}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700746
David Garcia Quintas65318262016-07-29 13:43:38 -0700747static void query_for_backends(grpc_exec_ctx *exec_ctx,
748 glb_lb_policy *glb_policy);
749static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
750 glb_policy->started_picking = true;
751 query_for_backends(exec_ctx, glb_policy);
752}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700753
David Garcia Quintas65318262016-07-29 13:43:38 -0700754static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
755 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
756 gpr_mu_lock(&glb_policy->mu);
757 if (!glb_policy->started_picking) {
758 start_picking(exec_ctx, glb_policy);
759 }
760 gpr_mu_unlock(&glb_policy->mu);
761}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700762
David Garcia Quintas65318262016-07-29 13:43:38 -0700763static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700764 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700765 grpc_connected_subchannel **target, void **user_data,
David Garcia Quintas65318262016-07-29 13:43:38 -0700766 grpc_closure *on_complete) {
767 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700768
769 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700770 *target = NULL;
David Garcia Quintas6cc44fc2016-09-12 23:04:35 -0700771 grpc_exec_ctx_sched(
772 exec_ctx, on_complete,
773 GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
774 "won't work without it. Failing"),
775 NULL);
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700776 return 1;
777 }
778
David Garcia Quintas65318262016-07-29 13:43:38 -0700779 gpr_mu_lock(&glb_policy->mu);
780 int r;
781
782 if (glb_policy->rr_policy != NULL) {
783 if (grpc_lb_glb_trace) {
784 gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "",
785 (intptr_t)glb_policy->rr_policy);
786 }
787 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
788 memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
789 glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
790 glb_policy->wc_arg.wrapped_closure = on_complete;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700791 glb_policy->wc_arg.lb_token_mdelem_storage =
792 pick_args->lb_token_mdelem_storage;
793 glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
794 glb_policy->wc_arg.owning_pending_node = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -0700795 grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
796 &glb_policy->wc_arg);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700797
798 r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700799 (void **)&glb_policy->wc_arg.lb_token,
David Garcia Quintas65318262016-07-29 13:43:38 -0700800 &glb_policy->wrapped_on_complete);
801 if (r != 0) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700802 /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
David Garcia Quintas65318262016-07-29 13:43:38 -0700803 if (grpc_lb_glb_trace) {
804 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
805 (intptr_t)glb_policy->wc_arg.rr_policy);
806 }
807 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700808
809 /* add the load reporting initial metadata */
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700810 initial_metadata_add_lb_token(
811 pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
812 user_data_copy(glb_policy->wc_arg.lb_token));
David Garcia Quintas65318262016-07-29 13:43:38 -0700813 }
814 } else {
David Garcia Quintas8aace512016-08-15 14:55:12 -0700815 grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
David Garcia Quintas65318262016-07-29 13:43:38 -0700816 glb_policy->base.interested_parties);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700817 add_pending_pick(&glb_policy->pending_picks, pick_args, target,
818 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700819
820 if (!glb_policy->started_picking) {
821 start_picking(exec_ctx, glb_policy);
822 }
823 r = 0;
824 }
825 gpr_mu_unlock(&glb_policy->mu);
826 return r;
827}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700828
David Garcia Quintas65318262016-07-29 13:43:38 -0700829static grpc_connectivity_state glb_check_connectivity(
830 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
831 grpc_error **connectivity_error) {
832 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
833 grpc_connectivity_state st;
834 gpr_mu_lock(&glb_policy->mu);
835 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
836 connectivity_error);
837 gpr_mu_unlock(&glb_policy->mu);
838 return st;
839}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700840
David Garcia Quintas65318262016-07-29 13:43:38 -0700841static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
842 grpc_closure *closure) {
843 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
844 gpr_mu_lock(&glb_policy->mu);
845 if (glb_policy->rr_policy) {
846 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
847 } else {
848 add_pending_ping(&glb_policy->pending_pings, closure);
849 if (!glb_policy->started_picking) {
850 start_picking(exec_ctx, glb_policy);
851 }
852 }
853 gpr_mu_unlock(&glb_policy->mu);
854}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700855
David Garcia Quintas65318262016-07-29 13:43:38 -0700856static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
857 grpc_lb_policy *pol,
858 grpc_connectivity_state *current,
859 grpc_closure *notify) {
860 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
861 gpr_mu_lock(&glb_policy->mu);
862 grpc_connectivity_state_notify_on_state_change(
863 exec_ctx, &glb_policy->state_tracker, current, notify);
864
865 gpr_mu_unlock(&glb_policy->mu);
866}
867
David Garcia Quintas8d489112016-07-29 15:20:42 -0700868/*
869 * lb_client_data
870 *
871 * Used internally for the client call to the LB */
David Garcia Quintas65318262016-07-29 13:43:38 -0700872typedef struct lb_client_data {
873 gpr_mu mu;
874
875 /* called once initial metadata's been sent */
876 grpc_closure md_sent;
877
David Garcia Quintas65318262016-07-29 13:43:38 -0700878 /* called once the LoadBalanceRequest has been sent to the LB server. See
879 * src/proto/grpc/.../load_balancer.proto */
880 grpc_closure req_sent;
881
882 /* A response from the LB server has been received (or error). Process it */
883 grpc_closure res_rcvd;
884
885 /* After the client has sent a close to the LB server */
886 grpc_closure close_sent;
887
888 /* ... and the status from the LB server has been received */
889 grpc_closure srv_status_rcvd;
890
891 grpc_call *lb_call; /* streaming call to the LB server, */
892 gpr_timespec deadline; /* for the streaming call to the LB server */
893
894 grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
895 grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */
896
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700897 /* what's being sent to the LB server. Note that its value may vary if the
898 * LB
David Garcia Quintas65318262016-07-29 13:43:38 -0700899 * server indicates a redirect. */
900 grpc_byte_buffer *request_payload;
901
902 /* response from the LB server, if any. Processed in res_recv_cb() */
903 grpc_byte_buffer *response_payload;
904
905 /* the call's status and status detailset in srv_status_rcvd_cb() */
906 grpc_status_code status;
907 char *status_details;
908 size_t status_details_capacity;
909
910 /* pointer back to the enclosing policy */
911 glb_lb_policy *glb_policy;
912} lb_client_data;
913
914static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700915static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700916static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
917static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
918 grpc_error *error);
919static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
920 grpc_error *error);
921
922static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
923 lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
924 memset(lb_client, 0, sizeof(lb_client_data));
925
926 gpr_mu_init(&lb_client->mu);
927 grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
928
David Garcia Quintas65318262016-07-29 13:43:38 -0700929 grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
930 grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
931 grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
932 grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
933
934 /* TODO(dgq): get the deadline from the client config instead of fabricating
935 * one here. */
936 lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
937 gpr_time_from_seconds(3, GPR_TIMESPAN));
938
David Garcia Quintas15eba132016-08-09 15:20:48 -0700939 /* Note the following LB call progresses every time there's activity in \a
940 * glb_policy->base.interested_parties, which is comprised of the polling
941 * entities passed to glb_pick(). */
David Garcia Quintas65318262016-07-29 13:43:38 -0700942 lb_client->lb_call = grpc_channel_create_pollset_set_call(
943 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
944 glb_policy->base.interested_parties, "/BalanceLoad",
945 NULL, /* FIXME(dgq): which "host" value to use? */
946 lb_client->deadline, NULL);
947
948 grpc_metadata_array_init(&lb_client->initial_metadata_recv);
949 grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
950
951 grpc_grpclb_request *request = grpc_grpclb_request_create(
952 "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
953 balanced service from the resolver */
954 gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
955 lb_client->request_payload =
956 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
957 gpr_slice_unref(request_payload_slice);
958 grpc_grpclb_request_destroy(request);
959
960 lb_client->status_details = NULL;
961 lb_client->status_details_capacity = 0;
962 lb_client->glb_policy = glb_policy;
963 return lb_client;
964}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700965
David Garcia Quintas65318262016-07-29 13:43:38 -0700966static void lb_client_data_destroy(lb_client_data *lb_client) {
David Garcia Quintas740759e2016-08-01 14:49:49 -0700967 grpc_call_destroy(lb_client->lb_call);
David Garcia Quintas65318262016-07-29 13:43:38 -0700968 grpc_metadata_array_destroy(&lb_client->initial_metadata_recv);
969 grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv);
970
971 grpc_byte_buffer_destroy(lb_client->request_payload);
972
973 gpr_free(lb_client->status_details);
974 gpr_mu_destroy(&lb_client->mu);
975 gpr_free(lb_client);
976}
977static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) {
978 return lb_client->lb_call;
979}
980
David Garcia Quintas8d489112016-07-29 15:20:42 -0700981/*
982 * Auxiliary functions and LB client callbacks.
983 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700984static void query_for_backends(grpc_exec_ctx *exec_ctx,
985 glb_lb_policy *glb_policy) {
986 GPR_ASSERT(glb_policy->lb_channel != NULL);
987
988 glb_policy->lb_client = lb_client_data_create(glb_policy);
989 grpc_call_error call_error;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700990 grpc_op ops[1];
991 memset(ops, 0, sizeof(ops));
992 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -0700993 op->op = GRPC_OP_SEND_INITIAL_METADATA;
994 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700995 op->flags = 0;
996 op->reserved = NULL;
997 op++;
David Garcia Quintas65318262016-07-29 13:43:38 -0700998 call_error = grpc_call_start_batch_and_execute(
999 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
1000 &glb_policy->lb_client->md_sent);
1001 GPR_ASSERT(GRPC_CALL_OK == call_error);
1002
1003 op = ops;
1004 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1005 op->data.recv_status_on_client.trailing_metadata =
1006 &glb_policy->lb_client->trailing_metadata_recv;
1007 op->data.recv_status_on_client.status = &glb_policy->lb_client->status;
1008 op->data.recv_status_on_client.status_details =
1009 &glb_policy->lb_client->status_details;
1010 op->data.recv_status_on_client.status_details_capacity =
1011 &glb_policy->lb_client->status_details_capacity;
1012 op->flags = 0;
1013 op->reserved = NULL;
1014 op++;
1015 call_error = grpc_call_start_batch_and_execute(
1016 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
1017 &glb_policy->lb_client->srv_status_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001018 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001019}
1020
David Garcia Quintas4166cb02016-07-29 14:33:15 -07001021static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
1022 lb_client_data *lb_client = arg;
1023 GPR_ASSERT(lb_client->lb_call);
1024 grpc_op ops[1];
1025 memset(ops, 0, sizeof(ops));
1026 grpc_op *op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001027
1028 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001029 op->data.send_message = lb_client->request_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001030 op->flags = 0;
1031 op->reserved = NULL;
1032 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001033 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1034 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1035 &lb_client->req_sent);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001036 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001037}
1038
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001039static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001040 lb_client_data *lb_client = arg;
David Garcia Quintas601bb122016-08-18 15:03:59 -07001041 GPR_ASSERT(lb_client->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001042
David Garcia Quintas601bb122016-08-18 15:03:59 -07001043 grpc_op ops[2];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001044 memset(ops, 0, sizeof(ops));
1045 grpc_op *op = ops;
1046
David Garcia Quintas601bb122016-08-18 15:03:59 -07001047 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1048 op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
1049 op->flags = 0;
1050 op->reserved = NULL;
1051 op++;
1052
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001053 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001054 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001055 op->flags = 0;
1056 op->reserved = NULL;
1057 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001058 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1059 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1060 &lb_client->res_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001061 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001062}
1063
David Garcia Quintas65318262016-07-29 13:43:38 -07001064static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001065 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001066 grpc_op ops[2];
1067 memset(ops, 0, sizeof(ops));
1068 grpc_op *op = ops;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001069 if (lb_client->response_payload != NULL) {
1070 /* Received data from the LB server. Look inside
David Garcia Quintas601bb122016-08-18 15:03:59 -07001071 * lb_client->response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001072 grpc_byte_buffer_reader bbr;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001073 grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001074 gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001075 grpc_byte_buffer_destroy(lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001076 grpc_grpclb_serverlist *serverlist =
1077 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001078 if (serverlist != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001079 gpr_slice_unref(response_slice);
1080 if (grpc_lb_glb_trace) {
1081 gpr_log(GPR_INFO, "Serverlist with %zu servers received",
1082 serverlist->num_servers);
1083 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001084
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001085 /* update serverlist */
1086 if (serverlist->num_servers > 0) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001087 if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
1088 serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001089 if (grpc_lb_glb_trace) {
1090 gpr_log(GPR_INFO,
1091 "Incoming server list identical to current, ignoring.");
1092 }
1093 } else { /* new serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001094 if (lb_client->glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001095 /* dispose of the old serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001096 grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001097 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001098 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001099 lb_client->glb_policy->serverlist = serverlist;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001100 }
David Garcia Quintas41bef452016-07-28 19:19:58 -07001101 if (lb_client->glb_policy->rr_policy == NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001102 /* initial "handover", in this case from a null RR policy, meaning
David Garcia Quintas43339842016-07-18 12:56:09 -07001103 * it'll just create the first RR policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001104 rr_handover(exec_ctx, lb_client->glb_policy, error);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001105 } else {
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -07001106 /* unref the RR policy, eventually leading to its substitution with
1107 * a
David Garcia Quintasea11d162016-07-14 17:27:28 -07001108 * new one constructed from the received serverlist (see
David Garcia Quintas348cfdb2016-08-19 12:19:43 -07001109 * glb_rr_connectivity_changed) */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001110 GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
David Garcia Quintasea11d162016-07-14 17:27:28 -07001111 "serverlist_received");
1112 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001113 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001114 if (grpc_lb_glb_trace) {
1115 gpr_log(GPR_INFO,
1116 "Received empty server list. Picks will stay pending until a "
1117 "response with > 0 servers is received");
1118 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001119 }
1120
David Garcia Quintasea11d162016-07-14 17:27:28 -07001121 /* keep listening for serverlist updates */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001122 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001123 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001124 op->flags = 0;
1125 op->reserved = NULL;
1126 op++;
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001127 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -07001128 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1129 &lb_client->res_rcvd); /* loop */
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001130 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001131 return;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001132 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001133
1134 GPR_ASSERT(serverlist == NULL);
1135 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
1136 gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
1137 gpr_slice_unref(response_slice);
1138
1139 /* Disconnect from server returning invalid response. */
1140 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
1141 op->flags = 0;
1142 op->reserved = NULL;
1143 op++;
1144 grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -07001145 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1146 &lb_client->close_sent);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001147 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001148 }
1149 /* empty payload: call cancelled by server. Cleanups happening in
1150 * srv_status_rcvd_cb */
1151}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001152
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001153static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
1154 grpc_error *error) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001155 if (grpc_lb_glb_trace) {
1156 gpr_log(GPR_INFO,
1157 "Close from LB client sent. Waiting from server status now");
1158 }
1159}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001160
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001161static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001162 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001163 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001164 if (grpc_lb_glb_trace) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001165 gpr_log(GPR_INFO,
1166 "status from lb server received. Status = %d, Details = '%s', "
1167 "Capaticy "
1168 "= %zu",
David Garcia Quintas41bef452016-07-28 19:19:58 -07001169 lb_client->status, lb_client->status_details,
1170 lb_client->status_details_capacity);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001171 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -07001172 /* TODO(dgq): deal with stream termination properly (fire up another one?
1173 * fail
David Garcia Quintas43339842016-07-18 12:56:09 -07001174 * the original call?) */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001175}
1176
David Garcia Quintas8d489112016-07-29 15:20:42 -07001177/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001178static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1179 glb_destroy, glb_shutdown, glb_pick,
1180 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1181 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1182
1183static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1184
1185static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1186
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001187static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1188 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1189
1190static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1191
1192grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1193 return &glb_lb_policy_factory;
1194}
1195
1196/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001197void grpc_lb_policy_grpclb_init() {
1198 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1199 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1200}
1201
/* Counterpart of grpc_lb_policy_grpclb_init(). There's nothing to undo. */
void grpc_lb_policy_grpclb_shutdown(void) {}