blob: f1e9ecc558c4785331f8727883f460964a79c5b2 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
46 * idle state, \a query_for_backends() is called. It creates an instance of \a
David Garcia Quintas43339842016-07-18 12:56:09 -070047 * lb_client_data, an internal struct meant to contain the data associated with
48 * the internal communication with the LB server. This instance is created via
49 * \a lb_client_data_create(). There, the call over lb_channel to pick-first
50 * from {a1..an} is created, the \a LoadBalancingRequest message is assembled
51 * and all necessary callbacks for the progress of the internal call configured.
52 *
53 * Back in \a query_for_backends(), the internal *streaming* call to the LB
54 * server (whichever address from {a1..an} pick-first chose) is kicked off.
55 * It'll progress over the callbacks configured in \a lb_client_data_create()
56 * (see the field docstrings of \a lb_client_data for more details).
57 *
58 * If the call fails with UNIMPLEMENTED, the original call will also fail.
59 * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB
60 * server, which contradicts the LB bit being set. If the internal call times
61 * out, the usual behavior of pick-first applies, continuing to pick from the
62 * list {a1..an}.
63 *
 * Upon success, a \a LoadBalancingResponse is expected in \a res_recv_cb. An
David Garcia Quintas43339842016-07-18 12:56:09 -070065 * invalid one results in the termination of the streaming call. A new streaming
66 * call should be created if possible, failing the original call otherwise.
67 * For a valid \a LoadBalancingResponse, the server list of actual backends is
68 * extracted. A Round Robin policy will be created from this list. There are two
69 * possible scenarios:
70 *
71 * 1. This is the first server list received. There was no previous instance of
72 * the Round Robin policy. \a rr_handover() will instantiate the RR policy
73 * and perform all the pending operations over it.
74 * 2. There's already a RR policy instance active. We need to introduce the new
 * one built from the new serverlist, but taking care not to disrupt the
76 * operations in progress over the old RR instance. This is done by
77 * decreasing the reference count on the old policy. The moment no more
78 * references are held on the old RR policy, it'll be destroyed and \a
79 * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
80 * At this point we can transition to a new RR instance safely, which is done
81 * once again via \a rr_handover().
82 *
83 *
84 * Once a RR policy instance is in place (and getting updated as described),
 * calls for a pick, a ping or a cancellation will be serviced right away by
86 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintasd4a756b2016-07-19 11:35:15 -070087 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
David Garcia Quintas43339842016-07-18 12:56:09 -070088 * is received, etc), pick/ping requests are added to a list of pending
89 * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
90 * the RR policy instance becomes available.
91 *
92 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
93 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070094
95/* TODO(dgq):
96 * - Implement LB service forwarding (point 2c. in the doc's diagram).
97 */
98
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070099#include <string.h>
100
101#include <grpc/byte_buffer_reader.h>
102#include <grpc/grpc.h>
103#include <grpc/support/alloc.h>
104#include <grpc/support/host_port.h>
105#include <grpc/support/string_util.h>
106
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700107#include "src/core/ext/client_config/client_channel_factory.h"
108#include "src/core/ext/client_config/lb_policy_registry.h"
109#include "src/core/ext/client_config/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700110#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700111#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
112#include "src/core/lib/iomgr/sockaddr_utils.h"
113#include "src/core/lib/support/string.h"
114#include "src/core/lib/surface/call.h"
115#include "src/core/lib/surface/channel.h"
116
/* Tracing flag for the grpclb policy: non-zero enables the gpr_log(GPR_INFO)
 * messages sprinkled throughout this file. */
int grpc_lb_glb_trace = 0;
118
/* Argument for \a wrapped_rr_closure: couples the user-provided closure with
 * the RR policy instance that must be unreffed once that closure runs. */
typedef struct wrapped_rr_closure_arg {
  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
   * calls against the internal RR instance, respectively. */
  grpc_closure *wrapped_closure;

  /* The RR instance related to the closure */
  grpc_lb_policy *rr_policy;

  /* when not NULL, represents a pending_{pick,ping} node to be freed upon
   * closure execution (see \a wrapped_rr_closure, which gpr_free()s it) */
  void *owning_pending_node; /* to be freed if not NULL */
} wrapped_rr_closure_arg;
131
132/* The \a on_complete closure passed as part of the pick requires keeping a
133 * reference to its associated round robin instance. We wrap this closure in
134 * order to unref the round robin instance upon its invocation */
135static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700136 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700137 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas43339842016-07-18 12:56:09 -0700138 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700139 if (grpc_lb_glb_trace) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700140 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
David Garcia Quintas43339842016-07-18 12:56:09 -0700141 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700142 }
David Garcia Quintas43339842016-07-18 12:56:09 -0700143 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700144 }
David Garcia Quintas5a876162016-07-18 13:08:42 -0700145 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
146 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
David Garcia Quintas43339842016-07-18 12:56:09 -0700147 gpr_free(wc_arg->owning_pending_node);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700148}
149
/* Linked list of pending pick requests. It stores all information needed to
 * eventually call (Round Robin's) pick() on them. They mainly stay pending
 * waiting for the RR policy to be created/updated.
 *
 * One particularity is the wrapping of the user-provided \a on_complete closure
 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
 * order to correctly unref the RR policy instance upon completion of the pick.
 * See \a wrapped_rr_closure for details.
 *
 * Nodes are freed either by \a wrapped_rr_closure (via owning_pending_node)
 * once flushed by \a rr_handover, or directly in shutdown/cancellation. */
typedef struct pending_pick {
  struct pending_pick *next;

  /* polling entity for the pick()'s async notification */
  grpc_polling_entity *pollent;

  /* the initial metadata for the pick. See grpc_lb_policy_pick() */
  grpc_metadata_batch *initial_metadata;

  /* bitmask passed to pick() and used for selective cancelling. See
   * grpc_lb_policy_cancel_picks() */
  uint32_t initial_metadata_flags;

  /* output argument where to store the pick()ed connected subchannel, or NULL
   * upon error. */
  grpc_connected_subchannel **target;

  /* a closure wrapping the original on_complete one to be invoked once the
   * pick() has completed (regardless of success) */
  grpc_closure wrapped_on_complete;

  /* args for wrapped_on_complete */
  wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
182
David Garcia Quintas65318262016-07-29 13:43:38 -0700183static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
184 grpc_metadata_batch *initial_metadata,
185 uint32_t initial_metadata_flags,
186 grpc_connected_subchannel **target,
187 grpc_closure *on_complete) {
188 pending_pick *pp = gpr_malloc(sizeof(*pp));
189 memset(pp, 0, sizeof(pending_pick));
190 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
191 pp->next = *root;
192 pp->pollent = pollent;
193 pp->target = target;
194 pp->initial_metadata = initial_metadata;
195 pp->initial_metadata_flags = initial_metadata_flags;
196 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
197 grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
198 &pp->wrapped_on_complete_arg);
199 *root = pp;
200}
201
/* Same as the \a pending_pick struct but for ping operations: pending pings
 * only need the wrapped user closure, no pick in/out arguments. */
typedef struct pending_ping {
  struct pending_ping *next;

  /* a closure wrapping the original on_complete one to be invoked once the
   * ping() has completed (regardless of success) */
  grpc_closure wrapped_notify;

  /* args for wrapped_notify */
  wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
213
David Garcia Quintas65318262016-07-29 13:43:38 -0700214static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
215 pending_ping *pping = gpr_malloc(sizeof(*pping));
216 memset(pping, 0, sizeof(pending_ping));
217 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
218 pping->next = *root;
219 grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
220 &pping->wrapped_notify_arg);
221 pping->wrapped_notify_arg.wrapped_closure = notify;
222 *root = pping;
223}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700224
typedef struct rr_connectivity_data rr_connectivity_data;
typedef struct lb_client_data lb_client_data;
static const grpc_lb_policy_vtable glb_lb_policy_vtable;

/* Main state for the grpclb policy instance. */
typedef struct glb_lb_policy {
  /** base policy: must be first */
  grpc_lb_policy base;

  /** mutex protecting remaining members */
  gpr_mu mu;

  /** factory used to build the channel to the LB server and, transitively,
   * the internal RR policy's channels (see \a create_rr) */
  grpc_client_channel_factory *cc_factory;

  /** for communicating with the LB server */
  grpc_channel *lb_channel;

  /** the RR policy to use of the backend servers returned by the LB server */
  grpc_lb_policy *rr_policy;

  /** set by \a start_picking; guards against kicking off the LB query twice */
  bool started_picking;

  /** our connectivity state tracker */
  grpc_connectivity_state_tracker state_tracker;

  /** stores the deserialized response from the LB. May be NULL until one such
   * response has arrived. */
  grpc_grpclb_serverlist *serverlist;

  /** list of picks that are waiting on RR's policy connectivity */
  pending_pick *pending_picks;

  /** list of pings that are waiting on RR's policy connectivity */
  pending_ping *pending_pings;

  /** client data associated with the LB server communication */
  lb_client_data *lb_client;

  /** for tracking of the RR connectivity */
  rr_connectivity_data *rr_connectivity;

  /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
   * available RR picks */
  grpc_closure wrapped_on_complete;

  /* arguments for the wrapped_on_complete closure */
  wrapped_rr_closure_arg wc_arg;
} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700271
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
  /* invoked on RR connectivity changes; see \a rr_connectivity_changed */
  grpc_closure on_change;
  /* last observed RR connectivity state */
  grpc_connectivity_state state;
  /* back-pointer to the owning glb policy */
  glb_lb_policy *glb_policy;
};
278static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
279 const grpc_grpclb_serverlist *serverlist,
280 glb_lb_policy *glb_policy) {
281 /* TODO(dgq): support mixed ip version */
282 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
283 char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
284 for (size_t i = 0; i < serverlist->num_servers; ++i) {
285 gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
286 serverlist->servers[i]->port);
287 }
288
289 size_t uri_path_len;
290 char *concat_ipports = gpr_strjoin_sep(
291 (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
292
293 grpc_lb_policy_args args;
294 args.client_channel_factory = glb_policy->cc_factory;
295 args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
296 args.addresses->naddrs = serverlist->num_servers;
297 args.addresses->addrs =
298 gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
299 size_t out_addrs_idx = 0;
300 for (size_t i = 0; i < serverlist->num_servers; ++i) {
301 grpc_uri uri;
302 struct sockaddr_storage sa;
303 size_t sa_len;
304 uri.path = host_ports[i];
305 if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
306 memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
307 args.addresses->addrs[out_addrs_idx].len = sa_len;
308 ++out_addrs_idx;
309 } else {
310 gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
311 host_ports[i]);
312 }
313 }
314
315 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
316
317 gpr_free(concat_ipports);
318 for (size_t i = 0; i < serverlist->num_servers; i++) {
319 gpr_free(host_ports[i]);
320 }
321 gpr_free(host_ports);
322 gpr_free(args.addresses->addrs);
323 gpr_free(args.addresses);
324 return rr;
325}
/* Installs a fresh RR policy built from the current serverlist, mirrors its
 * connectivity into the glb tracker, and flushes all pending picks/pings onto
 * it. Used both for the first serverlist and for serverlist updates (once the
 * previous RR instance has shut down; see \a rr_connectivity_changed). */
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
                        grpc_error *error) {
  GRPC_ERROR_REF(error);
  glb_policy->rr_policy =
      create_rr(exec_ctx, glb_policy->serverlist, glb_policy);

  if (grpc_lb_glb_trace) {
    gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
            (intptr_t)glb_policy->rr_policy);
  }
  GPR_ASSERT(glb_policy->rr_policy != NULL);
  /* seed our tracker with RR's current state and subscribe to changes */
  glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
      exec_ctx, glb_policy->rr_policy, &error);
  grpc_lb_policy_notify_on_state_change(
      exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
      &glb_policy->rr_connectivity->on_change);
  grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
                              glb_policy->rr_connectivity->state, error,
                              "rr_handover");
  grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);

  /* flush pending ops */
  pending_pick *pp;
  while ((pp = glb_policy->pending_picks)) {
    glb_policy->pending_picks = pp->next;
    /* ref per pending pick; released by \a wrapped_rr_closure */
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
    pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
              (intptr_t)glb_policy->rr_policy);
    }
    grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
                        pp->initial_metadata, pp->initial_metadata_flags,
                        pp->target, &pp->wrapped_on_complete);
    /* NOTE(review): owning_pending_node is assigned *after* the pick call —
     * confirm the wrapped closure cannot run before this line executes. */
    pp->wrapped_on_complete_arg.owning_pending_node = pp;
  }

  pending_ping *pping;
  while ((pping = glb_policy->pending_pings)) {
    glb_policy->pending_pings = pping->next;
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
    pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
              (intptr_t)glb_policy->rr_policy);
    }
    grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
                            &pping->wrapped_notify);
    pping->wrapped_notify_arg.owning_pending_node = pping;
  }
  GRPC_ERROR_UNREF(error);
}
/* Callback fired on changes to the internal RR policy's connectivity.
 *
 * On RR shutdown with a serverlist available, transitions to a new RR
 * instance via \a rr_handover; on shutdown without one (or on error), frees
 * the tracking struct. Otherwise mirrors RR's state and re-subscribes. */
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                    grpc_error *error) {
  rr_connectivity_data *rr_conn_data = arg;
  glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
  if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
    if (glb_policy->serverlist != NULL) {
      /* a RR policy is shutting down but there's a serverlist available ->
       * perform a handover */
      rr_handover(exec_ctx, glb_policy, error);
    } else {
      /* shutting down and no new serverlist available. Bail out. */
      gpr_free(rr_conn_data);
    }
  } else {
    if (error == GRPC_ERROR_NONE) {
      /* RR not shutting down. Mimic the RR's policy state */
      grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
                                  rr_conn_data->state, error,
                                  "rr_connectivity_changed");
      /* resubscribe */
      grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
                                            &rr_conn_data->state,
                                            &rr_conn_data->on_change);
    } else { /* error */
      gpr_free(rr_conn_data);
    }
  }
  GRPC_ERROR_UNREF(error);
}
407
David Garcia Quintas65318262016-07-29 13:43:38 -0700408static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
409 grpc_lb_policy_factory *factory,
410 grpc_lb_policy_args *args) {
411 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
412 memset(glb_policy, 0, sizeof(*glb_policy));
413
414 /* All input addresses in args->addresses come from a resolver that claims
415 * they are LB services. It's the resolver's responsibility to make sure this
416 * policy is only instantiated and used in that case.
417 *
418 * Create a client channel over them to communicate with a LB service */
419 glb_policy->cc_factory = args->client_channel_factory;
420 GPR_ASSERT(glb_policy->cc_factory != NULL);
421 if (args->addresses->naddrs == 0) {
422 return NULL;
423 }
424
425 /* construct a target from the args->addresses, in the form
426 * ipvX://ip1:port1,ip2:port2,...
427 * TODO(dgq): support mixed ip version */
428 char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
429 addr_strs[0] =
430 grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
431 for (size_t i = 1; i < args->addresses->naddrs; i++) {
432 GPR_ASSERT(grpc_sockaddr_to_string(
433 &addr_strs[i],
434 (const struct sockaddr *)&args->addresses->addrs[i],
435 true) == 0);
436 }
437 size_t uri_path_len;
438 char *target_uri_str = gpr_strjoin_sep(
439 (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
440
441 /* will pick using pick_first */
442 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
443 exec_ctx, glb_policy->cc_factory, target_uri_str,
444 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
445
446 gpr_free(target_uri_str);
447 for (size_t i = 0; i < args->addresses->naddrs; i++) {
448 gpr_free(addr_strs[i]);
449 }
450 gpr_free(addr_strs);
451
452 if (glb_policy->lb_channel == NULL) {
453 gpr_free(glb_policy);
454 return NULL;
455 }
456
457 rr_connectivity_data *rr_connectivity =
458 gpr_malloc(sizeof(rr_connectivity_data));
459 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
460 grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
461 rr_connectivity);
462 rr_connectivity->glb_policy = glb_policy;
463 glb_policy->rr_connectivity = rr_connectivity;
464
465 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
466 gpr_mu_init(&glb_policy->mu);
467 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
468 "grpclb");
469 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700470}
471
David Garcia Quintas65318262016-07-29 13:43:38 -0700472static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
473 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
474 GPR_ASSERT(glb_policy->pending_picks == NULL);
475 GPR_ASSERT(glb_policy->pending_pings == NULL);
476 grpc_channel_destroy(glb_policy->lb_channel);
477 glb_policy->lb_channel = NULL;
478 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
479 if (glb_policy->serverlist != NULL) {
480 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
481 }
482 gpr_mu_destroy(&glb_policy->mu);
483 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700484}
485
/* Shuts the policy down: fails all pending picks, flushes pending pings,
 * unsubscribes from and unrefs the RR policy, and publishes SHUTDOWN. */
static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  gpr_mu_lock(&glb_policy->mu);

  /* detach both pending lists under the lock; drain them outside it */
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  pending_ping *pping = glb_policy->pending_pings;
  glb_policy->pending_pings = NULL;
  gpr_mu_unlock(&glb_policy->mu);

  while (pp != NULL) {
    pending_pick *next = pp->next;
    *pp->target = NULL;
    /* NOTE(review): &pp->wrapped_on_complete (whose arg is embedded in *pp) is
     * scheduled and pp is freed immediately after — confirm the scheduled
     * closure cannot access *pp once it runs. */
    grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
                        NULL);
    gpr_free(pp);
    pp = next;
  }

  while (pping != NULL) {
    pending_ping *next = pping->next;
    /* NOTE(review): pping is not freed on this path, and its
     * owning_pending_node was never set (so \a wrapped_rr_closure frees NULL)
     * — looks like a leak; verify. */
    grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
                        NULL);
    pping = next;
  }

  if (glb_policy->rr_policy) {
    /* unsubscribe */
    grpc_lb_policy_notify_on_state_change(
        exec_ctx, glb_policy->rr_policy, NULL,
        &glb_policy->rr_connectivity->on_change);
    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
  }

  grpc_connectivity_state_set(
      exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
      GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
}
524
/* Cancels the pending pick(s) whose output argument equals \a target.
 * Matching picks are completed with GRPC_ERROR_CANCELLED; all others are
 * pushed back onto the pending list. */
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                            grpc_connected_subchannel **target) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  gpr_mu_lock(&glb_policy->mu);
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if (pp->target == target) {
      grpc_polling_entity_del_from_pollset_set(
          exec_ctx, pp->pollent, glb_policy->base.interested_parties);
      *target = NULL;
      /* NOTE(review): as in glb_shutdown, pp is freed right after scheduling a
       * closure whose argument lives inside *pp — confirm this is safe. */
      grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
                          GRPC_ERROR_CANCELLED, NULL);
      gpr_free(pp);
    } else {
      /* re-queue non-matching picks (order is reversed, which is harmless) */
      pp->next = glb_policy->pending_picks;
      glb_policy->pending_picks = pp;
    }
    pp = next;
  }
  gpr_mu_unlock(&glb_policy->mu);
}
548
static grpc_call *lb_client_data_get_call(lb_client_data *lb_client);

/* Cancels every pending pick whose metadata flags satisfy
 * (flags & mask) == eq, plus the in-flight call to the LB server, if any.
 * Matching picks are completed with GRPC_ERROR_CANCELLED; the rest are
 * re-queued. */
static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                             uint32_t initial_metadata_flags_mask,
                             uint32_t initial_metadata_flags_eq) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  gpr_mu_lock(&glb_policy->mu);
  if (glb_policy->lb_client != NULL) {
    /* cancel the call to the load balancer service, if any */
    grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL);
  }
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
      grpc_polling_entity_del_from_pollset_set(
          exec_ctx, pp->pollent, glb_policy->base.interested_parties);
      grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
                          GRPC_ERROR_CANCELLED, NULL);
      gpr_free(pp);
    } else {
      pp->next = glb_policy->pending_picks;
      glb_policy->pending_picks = pp;
    }
    pp = next;
  }
  gpr_mu_unlock(&glb_policy->mu);
}
static void query_for_backends(grpc_exec_ctx *exec_ctx,
                               glb_lb_policy *glb_policy);

/* Kicks off the request to the LB server for a serverlist. Must be called
 * with glb_policy->mu held; runs at most once per policy instance (guarded by
 * \a started_picking at the call sites). */
static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
  glb_policy->started_picking = true;
  query_for_backends(exec_ctx, glb_policy);
}
584static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
585 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
586 gpr_mu_lock(&glb_policy->mu);
587 if (!glb_policy->started_picking) {
588 start_picking(exec_ctx, glb_policy);
589 }
590 gpr_mu_unlock(&glb_policy->mu);
591}
/* Picks a connected subchannel. If an RR policy is available, the pick is
 * forwarded to it (with the on_complete closure wrapped to unref RR).
 * Otherwise the pick is queued until \a rr_handover flushes it.
 *
 * Returns non-zero iff the pick completed synchronously. */
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                    grpc_polling_entity *pollent,
                    grpc_metadata_batch *initial_metadata,
                    uint32_t initial_metadata_flags,
                    grpc_connected_subchannel **target,
                    grpc_closure *on_complete) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  gpr_mu_lock(&glb_policy->mu);
  int r;

  if (glb_policy->rr_policy != NULL) {
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "",
              (intptr_t)glb_policy->rr_policy);
    }
    /* ref released by \a wrapped_rr_closure (async) or below (sync) */
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
    memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
    glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
    glb_policy->wc_arg.wrapped_closure = on_complete;
    grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
                      &glb_policy->wc_arg);
    r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
                            initial_metadata, initial_metadata_flags, target,
                            &glb_policy->wrapped_on_complete);
    if (r != 0) {
      /* the call to grpc_lb_policy_pick has been synchronous. Unref the RR
       * policy; the wrapped closure won't be invoked by RR in this case. */
      glb_policy->wc_arg.wrapped_closure = NULL;
      if (grpc_lb_glb_trace) {
        gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
                (intptr_t)glb_policy->wc_arg.rr_policy);
      }
      GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
      /* NOTE(review): wrapped_closure was just set to NULL, so this schedules
       * a NULL closure (no-op if grpc_exec_ctx_sched tolerates NULL). Confirm
       * whether the intent was to schedule \a on_complete instead. */
      grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
                          GRPC_ERROR_NONE, NULL);
    }
  } else {
    /* no RR policy yet: queue the pick and (maybe) kick off the LB query */
    grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
                                           glb_policy->base.interested_parties);
    add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
                     initial_metadata_flags, target, on_complete);

    if (!glb_policy->started_picking) {
      start_picking(exec_ctx, glb_policy);
    }
    r = 0;
  }
  gpr_mu_unlock(&glb_policy->mu);
  return r;
}
642static grpc_connectivity_state glb_check_connectivity(
643 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
644 grpc_error **connectivity_error) {
645 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
646 grpc_connectivity_state st;
647 gpr_mu_lock(&glb_policy->mu);
648 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
649 connectivity_error);
650 gpr_mu_unlock(&glb_policy->mu);
651 return st;
652}
653static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
654 grpc_closure *closure) {
655 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
656 gpr_mu_lock(&glb_policy->mu);
657 if (glb_policy->rr_policy) {
658 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
659 } else {
660 add_pending_ping(&glb_policy->pending_pings, closure);
661 if (!glb_policy->started_picking) {
662 start_picking(exec_ctx, glb_policy);
663 }
664 }
665 gpr_mu_unlock(&glb_policy->mu);
666}
667static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
668 grpc_lb_policy *pol,
669 grpc_connectivity_state *current,
670 grpc_closure *notify) {
671 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
672 gpr_mu_lock(&glb_policy->mu);
673 grpc_connectivity_state_notify_on_state_change(
674 exec_ctx, &glb_policy->state_tracker, current, notify);
675
676 gpr_mu_unlock(&glb_policy->mu);
677}
678
/* Used internally for the client call to the LB. Holds all per-call state:
 * the streaming call itself, one closure per step of the call's lifecycle,
 * and the buffers exchanged with the LB server. Created by
 * lb_client_data_create() and torn down in srv_status_rcvd_cb(). */
typedef struct lb_client_data {
  /* synchronization mutex for this LB-call state */
  gpr_mu mu;

  /* called once initial metadata's been sent */
  grpc_closure md_sent;

  /* called once initial metadata's been received */
  grpc_closure md_rcvd;

  /* called once the LoadBalanceRequest has been sent to the LB server. See
   * src/proto/grpc/.../load_balancer.proto */
  grpc_closure req_sent;

  /* A response from the LB server has been received (or error). Process it */
  grpc_closure res_rcvd;

  /* After the client has sent a close to the LB server */
  grpc_closure close_sent;

  /* ... and the status from the LB server has been received */
  grpc_closure srv_status_rcvd;

  grpc_call *lb_call;    /* streaming call to the LB server, */
  gpr_timespec deadline; /* for the streaming call to the LB server */

  grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
  grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */

  /* what's being sent to the LB server. Note that its value may vary if the LB
   * server indicates a redirect. */
  grpc_byte_buffer *request_payload;

  /* response from the LB server, if any. Processed in res_recv_cb() */
  grpc_byte_buffer *response_payload;

  /* the call's status and status details, set in srv_status_rcvd_cb() */
  grpc_status_code status;
  char *status_details;
  size_t status_details_capacity;

  /* pointer back to the enclosing policy */
  glb_lb_policy *glb_policy;
} lb_client_data;
723
724static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
725static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
726static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
727static void req_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
728static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
729static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
730 grpc_error *error);
731static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
732 grpc_error *error);
733
734static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
735 lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
736 memset(lb_client, 0, sizeof(lb_client_data));
737
738 gpr_mu_init(&lb_client->mu);
739 grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
740
741 grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
742 grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
743 grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
744 grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
745 grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
746
747 /* TODO(dgq): get the deadline from the client config instead of fabricating
748 * one here. */
749 lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
750 gpr_time_from_seconds(3, GPR_TIMESPAN));
751
752 lb_client->lb_call = grpc_channel_create_pollset_set_call(
753 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
754 glb_policy->base.interested_parties, "/BalanceLoad",
755 NULL, /* FIXME(dgq): which "host" value to use? */
756 lb_client->deadline, NULL);
757
758 grpc_metadata_array_init(&lb_client->initial_metadata_recv);
759 grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
760
761 grpc_grpclb_request *request = grpc_grpclb_request_create(
762 "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
763 balanced service from the resolver */
764 gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
765 lb_client->request_payload =
766 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
767 gpr_slice_unref(request_payload_slice);
768 grpc_grpclb_request_destroy(request);
769
770 lb_client->status_details = NULL;
771 lb_client->status_details_capacity = 0;
772 lb_client->glb_policy = glb_policy;
773 return lb_client;
774}
775static void lb_client_data_destroy(lb_client_data *lb_client) {
776 grpc_metadata_array_destroy(&lb_client->initial_metadata_recv);
777 grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv);
778
779 grpc_byte_buffer_destroy(lb_client->request_payload);
780
781 gpr_free(lb_client->status_details);
782 gpr_mu_destroy(&lb_client->mu);
783 gpr_free(lb_client);
784}
/* Accessor for the streaming call to the LB server held by \a lb_client. */
static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) {
  return lb_client->lb_call;
}
788
/* Kicks off the streaming call to the LB server asking for backends.
 *
 * Two single-op batches are started: one sending the initial metadata (its
 * completion, md_sent_cb, drives the rest of the request pipeline), and one
 * waiting for the call's final status (srv_status_rcvd_cb performs cleanup).
 * Keeping RECV_STATUS_ON_CLIENT in its own batch lets it complete whenever
 * the server terminates the stream, independently of in-flight messages. */
static void query_for_backends(grpc_exec_ctx *exec_ctx,
                               glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->lb_channel != NULL);

  glb_policy->lb_client = lb_client_data_create(glb_policy);
  grpc_call_error call_error;
  grpc_op ops[1];
  memset(ops, 0, sizeof(ops));
  grpc_op *op = ops;
  /* Batch 1: send initial metadata; continues in md_sent_cb. */
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_client->md_sent);
  GPR_ASSERT(GRPC_CALL_OK == call_error);

  /* Batch 2: wait for the call's final status (completes whenever the server
   * ends the stream); continues in srv_status_rcvd_cb. */
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &glb_policy->lb_client->trailing_metadata_recv;
  op->data.recv_status_on_client.status = &glb_policy->lb_client->status;
  op->data.recv_status_on_client.status_details =
      &glb_policy->lb_client->status_details;
  op->data.recv_status_on_client.status_details_capacity =
      &glb_policy->lb_client->status_details_capacity;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_client->srv_status_rcvd);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
825
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700826static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700827 lb_client_data *lb_client = arg;
828 GPR_ASSERT(lb_client->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700829 grpc_op ops[1];
830 memset(ops, 0, sizeof(ops));
831 grpc_op *op = ops;
832
833 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700834 op->data.send_message = lb_client->request_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700835 op->flags = 0;
836 op->reserved = NULL;
837 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700838 grpc_call_error call_error = grpc_call_start_batch_and_execute(
839 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
840 &lb_client->req_sent);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700841 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700842}
843
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700844static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700845 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700846
847 grpc_op ops[1];
848 memset(ops, 0, sizeof(ops));
849 grpc_op *op = ops;
850
851 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700852 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700853 op->flags = 0;
854 op->reserved = NULL;
855 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700856 grpc_call_error call_error = grpc_call_start_batch_and_execute(
857 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
858 &lb_client->res_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700859 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700860}
861
David Garcia Quintas65318262016-07-29 13:43:38 -0700862static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700863 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700864 grpc_op ops[2];
865 memset(ops, 0, sizeof(ops));
866 grpc_op *op = ops;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700867 if (lb_client->response_payload != NULL) {
868 /* Received data from the LB server. Look inside
869 * lb_client->response_payload, for
David Garcia Quintasea11d162016-07-14 17:27:28 -0700870 * a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700871 grpc_byte_buffer_reader bbr;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700872 grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700873 gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas41bef452016-07-28 19:19:58 -0700874 grpc_byte_buffer_destroy(lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700875 grpc_grpclb_serverlist *serverlist =
876 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -0700877 if (serverlist != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700878 gpr_slice_unref(response_slice);
879 if (grpc_lb_glb_trace) {
880 gpr_log(GPR_INFO, "Serverlist with %zu servers received",
881 serverlist->num_servers);
882 }
David Garcia Quintasea11d162016-07-14 17:27:28 -0700883
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700884 /* update serverlist */
885 if (serverlist->num_servers > 0) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700886 if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
887 serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700888 if (grpc_lb_glb_trace) {
889 gpr_log(GPR_INFO,
890 "Incoming server list identical to current, ignoring.");
891 }
892 } else { /* new serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700893 if (lb_client->glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700894 /* dispose of the old serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700895 grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700896 }
David Garcia Quintasea11d162016-07-14 17:27:28 -0700897 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700898 lb_client->glb_policy->serverlist = serverlist;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700899 }
David Garcia Quintas41bef452016-07-28 19:19:58 -0700900 if (lb_client->glb_policy->rr_policy == NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700901 /* initial "handover", in this case from a null RR policy, meaning
David Garcia Quintas43339842016-07-18 12:56:09 -0700902 * it'll just create the first RR policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700903 rr_handover(exec_ctx, lb_client->glb_policy, error);
David Garcia Quintasea11d162016-07-14 17:27:28 -0700904 } else {
905 /* unref the RR policy, eventually leading to its substitution with a
906 * new one constructed from the received serverlist (see
907 * rr_connectivity_changed) */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700908 GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
David Garcia Quintasea11d162016-07-14 17:27:28 -0700909 "serverlist_received");
910 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700911 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700912 if (grpc_lb_glb_trace) {
913 gpr_log(GPR_INFO,
914 "Received empty server list. Picks will stay pending until a "
915 "response with > 0 servers is received");
916 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700917 }
918
David Garcia Quintasea11d162016-07-14 17:27:28 -0700919 /* keep listening for serverlist updates */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700920 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700921 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700922 op->flags = 0;
923 op->reserved = NULL;
924 op++;
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700925 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -0700926 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
927 &lb_client->res_rcvd); /* loop */
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700928 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700929 return;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700930 }
David Garcia Quintasea11d162016-07-14 17:27:28 -0700931
932 GPR_ASSERT(serverlist == NULL);
933 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
934 gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
935 gpr_slice_unref(response_slice);
936
937 /* Disconnect from server returning invalid response. */
938 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
939 op->flags = 0;
940 op->reserved = NULL;
941 op++;
942 grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -0700943 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
944 &lb_client->close_sent);
David Garcia Quintasea11d162016-07-14 17:27:28 -0700945 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700946 }
947 /* empty payload: call cancelled by server. Cleanups happening in
948 * srv_status_rcvd_cb */
949}
David Garcia Quintasea11d162016-07-14 17:27:28 -0700950
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700951static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
952 grpc_error *error) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700953 if (grpc_lb_glb_trace) {
954 gpr_log(GPR_INFO,
955 "Close from LB client sent. Waiting from server status now");
956 }
957}
David Garcia Quintasea11d162016-07-14 17:27:28 -0700958
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700959static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700960 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700961 lb_client_data *lb_client = arg;
962 glb_lb_policy *glb_policy = lb_client->glb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700963 if (grpc_lb_glb_trace) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700964 gpr_log(GPR_INFO,
965 "status from lb server received. Status = %d, Details = '%s', "
966 "Capaticy "
967 "= %zu",
David Garcia Quintas41bef452016-07-28 19:19:58 -0700968 lb_client->status, lb_client->status_details,
969 lb_client->status_details_capacity);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700970 }
971
David Garcia Quintas41bef452016-07-28 19:19:58 -0700972 grpc_call_destroy(lb_client->lb_call);
973 lb_client_data_destroy(lb_client);
974 glb_policy->lb_client = NULL;
David Garcia Quintas43339842016-07-18 12:56:09 -0700975 /* TODO(dgq): deal with stream termination properly (fire up another one? fail
976 * the original call?) */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700977}
978
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700979static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
980 glb_destroy, glb_shutdown, glb_pick,
981 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
982 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
983
/* The factory is a static singleton: ref/unref are deliberate no-ops. */
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}

static void glb_factory_unref(grpc_lb_policy_factory *factory) {}

/* Factory vtable: the "grpclb" name is what resolvers/configs use to select
 * this policy; glb_create (defined earlier in this file) builds instances. */
static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
    glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};

/* Process-wide singleton factory instance. */
static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
992
993grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
994 return &glb_lb_policy_factory;
995}
996
997/* Plugin registration */
998
999void grpc_lb_policy_grpclb_init() {
1000 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1001 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1002}
1003
/* Plugin teardown: nothing to release — the factory is a static singleton.
 * (Declarator fixed from `()` to the C-idiomatic `(void)`.) */
void grpc_lb_policy_grpclb_shutdown(void) {}