blob: 5628040b7009da622817a38586edc5f7bd96754e [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
46 * idle state, \a query_for_backends() is called. It creates an instance of \a
David Garcia Quintas43339842016-07-18 12:56:09 -070047 * lb_client_data, an internal struct meant to contain the data associated with
48 * the internal communication with the LB server. This instance is created via
49 * \a lb_client_data_create(). There, the call over lb_channel to pick-first
50 * from {a1..an} is created, the \a LoadBalancingRequest message is assembled
51 * and all necessary callbacks for the progress of the internal call configured.
52 *
53 * Back in \a query_for_backends(), the internal *streaming* call to the LB
54 * server (whichever address from {a1..an} pick-first chose) is kicked off.
55 * It'll progress over the callbacks configured in \a lb_client_data_create()
56 * (see the field docstrings of \a lb_client_data for more details).
57 *
58 * If the call fails with UNIMPLEMENTED, the original call will also fail.
59 * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB
60 * server, which contradicts the LB bit being set. If the internal call times
61 * out, the usual behavior of pick-first applies, continuing to pick from the
62 * list {a1..an}.
63 *
David Garcia Quintas65318262016-07-29 13:43:38 -070064 * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An
David Garcia Quintas43339842016-07-18 12:56:09 -070065 * invalid one results in the termination of the streaming call. A new streaming
66 * call should be created if possible, failing the original call otherwise.
67 * For a valid \a LoadBalancingResponse, the server list of actual backends is
68 * extracted. A Round Robin policy will be created from this list. There are two
69 * possible scenarios:
70 *
71 * 1. This is the first server list received. There was no previous instance of
72 * the Round Robin policy. \a rr_handover() will instantiate the RR policy
73 * and perform all the pending operations over it.
74 * 2. There's already a RR policy instance active. We need to introduce the new
75 * one build from the new serverlist, but taking care not to disrupt the
76 * operations in progress over the old RR instance. This is done by
77 * decreasing the reference count on the old policy. The moment no more
78 * references are held on the old RR policy, it'll be destroyed and \a
79 * rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN state.
80 * At this point we can transition to a new RR instance safely, which is done
81 * once again via \a rr_handover().
82 *
83 *
84 * Once a RR policy instance is in place (and getting updated as described),
85 * calls to for a pick, a ping or a cancellation will be serviced right away by
86 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintasd4a756b2016-07-19 11:35:15 -070087 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
David Garcia Quintas43339842016-07-18 12:56:09 -070088 * is received, etc), pick/ping requests are added to a list of pending
89 * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
90 * the RR policy instance becomes available.
91 *
92 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
93 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070094
95/* TODO(dgq):
96 * - Implement LB service forwarding (point 2c. in the doc's diagram).
97 */
98
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070099#include <string.h>
100
101#include <grpc/byte_buffer_reader.h>
102#include <grpc/grpc.h>
103#include <grpc/support/alloc.h>
104#include <grpc/support/host_port.h>
105#include <grpc/support/string_util.h>
106
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700107#include "src/core/ext/client_config/client_channel_factory.h"
108#include "src/core/ext/client_config/lb_policy_registry.h"
109#include "src/core/ext/client_config/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700110#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700111#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
112#include "src/core/lib/iomgr/sockaddr_utils.h"
113#include "src/core/lib/support/string.h"
114#include "src/core/lib/surface/call.h"
115#include "src/core/lib/surface/channel.h"
116
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700117int grpc_lb_glb_trace = 0;
118
119typedef struct wrapped_rr_closure_arg {
David Garcia Quintas43339842016-07-18 12:56:09 -0700120 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
121 * calls against the internal RR instance, respectively. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700122 grpc_closure *wrapped_closure;
David Garcia Quintas43339842016-07-18 12:56:09 -0700123
124 /* The RR instance related to the closure */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700125 grpc_lb_policy *rr_policy;
David Garcia Quintas43339842016-07-18 12:56:09 -0700126
127 /* when not NULL, represents a pending_{pick,ping} node to be freed upon
128 * closure execution */
129 void *owning_pending_node; /* to be freed if not NULL */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700130} wrapped_rr_closure_arg;
131
132/* The \a on_complete closure passed as part of the pick requires keeping a
133 * reference to its associated round robin instance. We wrap this closure in
134 * order to unref the round robin instance upon its invocation */
135static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700136 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700137 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas43339842016-07-18 12:56:09 -0700138 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700139 if (grpc_lb_glb_trace) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700140 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
David Garcia Quintas43339842016-07-18 12:56:09 -0700141 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700142 }
David Garcia Quintas43339842016-07-18 12:56:09 -0700143 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700144 }
David Garcia Quintas5a876162016-07-18 13:08:42 -0700145 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
146 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
David Garcia Quintas43339842016-07-18 12:56:09 -0700147 gpr_free(wc_arg->owning_pending_node);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700148}
149
David Garcia Quintasea11d162016-07-14 17:27:28 -0700150/* Linked list of pending pick requests. It stores all information needed to
151 * eventually call (Round Robin's) pick() on them. They mainly stay pending
152 * waiting for the RR policy to be created/updated.
153 *
154 * One particularity is the wrapping of the user-provided \a on_complete closure
155 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
156 * order to correctly unref the RR policy instance upon completion of the pick.
157 * See \a wrapped_rr_closure for details. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700158typedef struct pending_pick {
159 struct pending_pick *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700160
161 /* polling entity for the pick()'s async notification */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700162 grpc_polling_entity *pollent;
David Garcia Quintas43339842016-07-18 12:56:09 -0700163
164 /* the initial metadata for the pick. See grpc_lb_policy_pick() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700165 grpc_metadata_batch *initial_metadata;
David Garcia Quintas43339842016-07-18 12:56:09 -0700166
167 /* bitmask passed to pick() and used for selective cancelling. See
168 * grpc_lb_policy_cancel_picks() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700169 uint32_t initial_metadata_flags;
David Garcia Quintas43339842016-07-18 12:56:09 -0700170
171 /* output argument where to store the pick()ed connected subchannel, or NULL
172 * upon error. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700173 grpc_connected_subchannel **target;
David Garcia Quintas43339842016-07-18 12:56:09 -0700174
175 /* a closure wrapping the original on_complete one to be invoked once the
176 * pick() has completed (regardless of success) */
177 grpc_closure wrapped_on_complete;
178
179 /* args for wrapped_on_complete */
180 wrapped_rr_closure_arg wrapped_on_complete_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700181} pending_pick;
182
David Garcia Quintas65318262016-07-29 13:43:38 -0700183static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
184 grpc_metadata_batch *initial_metadata,
185 uint32_t initial_metadata_flags,
186 grpc_connected_subchannel **target,
187 grpc_closure *on_complete) {
188 pending_pick *pp = gpr_malloc(sizeof(*pp));
189 memset(pp, 0, sizeof(pending_pick));
190 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
191 pp->next = *root;
192 pp->pollent = pollent;
193 pp->target = target;
194 pp->initial_metadata = initial_metadata;
195 pp->initial_metadata_flags = initial_metadata_flags;
196 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
197 grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
198 &pp->wrapped_on_complete_arg);
199 *root = pp;
200}
201
David Garcia Quintasea11d162016-07-14 17:27:28 -0700202/* Same as the \a pending_pick struct but for ping operations */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700203typedef struct pending_ping {
204 struct pending_ping *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700205
206 /* a closure wrapping the original on_complete one to be invoked once the
207 * ping() has completed (regardless of success) */
208 grpc_closure wrapped_notify;
209
210 /* args for wrapped_notify */
211 wrapped_rr_closure_arg wrapped_notify_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700212} pending_ping;
213
David Garcia Quintas65318262016-07-29 13:43:38 -0700214static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
215 pending_ping *pping = gpr_malloc(sizeof(*pping));
216 memset(pping, 0, sizeof(pending_ping));
217 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
218 pping->next = *root;
219 grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
220 &pping->wrapped_notify_arg);
221 pping->wrapped_notify_arg.wrapped_closure = notify;
222 *root = pping;
223}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700224
David Garcia Quintas8d489112016-07-29 15:20:42 -0700225
226/*
227 * glb_lb_policy
228 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700229typedef struct rr_connectivity_data rr_connectivity_data;
230typedef struct lb_client_data lb_client_data;
231static const grpc_lb_policy_vtable glb_lb_policy_vtable;
232typedef struct glb_lb_policy {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700233 /** base policy: must be first */
234 grpc_lb_policy base;
235
236 /** mutex protecting remaining members */
237 gpr_mu mu;
238
239 grpc_client_channel_factory *cc_factory;
240
241 /** for communicating with the LB server */
David Garcia Quintasea11d162016-07-14 17:27:28 -0700242 grpc_channel *lb_channel;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700243
244 /** the RR policy to use of the backend servers returned by the LB server */
245 grpc_lb_policy *rr_policy;
246
247 bool started_picking;
248
249 /** our connectivity state tracker */
250 grpc_connectivity_state_tracker state_tracker;
251
David Garcia Quintasea11d162016-07-14 17:27:28 -0700252 /** stores the deserialized response from the LB. May be NULL until one such
253 * response has arrived. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700254 grpc_grpclb_serverlist *serverlist;
255
David Garcia Quintasea11d162016-07-14 17:27:28 -0700256 /** list of picks that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700257 pending_pick *pending_picks;
258
David Garcia Quintasea11d162016-07-14 17:27:28 -0700259 /** list of pings that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700260 pending_ping *pending_pings;
261
David Garcia Quintasea11d162016-07-14 17:27:28 -0700262 /** client data associated with the LB server communication */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700263 lb_client_data *lb_client;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700264
265 /** for tracking of the RR connectivity */
266 rr_connectivity_data *rr_connectivity;
David Garcia Quintas43339842016-07-18 12:56:09 -0700267
David Garcia Quintas41bef452016-07-28 19:19:58 -0700268 /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
269 * available RR picks */
David Garcia Quintas43339842016-07-18 12:56:09 -0700270 grpc_closure wrapped_on_complete;
271
272 /* arguments for the wrapped_on_complete closure */
273 wrapped_rr_closure_arg wc_arg;
David Garcia Quintas65318262016-07-29 13:43:38 -0700274} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700275
David Garcia Quintas65318262016-07-29 13:43:38 -0700276/* Keeps track and reacts to changes in connectivity of the RR instance */
277struct rr_connectivity_data {
278 grpc_closure on_change;
279 grpc_connectivity_state state;
280 glb_lb_policy *glb_policy;
281};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700282
David Garcia Quintas65318262016-07-29 13:43:38 -0700283static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
284 const grpc_grpclb_serverlist *serverlist,
285 glb_lb_policy *glb_policy) {
286 /* TODO(dgq): support mixed ip version */
287 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
288 char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
289 for (size_t i = 0; i < serverlist->num_servers; ++i) {
290 gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
291 serverlist->servers[i]->port);
292 }
293
294 size_t uri_path_len;
295 char *concat_ipports = gpr_strjoin_sep(
296 (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);
297
298 grpc_lb_policy_args args;
299 args.client_channel_factory = glb_policy->cc_factory;
300 args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
301 args.addresses->naddrs = serverlist->num_servers;
302 args.addresses->addrs =
303 gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
304 size_t out_addrs_idx = 0;
305 for (size_t i = 0; i < serverlist->num_servers; ++i) {
306 grpc_uri uri;
307 struct sockaddr_storage sa;
308 size_t sa_len;
309 uri.path = host_ports[i];
310 if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
311 memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
312 args.addresses->addrs[out_addrs_idx].len = sa_len;
313 ++out_addrs_idx;
314 } else {
315 gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
316 host_ports[i]);
317 }
318 }
319
320 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
321
322 gpr_free(concat_ipports);
323 for (size_t i = 0; i < serverlist->num_servers; i++) {
324 gpr_free(host_ports[i]);
325 }
326 gpr_free(host_ports);
327 gpr_free(args.addresses->addrs);
328 gpr_free(args.addresses);
329 return rr;
330}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700331
David Garcia Quintas41bef452016-07-28 19:19:58 -0700332static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
David Garcia Quintas65318262016-07-29 13:43:38 -0700333 grpc_error *error) {
334 GRPC_ERROR_REF(error);
335 glb_policy->rr_policy =
336 create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
337
338 if (grpc_lb_glb_trace) {
339 gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
340 (intptr_t)glb_policy->rr_policy);
341 }
342 GPR_ASSERT(glb_policy->rr_policy != NULL);
343 glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
344 exec_ctx, glb_policy->rr_policy, &error);
345 grpc_lb_policy_notify_on_state_change(
346 exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
347 &glb_policy->rr_connectivity->on_change);
348 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
349 glb_policy->rr_connectivity->state, error,
350 "rr_handover");
351 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
352
353 /* flush pending ops */
354 pending_pick *pp;
355 while ((pp = glb_policy->pending_picks)) {
356 glb_policy->pending_picks = pp->next;
357 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
358 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
359 if (grpc_lb_glb_trace) {
360 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
361 (intptr_t)glb_policy->rr_policy);
362 }
363 grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pp->pollent,
364 pp->initial_metadata, pp->initial_metadata_flags,
365 pp->target, &pp->wrapped_on_complete);
366 pp->wrapped_on_complete_arg.owning_pending_node = pp;
367 }
368
369 pending_ping *pping;
370 while ((pping = glb_policy->pending_pings)) {
371 glb_policy->pending_pings = pping->next;
372 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
373 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
374 if (grpc_lb_glb_trace) {
375 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
376 (intptr_t)glb_policy->rr_policy);
377 }
378 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
379 &pping->wrapped_notify);
380 pping->wrapped_notify_arg.owning_pending_node = pping;
381 }
382 GRPC_ERROR_UNREF(error);
383}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700384
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700385static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700386 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700387 rr_connectivity_data *rr_conn_data = arg;
388 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
389 if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
390 if (glb_policy->serverlist != NULL) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700391 /* a RR policy is shutting down but there's a serverlist available ->
392 * perform a handover */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700393 rr_handover(exec_ctx, glb_policy, error);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700394 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700395 /* shutting down and no new serverlist available. Bail out. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700396 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700397 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700398 } else {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700399 if (error == GRPC_ERROR_NONE) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700400 /* RR not shutting down. Mimic the RR's policy state */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700401 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
402 rr_conn_data->state, error,
403 "rr_connectivity_changed");
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700404 /* resubscribe */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700405 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
406 &rr_conn_data->state,
407 &rr_conn_data->on_change);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700408 } else { /* error */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700409 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700410 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700411 }
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700412 GRPC_ERROR_UNREF(error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700413}
414
David Garcia Quintas65318262016-07-29 13:43:38 -0700415static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
416 grpc_lb_policy_factory *factory,
417 grpc_lb_policy_args *args) {
418 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
419 memset(glb_policy, 0, sizeof(*glb_policy));
420
421 /* All input addresses in args->addresses come from a resolver that claims
422 * they are LB services. It's the resolver's responsibility to make sure this
423 * policy is only instantiated and used in that case.
424 *
425 * Create a client channel over them to communicate with a LB service */
426 glb_policy->cc_factory = args->client_channel_factory;
427 GPR_ASSERT(glb_policy->cc_factory != NULL);
428 if (args->addresses->naddrs == 0) {
429 return NULL;
430 }
431
432 /* construct a target from the args->addresses, in the form
433 * ipvX://ip1:port1,ip2:port2,...
434 * TODO(dgq): support mixed ip version */
435 char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
436 addr_strs[0] =
437 grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
438 for (size_t i = 1; i < args->addresses->naddrs; i++) {
439 GPR_ASSERT(grpc_sockaddr_to_string(
440 &addr_strs[i],
441 (const struct sockaddr *)&args->addresses->addrs[i],
442 true) == 0);
443 }
444 size_t uri_path_len;
445 char *target_uri_str = gpr_strjoin_sep(
446 (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);
447
448 /* will pick using pick_first */
449 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
450 exec_ctx, glb_policy->cc_factory, target_uri_str,
451 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
452
453 gpr_free(target_uri_str);
454 for (size_t i = 0; i < args->addresses->naddrs; i++) {
455 gpr_free(addr_strs[i]);
456 }
457 gpr_free(addr_strs);
458
459 if (glb_policy->lb_channel == NULL) {
460 gpr_free(glb_policy);
461 return NULL;
462 }
463
464 rr_connectivity_data *rr_connectivity =
465 gpr_malloc(sizeof(rr_connectivity_data));
466 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
467 grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
468 rr_connectivity);
469 rr_connectivity->glb_policy = glb_policy;
470 glb_policy->rr_connectivity = rr_connectivity;
471
472 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
473 gpr_mu_init(&glb_policy->mu);
474 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
475 "grpclb");
476 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700477}
478
David Garcia Quintas65318262016-07-29 13:43:38 -0700479static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
480 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
481 GPR_ASSERT(glb_policy->pending_picks == NULL);
482 GPR_ASSERT(glb_policy->pending_pings == NULL);
483 grpc_channel_destroy(glb_policy->lb_channel);
484 glb_policy->lb_channel = NULL;
485 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
486 if (glb_policy->serverlist != NULL) {
487 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
488 }
489 gpr_mu_destroy(&glb_policy->mu);
490 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700491}
492
David Garcia Quintas65318262016-07-29 13:43:38 -0700493static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
494 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
495 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700496
David Garcia Quintas65318262016-07-29 13:43:38 -0700497 pending_pick *pp = glb_policy->pending_picks;
498 glb_policy->pending_picks = NULL;
499 pending_ping *pping = glb_policy->pending_pings;
500 glb_policy->pending_pings = NULL;
501 gpr_mu_unlock(&glb_policy->mu);
502
503 while (pp != NULL) {
504 pending_pick *next = pp->next;
505 *pp->target = NULL;
506 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
507 NULL);
508 gpr_free(pp);
509 pp = next;
510 }
511
512 while (pping != NULL) {
513 pending_ping *next = pping->next;
514 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
515 NULL);
516 pping = next;
517 }
518
519 if (glb_policy->rr_policy) {
520 /* unsubscribe */
521 grpc_lb_policy_notify_on_state_change(
522 exec_ctx, glb_policy->rr_policy, NULL,
523 &glb_policy->rr_connectivity->on_change);
524 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
525 }
526
527 grpc_connectivity_state_set(
528 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
529 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
530}
531
532static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
533 grpc_connected_subchannel **target) {
534 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
535 gpr_mu_lock(&glb_policy->mu);
536 pending_pick *pp = glb_policy->pending_picks;
537 glb_policy->pending_picks = NULL;
538 while (pp != NULL) {
539 pending_pick *next = pp->next;
540 if (pp->target == target) {
541 grpc_polling_entity_del_from_pollset_set(
542 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
543 *target = NULL;
544 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
545 GRPC_ERROR_CANCELLED, NULL);
546 gpr_free(pp);
547 } else {
548 pp->next = glb_policy->pending_picks;
549 glb_policy->pending_picks = pp;
550 }
551 pp = next;
552 }
553 gpr_mu_unlock(&glb_policy->mu);
554}
555
556static grpc_call *lb_client_data_get_call(lb_client_data *lb_client);
557static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
558 uint32_t initial_metadata_flags_mask,
559 uint32_t initial_metadata_flags_eq) {
560 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
561 gpr_mu_lock(&glb_policy->mu);
562 if (glb_policy->lb_client != NULL) {
563 /* cancel the call to the load balancer service, if any */
564 grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL);
565 }
566 pending_pick *pp = glb_policy->pending_picks;
567 glb_policy->pending_picks = NULL;
568 while (pp != NULL) {
569 pending_pick *next = pp->next;
570 if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
571 initial_metadata_flags_eq) {
572 grpc_polling_entity_del_from_pollset_set(
573 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
574 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
575 GRPC_ERROR_CANCELLED, NULL);
576 gpr_free(pp);
577 } else {
578 pp->next = glb_policy->pending_picks;
579 glb_policy->pending_picks = pp;
580 }
581 pp = next;
582 }
583 gpr_mu_unlock(&glb_policy->mu);
584}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700585
David Garcia Quintas65318262016-07-29 13:43:38 -0700586static void query_for_backends(grpc_exec_ctx *exec_ctx,
587 glb_lb_policy *glb_policy);
588static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
589 glb_policy->started_picking = true;
590 query_for_backends(exec_ctx, glb_policy);
591}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700592
David Garcia Quintas65318262016-07-29 13:43:38 -0700593static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
594 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
595 gpr_mu_lock(&glb_policy->mu);
596 if (!glb_policy->started_picking) {
597 start_picking(exec_ctx, glb_policy);
598 }
599 gpr_mu_unlock(&glb_policy->mu);
600}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700601
David Garcia Quintas65318262016-07-29 13:43:38 -0700602static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
603 grpc_polling_entity *pollent,
604 grpc_metadata_batch *initial_metadata,
605 uint32_t initial_metadata_flags,
606 grpc_connected_subchannel **target,
607 grpc_closure *on_complete) {
608 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
609 gpr_mu_lock(&glb_policy->mu);
610 int r;
611
612 if (glb_policy->rr_policy != NULL) {
613 if (grpc_lb_glb_trace) {
614 gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "",
615 (intptr_t)glb_policy->rr_policy);
616 }
617 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
618 memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
619 glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
620 glb_policy->wc_arg.wrapped_closure = on_complete;
621 grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
622 &glb_policy->wc_arg);
623 r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pollent,
624 initial_metadata, initial_metadata_flags, target,
625 &glb_policy->wrapped_on_complete);
626 if (r != 0) {
627 /* the call to grpc_lb_policy_pick has been sychronous. Unreffing the RR
628 * policy and notify the original callback */
629 glb_policy->wc_arg.wrapped_closure = NULL;
630 if (grpc_lb_glb_trace) {
631 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
632 (intptr_t)glb_policy->wc_arg.rr_policy);
633 }
634 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
635 grpc_exec_ctx_sched(exec_ctx, glb_policy->wc_arg.wrapped_closure,
636 GRPC_ERROR_NONE, NULL);
637 }
638 } else {
639 grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
640 glb_policy->base.interested_parties);
641 add_pending_pick(&glb_policy->pending_picks, pollent, initial_metadata,
642 initial_metadata_flags, target, on_complete);
643
644 if (!glb_policy->started_picking) {
645 start_picking(exec_ctx, glb_policy);
646 }
647 r = 0;
648 }
649 gpr_mu_unlock(&glb_policy->mu);
650 return r;
651}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700652
David Garcia Quintas65318262016-07-29 13:43:38 -0700653static grpc_connectivity_state glb_check_connectivity(
654 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
655 grpc_error **connectivity_error) {
656 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
657 grpc_connectivity_state st;
658 gpr_mu_lock(&glb_policy->mu);
659 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
660 connectivity_error);
661 gpr_mu_unlock(&glb_policy->mu);
662 return st;
663}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700664
David Garcia Quintas65318262016-07-29 13:43:38 -0700665static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
666 grpc_closure *closure) {
667 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
668 gpr_mu_lock(&glb_policy->mu);
669 if (glb_policy->rr_policy) {
670 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
671 } else {
672 add_pending_ping(&glb_policy->pending_pings, closure);
673 if (!glb_policy->started_picking) {
674 start_picking(exec_ctx, glb_policy);
675 }
676 }
677 gpr_mu_unlock(&glb_policy->mu);
678}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700679
David Garcia Quintas65318262016-07-29 13:43:38 -0700680static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
681 grpc_lb_policy *pol,
682 grpc_connectivity_state *current,
683 grpc_closure *notify) {
684 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
685 gpr_mu_lock(&glb_policy->mu);
686 grpc_connectivity_state_notify_on_state_change(
687 exec_ctx, &glb_policy->state_tracker, current, notify);
688
689 gpr_mu_unlock(&glb_policy->mu);
690}
691
David Garcia Quintas8d489112016-07-29 15:20:42 -0700692
693/*
694 * lb_client_data
695 *
696 * Used internally for the client call to the LB */
David Garcia Quintas65318262016-07-29 13:43:38 -0700697typedef struct lb_client_data {
698 gpr_mu mu;
699
700 /* called once initial metadata's been sent */
701 grpc_closure md_sent;
702
703 /* called once initial metadata's been received */
704 grpc_closure md_rcvd;
705
706 /* called once the LoadBalanceRequest has been sent to the LB server. See
707 * src/proto/grpc/.../load_balancer.proto */
708 grpc_closure req_sent;
709
710 /* A response from the LB server has been received (or error). Process it */
711 grpc_closure res_rcvd;
712
713 /* After the client has sent a close to the LB server */
714 grpc_closure close_sent;
715
716 /* ... and the status from the LB server has been received */
717 grpc_closure srv_status_rcvd;
718
719 grpc_call *lb_call; /* streaming call to the LB server, */
720 gpr_timespec deadline; /* for the streaming call to the LB server */
721
722 grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
723 grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */
724
725 /* what's being sent to the LB server. Note that its value may vary if the LB
726 * server indicates a redirect. */
727 grpc_byte_buffer *request_payload;
728
729 /* response from the LB server, if any. Processed in res_recv_cb() */
730 grpc_byte_buffer *response_payload;
731
732 /* the call's status and status detailset in srv_status_rcvd_cb() */
733 grpc_status_code status;
734 char *status_details;
735 size_t status_details_capacity;
736
737 /* pointer back to the enclosing policy */
738 glb_lb_policy *glb_policy;
739} lb_client_data;
740
741static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
742static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
743static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700744static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
745static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
746 grpc_error *error);
747static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
748 grpc_error *error);
749
750static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
751 lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
752 memset(lb_client, 0, sizeof(lb_client_data));
753
754 gpr_mu_init(&lb_client->mu);
755 grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
756
757 grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client);
758 grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
759 grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
760 grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
761 grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
762
763 /* TODO(dgq): get the deadline from the client config instead of fabricating
764 * one here. */
765 lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
766 gpr_time_from_seconds(3, GPR_TIMESPAN));
767
768 lb_client->lb_call = grpc_channel_create_pollset_set_call(
769 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
770 glb_policy->base.interested_parties, "/BalanceLoad",
771 NULL, /* FIXME(dgq): which "host" value to use? */
772 lb_client->deadline, NULL);
773
774 grpc_metadata_array_init(&lb_client->initial_metadata_recv);
775 grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
776
777 grpc_grpclb_request *request = grpc_grpclb_request_create(
778 "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
779 balanced service from the resolver */
780 gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
781 lb_client->request_payload =
782 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
783 gpr_slice_unref(request_payload_slice);
784 grpc_grpclb_request_destroy(request);
785
786 lb_client->status_details = NULL;
787 lb_client->status_details_capacity = 0;
788 lb_client->glb_policy = glb_policy;
789 return lb_client;
790}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700791
David Garcia Quintas65318262016-07-29 13:43:38 -0700792static void lb_client_data_destroy(lb_client_data *lb_client) {
793 grpc_metadata_array_destroy(&lb_client->initial_metadata_recv);
794 grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv);
795
796 grpc_byte_buffer_destroy(lb_client->request_payload);
797
798 gpr_free(lb_client->status_details);
799 gpr_mu_destroy(&lb_client->mu);
800 gpr_free(lb_client);
801}
802static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) {
803 return lb_client->lb_call;
804}
805
David Garcia Quintas8d489112016-07-29 15:20:42 -0700806/*
807 * Auxiliary functions and LB client callbacks.
808 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700809static void query_for_backends(grpc_exec_ctx *exec_ctx,
810 glb_lb_policy *glb_policy) {
811 GPR_ASSERT(glb_policy->lb_channel != NULL);
812
813 glb_policy->lb_client = lb_client_data_create(glb_policy);
814 grpc_call_error call_error;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700815 grpc_op ops[1];
816 memset(ops, 0, sizeof(ops));
817 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -0700818 op->op = GRPC_OP_SEND_INITIAL_METADATA;
819 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700820 op->flags = 0;
821 op->reserved = NULL;
822 op++;
David Garcia Quintas65318262016-07-29 13:43:38 -0700823 call_error = grpc_call_start_batch_and_execute(
824 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
825 &glb_policy->lb_client->md_sent);
826 GPR_ASSERT(GRPC_CALL_OK == call_error);
827
828 op = ops;
829 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
830 op->data.recv_status_on_client.trailing_metadata =
831 &glb_policy->lb_client->trailing_metadata_recv;
832 op->data.recv_status_on_client.status = &glb_policy->lb_client->status;
833 op->data.recv_status_on_client.status_details =
834 &glb_policy->lb_client->status_details;
835 op->data.recv_status_on_client.status_details_capacity =
836 &glb_policy->lb_client->status_details_capacity;
837 op->flags = 0;
838 op->reserved = NULL;
839 op++;
840 call_error = grpc_call_start_batch_and_execute(
841 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
842 &glb_policy->lb_client->srv_status_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700843 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700844}
845
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700846static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
847 lb_client_data *lb_client = arg;
848 GPR_ASSERT(lb_client->lb_call);
849 grpc_op ops[1];
850 memset(ops, 0, sizeof(ops));
851 grpc_op *op = ops;
852 op->op = GRPC_OP_RECV_INITIAL_METADATA;
853 op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
854 op->flags = 0;
855 op->reserved = NULL;
856 op++;
857 grpc_call_error call_error = grpc_call_start_batch_and_execute(
858 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
859 &lb_client->md_rcvd);
860 GPR_ASSERT(GRPC_CALL_OK == call_error);
861}
862
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700863static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700864 lb_client_data *lb_client = arg;
865 GPR_ASSERT(lb_client->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700866 grpc_op ops[1];
867 memset(ops, 0, sizeof(ops));
868 grpc_op *op = ops;
869
870 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700871 op->data.send_message = lb_client->request_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700872 op->flags = 0;
873 op->reserved = NULL;
874 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700875 grpc_call_error call_error = grpc_call_start_batch_and_execute(
876 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
877 &lb_client->req_sent);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700878 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700879}
880
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700881static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700882 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700883
884 grpc_op ops[1];
885 memset(ops, 0, sizeof(ops));
886 grpc_op *op = ops;
887
888 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700889 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700890 op->flags = 0;
891 op->reserved = NULL;
892 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700893 grpc_call_error call_error = grpc_call_start_batch_and_execute(
894 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
895 &lb_client->res_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700896 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700897}
898
David Garcia Quintas65318262016-07-29 13:43:38 -0700899static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700900 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700901 grpc_op ops[2];
902 memset(ops, 0, sizeof(ops));
903 grpc_op *op = ops;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700904 if (lb_client->response_payload != NULL) {
905 /* Received data from the LB server. Look inside
906 * lb_client->response_payload, for
David Garcia Quintasea11d162016-07-14 17:27:28 -0700907 * a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700908 grpc_byte_buffer_reader bbr;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700909 grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700910 gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas41bef452016-07-28 19:19:58 -0700911 grpc_byte_buffer_destroy(lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700912 grpc_grpclb_serverlist *serverlist =
913 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -0700914 if (serverlist != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700915 gpr_slice_unref(response_slice);
916 if (grpc_lb_glb_trace) {
917 gpr_log(GPR_INFO, "Serverlist with %zu servers received",
918 serverlist->num_servers);
919 }
David Garcia Quintasea11d162016-07-14 17:27:28 -0700920
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700921 /* update serverlist */
922 if (serverlist->num_servers > 0) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700923 if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
924 serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700925 if (grpc_lb_glb_trace) {
926 gpr_log(GPR_INFO,
927 "Incoming server list identical to current, ignoring.");
928 }
929 } else { /* new serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700930 if (lb_client->glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700931 /* dispose of the old serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700932 grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700933 }
David Garcia Quintasea11d162016-07-14 17:27:28 -0700934 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700935 lb_client->glb_policy->serverlist = serverlist;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700936 }
David Garcia Quintas41bef452016-07-28 19:19:58 -0700937 if (lb_client->glb_policy->rr_policy == NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700938 /* initial "handover", in this case from a null RR policy, meaning
David Garcia Quintas43339842016-07-18 12:56:09 -0700939 * it'll just create the first RR policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700940 rr_handover(exec_ctx, lb_client->glb_policy, error);
David Garcia Quintasea11d162016-07-14 17:27:28 -0700941 } else {
942 /* unref the RR policy, eventually leading to its substitution with a
943 * new one constructed from the received serverlist (see
944 * rr_connectivity_changed) */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700945 GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
David Garcia Quintasea11d162016-07-14 17:27:28 -0700946 "serverlist_received");
947 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700948 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700949 if (grpc_lb_glb_trace) {
950 gpr_log(GPR_INFO,
951 "Received empty server list. Picks will stay pending until a "
952 "response with > 0 servers is received");
953 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700954 }
955
David Garcia Quintasea11d162016-07-14 17:27:28 -0700956 /* keep listening for serverlist updates */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700957 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -0700958 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700959 op->flags = 0;
960 op->reserved = NULL;
961 op++;
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700962 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -0700963 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
964 &lb_client->res_rcvd); /* loop */
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700965 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700966 return;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700967 }
David Garcia Quintasea11d162016-07-14 17:27:28 -0700968
969 GPR_ASSERT(serverlist == NULL);
970 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
971 gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
972 gpr_slice_unref(response_slice);
973
974 /* Disconnect from server returning invalid response. */
975 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
976 op->flags = 0;
977 op->reserved = NULL;
978 op++;
979 grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -0700980 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
981 &lb_client->close_sent);
David Garcia Quintasea11d162016-07-14 17:27:28 -0700982 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700983 }
984 /* empty payload: call cancelled by server. Cleanups happening in
985 * srv_status_rcvd_cb */
986}
David Garcia Quintasea11d162016-07-14 17:27:28 -0700987
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700988static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
989 grpc_error *error) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700990 if (grpc_lb_glb_trace) {
991 gpr_log(GPR_INFO,
992 "Close from LB client sent. Waiting from server status now");
993 }
994}
David Garcia Quintasea11d162016-07-14 17:27:28 -0700995
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700996static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700997 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700998 lb_client_data *lb_client = arg;
999 glb_lb_policy *glb_policy = lb_client->glb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001000 if (grpc_lb_glb_trace) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001001 gpr_log(GPR_INFO,
1002 "status from lb server received. Status = %d, Details = '%s', "
1003 "Capaticy "
1004 "= %zu",
David Garcia Quintas41bef452016-07-28 19:19:58 -07001005 lb_client->status, lb_client->status_details,
1006 lb_client->status_details_capacity);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001007 }
1008
David Garcia Quintas41bef452016-07-28 19:19:58 -07001009 grpc_call_destroy(lb_client->lb_call);
1010 lb_client_data_destroy(lb_client);
1011 glb_policy->lb_client = NULL;
David Garcia Quintas43339842016-07-18 12:56:09 -07001012 /* TODO(dgq): deal with stream termination properly (fire up another one? fail
1013 * the original call?) */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001014}
1015
David Garcia Quintas8d489112016-07-29 15:20:42 -07001016/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001017static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1018 glb_destroy, glb_shutdown, glb_pick,
1019 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1020 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1021
1022static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1023
1024static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1025
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001026static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1027 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1028
1029static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1030
1031grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1032 return &glb_lb_policy_factory;
1033}
1034
1035/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001036void grpc_lb_policy_grpclb_init() {
1037 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1038 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1039}
1040
1041void grpc_lb_policy_grpclb_shutdown() {}