blob: a9b6f8d9e4e6b8e8944462d8e6946a459a2b197b [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
46 * idle state, \a query_for_backends() is called. It creates an instance of \a
David Garcia Quintas43339842016-07-18 12:56:09 -070047 * lb_client_data, an internal struct meant to contain the data associated with
48 * the internal communication with the LB server. This instance is created via
49 * \a lb_client_data_create(). There, the call over lb_channel to pick-first
50 * from {a1..an} is created, the \a LoadBalancingRequest message is assembled
51 * and all necessary callbacks for the progress of the internal call configured.
52 *
53 * Back in \a query_for_backends(), the internal *streaming* call to the LB
54 * server (whichever address from {a1..an} pick-first chose) is kicked off.
55 * It'll progress over the callbacks configured in \a lb_client_data_create()
56 * (see the field docstrings of \a lb_client_data for more details).
57 *
58 * If the call fails with UNIMPLEMENTED, the original call will also fail.
59 * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB
60 * server, which contradicts the LB bit being set. If the internal call times
61 * out, the usual behavior of pick-first applies, continuing to pick from the
62 * list {a1..an}.
63 *
David Garcia Quintas65318262016-07-29 13:43:38 -070064 * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An
David Garcia Quintas43339842016-07-18 12:56:09 -070065 * invalid one results in the termination of the streaming call. A new streaming
66 * call should be created if possible, failing the original call otherwise.
67 * For a valid \a LoadBalancingResponse, the server list of actual backends is
68 * extracted. A Round Robin policy will be created from this list. There are two
69 * possible scenarios:
70 *
71 * 1. This is the first server list received. There was no previous instance of
72 * the Round Robin policy. \a rr_handover() will instantiate the RR policy
73 * and perform all the pending operations over it.
74 * 2. There's already a RR policy instance active. We need to introduce the new
75 * one build from the new serverlist, but taking care not to disrupt the
76 * operations in progress over the old RR instance. This is done by
77 * decreasing the reference count on the old policy. The moment no more
78 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070079 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
80 * state. At this point we can transition to a new RR instance safely, which
81 * is done once again via \a rr_handover().
David Garcia Quintas43339842016-07-18 12:56:09 -070082 *
83 *
84 * Once a RR policy instance is in place (and getting updated as described),
85 * calls to for a pick, a ping or a cancellation will be serviced right away by
86 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintasd4a756b2016-07-19 11:35:15 -070087 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist
David Garcia Quintas43339842016-07-18 12:56:09 -070088 * is received, etc), pick/ping requests are added to a list of pending
89 * picks/pings to be flushed and serviced as part of \a rr_handover() the moment
90 * the RR policy instance becomes available.
91 *
92 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
93 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070094
95/* TODO(dgq):
96 * - Implement LB service forwarding (point 2c. in the doc's diagram).
97 */
98
David Garcia Quintas8a81aa12016-08-22 15:06:49 -070099#include <errno.h>
100
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700101#include <string.h>
102
103#include <grpc/byte_buffer_reader.h>
104#include <grpc/grpc.h>
105#include <grpc/support/alloc.h>
106#include <grpc/support/host_port.h>
107#include <grpc/support/string_util.h>
108
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700109#include "src/core/ext/client_config/client_channel_factory.h"
Mark D. Rothc5c38782016-09-16 08:51:01 -0700110#include "src/core/ext/client_config/lb_policy_factory.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700111#include "src/core/ext/client_config/lb_policy_registry.h"
112#include "src/core/ext/client_config/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700113#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700114#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
David Garcia Quintasbc334912016-08-22 16:57:20 -0700115#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700116#include "src/core/lib/iomgr/sockaddr_utils.h"
117#include "src/core/lib/support/string.h"
118#include "src/core/lib/surface/call.h"
119#include "src/core/lib/surface/channel.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700120#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700121
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700122int grpc_lb_glb_trace = 0;
123
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700124/* add lb_token of selected subchannel (address) to the call's initial
125 * metadata */
126static void initial_metadata_add_lb_token(
127 grpc_metadata_batch *initial_metadata,
128 grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
129 GPR_ASSERT(lb_token_mdelem_storage != NULL);
130 GPR_ASSERT(lb_token != NULL);
131 grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
132 lb_token);
133}
134
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700135typedef struct wrapped_rr_closure_arg {
David Garcia Quintas43339842016-07-18 12:56:09 -0700136 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
137 * calls against the internal RR instance, respectively. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700138 grpc_closure *wrapped_closure;
David Garcia Quintas43339842016-07-18 12:56:09 -0700139
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700140 /* the pick's initial metadata, kept in order to append the LB token for the
141 * pick */
142 grpc_metadata_batch *initial_metadata;
143
144 /* the picked target, used to determine which LB token to add to the pick's
145 * initial metadata */
146 grpc_connected_subchannel **target;
147
148 /* the LB token associated with the pick */
149 grpc_mdelem *lb_token;
150
151 /* storage for the lb token initial metadata mdelem */
152 grpc_linked_mdelem *lb_token_mdelem_storage;
153
David Garcia Quintas43339842016-07-18 12:56:09 -0700154 /* The RR instance related to the closure */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700155 grpc_lb_policy *rr_policy;
David Garcia Quintas43339842016-07-18 12:56:09 -0700156
157 /* when not NULL, represents a pending_{pick,ping} node to be freed upon
158 * closure execution */
159 void *owning_pending_node; /* to be freed if not NULL */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700160} wrapped_rr_closure_arg;
161
162/* The \a on_complete closure passed as part of the pick requires keeping a
163 * reference to its associated round robin instance. We wrap this closure in
164 * order to unref the round robin instance upon its invocation */
165static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700166 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700167 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas43339842016-07-18 12:56:09 -0700168 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700169 if (grpc_lb_glb_trace) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700170 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
David Garcia Quintas43339842016-07-18 12:56:09 -0700171 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700172 }
David Garcia Quintas43339842016-07-18 12:56:09 -0700173 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700174 }
David Garcia Quintas5a876162016-07-18 13:08:42 -0700175 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700176
177 initial_metadata_add_lb_token(wc_arg->initial_metadata,
178 wc_arg->lb_token_mdelem_storage,
David Garcia Quintas8424fdc2016-09-15 08:35:59 -0700179 GRPC_MDELEM_REF(wc_arg->lb_token));
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700180
David Garcia Quintas5a876162016-07-18 13:08:42 -0700181 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, error, NULL);
David Garcia Quintas43339842016-07-18 12:56:09 -0700182 gpr_free(wc_arg->owning_pending_node);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700183}
184
David Garcia Quintasea11d162016-07-14 17:27:28 -0700185/* Linked list of pending pick requests. It stores all information needed to
186 * eventually call (Round Robin's) pick() on them. They mainly stay pending
187 * waiting for the RR policy to be created/updated.
188 *
189 * One particularity is the wrapping of the user-provided \a on_complete closure
190 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
191 * order to correctly unref the RR policy instance upon completion of the pick.
192 * See \a wrapped_rr_closure for details. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700193typedef struct pending_pick {
194 struct pending_pick *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700195
196 /* polling entity for the pick()'s async notification */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700197 grpc_polling_entity *pollent;
David Garcia Quintas43339842016-07-18 12:56:09 -0700198
199 /* the initial metadata for the pick. See grpc_lb_policy_pick() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700200 grpc_metadata_batch *initial_metadata;
David Garcia Quintas43339842016-07-18 12:56:09 -0700201
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700202 /* storage for the lb token initial metadata mdelem */
203 grpc_linked_mdelem *lb_token_mdelem_storage;
204
David Garcia Quintas43339842016-07-18 12:56:09 -0700205 /* bitmask passed to pick() and used for selective cancelling. See
206 * grpc_lb_policy_cancel_picks() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700207 uint32_t initial_metadata_flags;
David Garcia Quintas43339842016-07-18 12:56:09 -0700208
209 /* output argument where to store the pick()ed connected subchannel, or NULL
210 * upon error. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700211 grpc_connected_subchannel **target;
David Garcia Quintas43339842016-07-18 12:56:09 -0700212
213 /* a closure wrapping the original on_complete one to be invoked once the
214 * pick() has completed (regardless of success) */
215 grpc_closure wrapped_on_complete;
216
217 /* args for wrapped_on_complete */
218 wrapped_rr_closure_arg wrapped_on_complete_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700219} pending_pick;
220
David Garcia Quintas8aace512016-08-15 14:55:12 -0700221static void add_pending_pick(pending_pick **root,
222 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700223 grpc_connected_subchannel **target,
224 grpc_closure *on_complete) {
225 pending_pick *pp = gpr_malloc(sizeof(*pp));
226 memset(pp, 0, sizeof(pending_pick));
227 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
228 pp->next = *root;
David Garcia Quintas8aace512016-08-15 14:55:12 -0700229 pp->pollent = pick_args->pollent;
David Garcia Quintas65318262016-07-29 13:43:38 -0700230 pp->target = target;
David Garcia Quintas8aace512016-08-15 14:55:12 -0700231 pp->initial_metadata = pick_args->initial_metadata;
232 pp->initial_metadata_flags = pick_args->initial_metadata_flags;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700233 pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
David Garcia Quintas65318262016-07-29 13:43:38 -0700234 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700235 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
236 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
237 pick_args->lb_token_mdelem_storage;
David Garcia Quintas65318262016-07-29 13:43:38 -0700238 grpc_closure_init(&pp->wrapped_on_complete, wrapped_rr_closure,
239 &pp->wrapped_on_complete_arg);
240 *root = pp;
241}
242
David Garcia Quintasea11d162016-07-14 17:27:28 -0700243/* Same as the \a pending_pick struct but for ping operations */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700244typedef struct pending_ping {
245 struct pending_ping *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700246
247 /* a closure wrapping the original on_complete one to be invoked once the
248 * ping() has completed (regardless of success) */
249 grpc_closure wrapped_notify;
250
251 /* args for wrapped_notify */
252 wrapped_rr_closure_arg wrapped_notify_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700253} pending_ping;
254
David Garcia Quintas65318262016-07-29 13:43:38 -0700255static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
256 pending_ping *pping = gpr_malloc(sizeof(*pping));
257 memset(pping, 0, sizeof(pending_ping));
258 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
259 pping->next = *root;
260 grpc_closure_init(&pping->wrapped_notify, wrapped_rr_closure,
261 &pping->wrapped_notify_arg);
262 pping->wrapped_notify_arg.wrapped_closure = notify;
263 *root = pping;
264}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700265
David Garcia Quintas8d489112016-07-29 15:20:42 -0700266/*
267 * glb_lb_policy
268 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700269typedef struct rr_connectivity_data rr_connectivity_data;
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700270struct lb_client_data;
David Garcia Quintas65318262016-07-29 13:43:38 -0700271static const grpc_lb_policy_vtable glb_lb_policy_vtable;
272typedef struct glb_lb_policy {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700273 /** base policy: must be first */
274 grpc_lb_policy base;
275
276 /** mutex protecting remaining members */
277 gpr_mu mu;
278
279 grpc_client_channel_factory *cc_factory;
280
281 /** for communicating with the LB server */
David Garcia Quintasea11d162016-07-14 17:27:28 -0700282 grpc_channel *lb_channel;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700283
284 /** the RR policy to use of the backend servers returned by the LB server */
285 grpc_lb_policy *rr_policy;
286
287 bool started_picking;
288
289 /** our connectivity state tracker */
290 grpc_connectivity_state_tracker state_tracker;
291
David Garcia Quintasea11d162016-07-14 17:27:28 -0700292 /** stores the deserialized response from the LB. May be NULL until one such
293 * response has arrived. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700294 grpc_grpclb_serverlist *serverlist;
295
David Garcia Quintasea11d162016-07-14 17:27:28 -0700296 /** list of picks that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700297 pending_pick *pending_picks;
298
David Garcia Quintasea11d162016-07-14 17:27:28 -0700299 /** list of pings that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700300 pending_ping *pending_pings;
301
David Garcia Quintasea11d162016-07-14 17:27:28 -0700302 /** client data associated with the LB server communication */
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700303 struct lb_client_data *lb_client;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700304
305 /** for tracking of the RR connectivity */
306 rr_connectivity_data *rr_connectivity;
David Garcia Quintas43339842016-07-18 12:56:09 -0700307
David Garcia Quintas41bef452016-07-28 19:19:58 -0700308 /* a wrapped (see \a wrapped_rr_closure) on-complete closure for readily
309 * available RR picks */
David Garcia Quintas43339842016-07-18 12:56:09 -0700310 grpc_closure wrapped_on_complete;
311
312 /* arguments for the wrapped_on_complete closure */
313 wrapped_rr_closure_arg wc_arg;
David Garcia Quintas65318262016-07-29 13:43:38 -0700314} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700315
David Garcia Quintas65318262016-07-29 13:43:38 -0700316/* Keeps track and reacts to changes in connectivity of the RR instance */
317struct rr_connectivity_data {
318 grpc_closure on_change;
319 grpc_connectivity_state state;
320 glb_lb_policy *glb_policy;
321};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700322
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700323static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
324 bool log) {
325 const grpc_grpclb_ip_address *ip = &server->ip_address;
326 if (server->port >> 16 != 0) {
327 if (log) {
328 gpr_log(GPR_ERROR,
329 "Invalid port '%d' at index %zu of serverlist. Ignoring.",
330 server->port, idx);
331 }
332 return false;
333 }
334
335 if (ip->size != 4 && ip->size != 16) {
336 if (log) {
337 gpr_log(GPR_ERROR,
338 "Expected IP to be 4 or 16 bytes, got %d at index %zu of "
339 "serverlist. Ignoring",
340 ip->size, idx);
341 }
342 return false;
343 }
344 return true;
345}
346
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700347/* populate \a addresses according to \a serverlist. Returns the number of
348 * addresses successfully parsed and added to \a addresses */
Mark D. Rothc5c38782016-09-16 08:51:01 -0700349static grpc_lb_addresses* process_serverlist(
350 const grpc_grpclb_serverlist *serverlist) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700351 size_t num_valid = 0;
352 /* first pass: count how many are valid in order to allocate the necessary
353 * memory in a single block */
354 for (size_t i = 0; i < serverlist->num_servers; ++i) {
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700355 if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700356 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700357 if (num_valid == 0) return NULL;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700358
Mark D. Rothc5c38782016-09-16 08:51:01 -0700359 grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700360
361 /* second pass: actually populate the addresses and LB tokens (aka user data
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700362 * to the outside world) to be read by the RR policy during its creation.
363 * Given that the validity tests are very cheap, they are performed again
364 * instead of marking the valid ones during the first pass, as this would
365 * incurr in an allocation due to the arbitrary number of server */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700366 size_t addr_idx = 0;
367 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
368 GPR_ASSERT(addr_idx < num_valid);
369 const grpc_grpclb_server *server = serverlist->servers[sl_idx];
370 if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700371
372 /* address processing */
373 const uint16_t netorder_port = htons((uint16_t)server->port);
374 /* the addresses are given in binary format (a in(6)_addr struct) in
375 * server->ip_address.bytes. */
376 const grpc_grpclb_ip_address *ip = &server->ip_address;
Mark D. Rothc5c38782016-09-16 08:51:01 -0700377 grpc_resolved_address addr;
378 memset(&addr, 0, sizeof(addr));
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700379 if (ip->size == 4) {
Mark D. Rothc5c38782016-09-16 08:51:01 -0700380 addr.len = sizeof(struct sockaddr_in);
381 struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700382 addr4->sin_family = AF_INET;
383 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
384 addr4->sin_port = netorder_port;
385 } else if (ip->size == 16) {
Mark D. Rothc5c38782016-09-16 08:51:01 -0700386 addr.len = sizeof(struct sockaddr_in6);
387 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700388 addr6->sin6_family = AF_INET;
389 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
390 addr6->sin6_port = netorder_port;
391 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700392
393 /* lb token processing */
Mark D. Rothc5c38782016-09-16 08:51:01 -0700394 void* user_data;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700395 if (server->has_load_balance_token) {
396 const size_t lb_token_size =
397 GPR_ARRAY_SIZE(server->load_balance_token) - 1;
398 grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
399 (uint8_t *)server->load_balance_token, lb_token_size);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700400 user_data = grpc_mdelem_from_metadata_strings(
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700401 GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
402 } else {
403 gpr_log(GPR_ERROR,
404 "Missing LB token for backend address '%s'. The empty token will "
405 "be used instead",
Mark D. Rothc5c38782016-09-16 08:51:01 -0700406 grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr));
407 user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700408 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700409
410 grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr,
411 addr.len, false /* is_balancer */,
412 NULL /* balancer_name */, user_data);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700413 ++addr_idx;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700414 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700415 GPR_ASSERT(addr_idx == num_valid);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700416
417 return lb_addresses;
418}
419
420static void lb_token_destroy(void* token) {
421 if (token != NULL) GRPC_MDELEM_UNREF(token);
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700422}
423
David Garcia Quintas65318262016-07-29 13:43:38 -0700424static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
425 const grpc_grpclb_serverlist *serverlist,
426 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700427 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700428
429 grpc_lb_policy_args args;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700430 memset(&args, 0, sizeof(args));
David Garcia Quintas65318262016-07-29 13:43:38 -0700431 args.client_channel_factory = glb_policy->cc_factory;
Mark D. Rothc5c38782016-09-16 08:51:01 -0700432 args.addresses = process_serverlist(serverlist);
David Garcia Quintas65318262016-07-29 13:43:38 -0700433
434 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
435
Mark D. Rothc5c38782016-09-16 08:51:01 -0700436 grpc_lb_addresses_destroy(args.addresses, lb_token_destroy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700437 return rr;
438}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700439
David Garcia Quintas41bef452016-07-28 19:19:58 -0700440static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
David Garcia Quintas65318262016-07-29 13:43:38 -0700441 grpc_error *error) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700442 GPR_ASSERT(glb_policy->serverlist != NULL &&
443 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700444 glb_policy->rr_policy =
445 create_rr(exec_ctx, glb_policy->serverlist, glb_policy);
446
447 if (grpc_lb_glb_trace) {
448 gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
449 (intptr_t)glb_policy->rr_policy);
450 }
451 GPR_ASSERT(glb_policy->rr_policy != NULL);
452 glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity(
453 exec_ctx, glb_policy->rr_policy, &error);
454 grpc_lb_policy_notify_on_state_change(
455 exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state,
456 &glb_policy->rr_connectivity->on_change);
457 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700458 glb_policy->rr_connectivity->state,
459 GRPC_ERROR_REF(error), "rr_handover");
David Garcia Quintas65318262016-07-29 13:43:38 -0700460 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
461
462 /* flush pending ops */
463 pending_pick *pp;
464 while ((pp = glb_policy->pending_picks)) {
465 glb_policy->pending_picks = pp->next;
466 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
467 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
468 if (grpc_lb_glb_trace) {
469 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
470 (intptr_t)glb_policy->rr_policy);
471 }
David Garcia Quintas8aace512016-08-15 14:55:12 -0700472 const grpc_lb_policy_pick_args pick_args = {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700473 pp->pollent, pp->initial_metadata, pp->initial_metadata_flags,
474 pp->lb_token_mdelem_storage};
David Garcia Quintas8aace512016-08-15 14:55:12 -0700475 grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700476 (void **)&pp->wrapped_on_complete_arg.lb_token,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700477 &pp->wrapped_on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700478 pp->wrapped_on_complete_arg.owning_pending_node = pp;
479 }
480
481 pending_ping *pping;
482 while ((pping = glb_policy->pending_pings)) {
483 glb_policy->pending_pings = pping->next;
484 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
485 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
486 if (grpc_lb_glb_trace) {
487 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
488 (intptr_t)glb_policy->rr_policy);
489 }
490 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
491 &pping->wrapped_notify);
492 pping->wrapped_notify_arg.owning_pending_node = pping;
493 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700494}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700495
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700496static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
497 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -0700498 rr_connectivity_data *rr_conn_data = arg;
499 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700500
David Garcia Quintas41bef452016-07-28 19:19:58 -0700501 if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) {
502 if (glb_policy->serverlist != NULL) {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700503 /* a RR policy is shutting down but there's a serverlist available ->
504 * perform a handover */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700505 rr_handover(exec_ctx, glb_policy, error);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700506 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700507 /* shutting down and no new serverlist available. Bail out. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700508 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700509 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700510 } else {
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700511 if (error == GRPC_ERROR_NONE) {
David Garcia Quintasea11d162016-07-14 17:27:28 -0700512 /* RR not shutting down. Mimic the RR's policy state */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700513 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700514 rr_conn_data->state, GRPC_ERROR_REF(error),
515 "glb_rr_connectivity_changed");
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700516 /* resubscribe */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700517 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
518 &rr_conn_data->state,
519 &rr_conn_data->on_change);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700520 } else { /* error */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700521 gpr_free(rr_conn_data);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700522 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700523 }
524}
525
David Garcia Quintas65318262016-07-29 13:43:38 -0700526static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
527 grpc_lb_policy_factory *factory,
528 grpc_lb_policy_args *args) {
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700529 /* Count the number of gRPC-LB addresses. There must be at least one.
530 * TODO(roth): For now, we ignore non-balancer addresses, but in the
531 * future, we may change the behavior such that we fall back to using
532 * the non-balancer addresses if we cannot reach any balancers. At that
533 * time, this should be changed to allow a list with no balancer addresses,
534 * since the resolver might fail to return a balancer address even when
535 * this is the right LB policy to use. */
Mark D. Rothf655c852016-09-06 10:40:38 -0700536 size_t num_grpclb_addrs = 0;
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700537 for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
538 if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
Mark D. Rothf655c852016-09-06 10:40:38 -0700539 }
540 if (num_grpclb_addrs == 0) return NULL;
541
David Garcia Quintas65318262016-07-29 13:43:38 -0700542 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
543 memset(glb_policy, 0, sizeof(*glb_policy));
544
545 /* All input addresses in args->addresses come from a resolver that claims
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700546 * they are LB services. It's the resolver's responsibility to make sure
547 * this
David Garcia Quintas65318262016-07-29 13:43:38 -0700548 * policy is only instantiated and used in that case.
549 *
550 * Create a client channel over them to communicate with a LB service */
551 glb_policy->cc_factory = args->client_channel_factory;
552 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700553
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700554 /* construct a target from the addresses in args, given in the form
David Garcia Quintas65318262016-07-29 13:43:38 -0700555 * ipvX://ip1:port1,ip2:port2,...
556 * TODO(dgq): support mixed ip version */
Mark D. Rothf655c852016-09-06 10:40:38 -0700557 char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700558 size_t addr_index = 0;
559 for (size_t i = 0; i < args->addresses->num_addresses; i++) {
560 if (args->addresses->addresses[i].user_data != NULL) {
David Garcia Quintas5ebb7af2016-09-15 10:02:16 -0700561 gpr_log(GPR_ERROR,
562 "This LB policy doesn't support user data. It will be ignored");
563 }
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700564 if (args->addresses->addresses[i].is_balancer) {
Mark D. Rothc5c38782016-09-16 08:51:01 -0700565 if (addr_index == 0) {
566 addr_strs[addr_index++] = grpc_sockaddr_to_uri(
567 (const struct sockaddr *)&args->addresses->addresses[i]
568 .address.addr);
569 } else {
570 GPR_ASSERT(grpc_sockaddr_to_string(
571 &addr_strs[addr_index++],
572 (const struct sockaddr *)&args->addresses->addresses[i]
573 .address.addr,
574 true) == 0);
575 }
Mark D. Rothf655c852016-09-06 10:40:38 -0700576 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700577 }
578 size_t uri_path_len;
Mark D. Roth989cdcd2016-09-06 13:28:28 -0700579 char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
580 num_grpclb_addrs, ",", &uri_path_len);
David Garcia Quintas65318262016-07-29 13:43:38 -0700581
582 /* will pick using pick_first */
583 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
584 exec_ctx, glb_policy->cc_factory, target_uri_str,
585 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
586
587 gpr_free(target_uri_str);
Mark D. Rothf655c852016-09-06 10:40:38 -0700588 for (size_t i = 0; i < num_grpclb_addrs; i++) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700589 gpr_free(addr_strs[i]);
590 }
591 gpr_free(addr_strs);
592
593 if (glb_policy->lb_channel == NULL) {
594 gpr_free(glb_policy);
595 return NULL;
596 }
597
598 rr_connectivity_data *rr_connectivity =
599 gpr_malloc(sizeof(rr_connectivity_data));
600 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700601 grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
David Garcia Quintas65318262016-07-29 13:43:38 -0700602 rr_connectivity);
603 rr_connectivity->glb_policy = glb_policy;
604 glb_policy->rr_connectivity = rr_connectivity;
605
606 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
607 gpr_mu_init(&glb_policy->mu);
608 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
609 "grpclb");
610 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700611}
612
David Garcia Quintas65318262016-07-29 13:43:38 -0700613static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
614 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
615 GPR_ASSERT(glb_policy->pending_picks == NULL);
616 GPR_ASSERT(glb_policy->pending_pings == NULL);
617 grpc_channel_destroy(glb_policy->lb_channel);
618 glb_policy->lb_channel = NULL;
619 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
620 if (glb_policy->serverlist != NULL) {
621 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
622 }
623 gpr_mu_destroy(&glb_policy->mu);
624 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700625}
626
David Garcia Quintas740759e2016-08-01 14:49:49 -0700627static void lb_client_data_destroy(struct lb_client_data *lb_client);
David Garcia Quintas65318262016-07-29 13:43:38 -0700628static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
629 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
630 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700631
David Garcia Quintas65318262016-07-29 13:43:38 -0700632 pending_pick *pp = glb_policy->pending_picks;
633 glb_policy->pending_picks = NULL;
634 pending_ping *pping = glb_policy->pending_pings;
635 glb_policy->pending_pings = NULL;
636 gpr_mu_unlock(&glb_policy->mu);
637
638 while (pp != NULL) {
639 pending_pick *next = pp->next;
640 *pp->target = NULL;
641 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE,
642 NULL);
643 gpr_free(pp);
644 pp = next;
645 }
646
647 while (pping != NULL) {
648 pending_ping *next = pping->next;
649 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify, GRPC_ERROR_NONE,
650 NULL);
651 pping = next;
652 }
653
654 if (glb_policy->rr_policy) {
655 /* unsubscribe */
656 grpc_lb_policy_notify_on_state_change(
657 exec_ctx, glb_policy->rr_policy, NULL,
658 &glb_policy->rr_connectivity->on_change);
659 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
660 }
661
David Garcia Quintas740759e2016-08-01 14:49:49 -0700662 lb_client_data_destroy(glb_policy->lb_client);
663 glb_policy->lb_client = NULL;
664
David Garcia Quintas65318262016-07-29 13:43:38 -0700665 grpc_connectivity_state_set(
666 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
667 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
668}
669
670static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
671 grpc_connected_subchannel **target) {
672 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
673 gpr_mu_lock(&glb_policy->mu);
674 pending_pick *pp = glb_policy->pending_picks;
675 glb_policy->pending_picks = NULL;
676 while (pp != NULL) {
677 pending_pick *next = pp->next;
678 if (pp->target == target) {
679 grpc_polling_entity_del_from_pollset_set(
680 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
681 *target = NULL;
682 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
683 GRPC_ERROR_CANCELLED, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700684 } else {
685 pp->next = glb_policy->pending_picks;
686 glb_policy->pending_picks = pp;
687 }
688 pp = next;
689 }
690 gpr_mu_unlock(&glb_policy->mu);
691}
692
David Garcia Quintasa0e278e2016-08-01 11:36:14 -0700693static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
David Garcia Quintas65318262016-07-29 13:43:38 -0700694static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
695 uint32_t initial_metadata_flags_mask,
696 uint32_t initial_metadata_flags_eq) {
697 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
698 gpr_mu_lock(&glb_policy->mu);
699 if (glb_policy->lb_client != NULL) {
700 /* cancel the call to the load balancer service, if any */
701 grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL);
702 }
703 pending_pick *pp = glb_policy->pending_picks;
704 glb_policy->pending_picks = NULL;
705 while (pp != NULL) {
706 pending_pick *next = pp->next;
707 if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
708 initial_metadata_flags_eq) {
709 grpc_polling_entity_del_from_pollset_set(
710 exec_ctx, pp->pollent, glb_policy->base.interested_parties);
711 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
712 GRPC_ERROR_CANCELLED, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700713 } else {
714 pp->next = glb_policy->pending_picks;
715 glb_policy->pending_picks = pp;
716 }
717 pp = next;
718 }
719 gpr_mu_unlock(&glb_policy->mu);
720}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700721
David Garcia Quintas65318262016-07-29 13:43:38 -0700722static void query_for_backends(grpc_exec_ctx *exec_ctx,
723 glb_lb_policy *glb_policy);
724static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
725 glb_policy->started_picking = true;
726 query_for_backends(exec_ctx, glb_policy);
727}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700728
David Garcia Quintas65318262016-07-29 13:43:38 -0700729static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
730 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
731 gpr_mu_lock(&glb_policy->mu);
732 if (!glb_policy->started_picking) {
733 start_picking(exec_ctx, glb_policy);
734 }
735 gpr_mu_unlock(&glb_policy->mu);
736}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700737
David Garcia Quintas65318262016-07-29 13:43:38 -0700738static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700739 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700740 grpc_connected_subchannel **target, void **user_data,
David Garcia Quintas65318262016-07-29 13:43:38 -0700741 grpc_closure *on_complete) {
742 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700743
744 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700745 *target = NULL;
David Garcia Quintas6cc44fc2016-09-12 23:04:35 -0700746 grpc_exec_ctx_sched(
747 exec_ctx, on_complete,
748 GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
749 "won't work without it. Failing"),
750 NULL);
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700751 return 1;
752 }
753
David Garcia Quintas65318262016-07-29 13:43:38 -0700754 gpr_mu_lock(&glb_policy->mu);
755 int r;
756
757 if (glb_policy->rr_policy != NULL) {
758 if (grpc_lb_glb_trace) {
759 gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "",
760 (intptr_t)glb_policy->rr_policy);
761 }
762 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
763 memset(&glb_policy->wc_arg, 0, sizeof(wrapped_rr_closure_arg));
764 glb_policy->wc_arg.rr_policy = glb_policy->rr_policy;
765 glb_policy->wc_arg.wrapped_closure = on_complete;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700766 glb_policy->wc_arg.lb_token_mdelem_storage =
767 pick_args->lb_token_mdelem_storage;
768 glb_policy->wc_arg.initial_metadata = pick_args->initial_metadata;
769 glb_policy->wc_arg.owning_pending_node = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -0700770 grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure,
771 &glb_policy->wc_arg);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700772
773 r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700774 (void **)&glb_policy->wc_arg.lb_token,
David Garcia Quintas65318262016-07-29 13:43:38 -0700775 &glb_policy->wrapped_on_complete);
776 if (r != 0) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700777 /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
David Garcia Quintas65318262016-07-29 13:43:38 -0700778 if (grpc_lb_glb_trace) {
779 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
780 (intptr_t)glb_policy->wc_arg.rr_policy);
781 }
782 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->wc_arg.rr_policy, "glb_pick");
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700783
784 /* add the load reporting initial metadata */
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700785 initial_metadata_add_lb_token(
786 pick_args->initial_metadata, pick_args->lb_token_mdelem_storage,
David Garcia Quintas8424fdc2016-09-15 08:35:59 -0700787 GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token));
David Garcia Quintas65318262016-07-29 13:43:38 -0700788 }
789 } else {
David Garcia Quintas8aace512016-08-15 14:55:12 -0700790 grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent,
David Garcia Quintas65318262016-07-29 13:43:38 -0700791 glb_policy->base.interested_parties);
David Garcia Quintas8aace512016-08-15 14:55:12 -0700792 add_pending_pick(&glb_policy->pending_picks, pick_args, target,
793 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700794
795 if (!glb_policy->started_picking) {
796 start_picking(exec_ctx, glb_policy);
797 }
798 r = 0;
799 }
800 gpr_mu_unlock(&glb_policy->mu);
801 return r;
802}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700803
David Garcia Quintas65318262016-07-29 13:43:38 -0700804static grpc_connectivity_state glb_check_connectivity(
805 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
806 grpc_error **connectivity_error) {
807 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
808 grpc_connectivity_state st;
809 gpr_mu_lock(&glb_policy->mu);
810 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
811 connectivity_error);
812 gpr_mu_unlock(&glb_policy->mu);
813 return st;
814}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700815
David Garcia Quintas65318262016-07-29 13:43:38 -0700816static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
817 grpc_closure *closure) {
818 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
819 gpr_mu_lock(&glb_policy->mu);
820 if (glb_policy->rr_policy) {
821 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
822 } else {
823 add_pending_ping(&glb_policy->pending_pings, closure);
824 if (!glb_policy->started_picking) {
825 start_picking(exec_ctx, glb_policy);
826 }
827 }
828 gpr_mu_unlock(&glb_policy->mu);
829}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700830
David Garcia Quintas65318262016-07-29 13:43:38 -0700831static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
832 grpc_lb_policy *pol,
833 grpc_connectivity_state *current,
834 grpc_closure *notify) {
835 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
836 gpr_mu_lock(&glb_policy->mu);
837 grpc_connectivity_state_notify_on_state_change(
838 exec_ctx, &glb_policy->state_tracker, current, notify);
839
840 gpr_mu_unlock(&glb_policy->mu);
841}
842
David Garcia Quintas8d489112016-07-29 15:20:42 -0700843/*
844 * lb_client_data
845 *
846 * Used internally for the client call to the LB */
David Garcia Quintas65318262016-07-29 13:43:38 -0700847typedef struct lb_client_data {
848 gpr_mu mu;
849
850 /* called once initial metadata's been sent */
851 grpc_closure md_sent;
852
David Garcia Quintas65318262016-07-29 13:43:38 -0700853 /* called once the LoadBalanceRequest has been sent to the LB server. See
854 * src/proto/grpc/.../load_balancer.proto */
855 grpc_closure req_sent;
856
857 /* A response from the LB server has been received (or error). Process it */
858 grpc_closure res_rcvd;
859
860 /* After the client has sent a close to the LB server */
861 grpc_closure close_sent;
862
863 /* ... and the status from the LB server has been received */
864 grpc_closure srv_status_rcvd;
865
866 grpc_call *lb_call; /* streaming call to the LB server, */
867 gpr_timespec deadline; /* for the streaming call to the LB server */
868
869 grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */
870 grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */
871
872 /* what's being sent to the LB server. Note that its value may vary if the LB
873 * server indicates a redirect. */
874 grpc_byte_buffer *request_payload;
875
876 /* response from the LB server, if any. Processed in res_recv_cb() */
877 grpc_byte_buffer *response_payload;
878
879 /* the call's status and status detailset in srv_status_rcvd_cb() */
880 grpc_status_code status;
881 char *status_details;
882 size_t status_details_capacity;
883
884 /* pointer back to the enclosing policy */
885 glb_lb_policy *glb_policy;
886} lb_client_data;
887
888static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700889static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700890static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
891static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
892 grpc_error *error);
893static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
894 grpc_error *error);
895
896static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) {
897 lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data));
898 memset(lb_client, 0, sizeof(lb_client_data));
899
900 gpr_mu_init(&lb_client->mu);
901 grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client);
902
David Garcia Quintas65318262016-07-29 13:43:38 -0700903 grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client);
904 grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client);
905 grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client);
906 grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client);
907
908 /* TODO(dgq): get the deadline from the client config instead of fabricating
909 * one here. */
910 lb_client->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
911 gpr_time_from_seconds(3, GPR_TIMESPAN));
912
David Garcia Quintas15eba132016-08-09 15:20:48 -0700913 /* Note the following LB call progresses every time there's activity in \a
914 * glb_policy->base.interested_parties, which is comprised of the polling
915 * entities passed to glb_pick(). */
David Garcia Quintas65318262016-07-29 13:43:38 -0700916 lb_client->lb_call = grpc_channel_create_pollset_set_call(
917 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
918 glb_policy->base.interested_parties, "/BalanceLoad",
919 NULL, /* FIXME(dgq): which "host" value to use? */
920 lb_client->deadline, NULL);
921
922 grpc_metadata_array_init(&lb_client->initial_metadata_recv);
923 grpc_metadata_array_init(&lb_client->trailing_metadata_recv);
924
925 grpc_grpclb_request *request = grpc_grpclb_request_create(
926 "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
927 balanced service from the resolver */
928 gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
929 lb_client->request_payload =
930 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
931 gpr_slice_unref(request_payload_slice);
932 grpc_grpclb_request_destroy(request);
933
934 lb_client->status_details = NULL;
935 lb_client->status_details_capacity = 0;
936 lb_client->glb_policy = glb_policy;
937 return lb_client;
938}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700939
David Garcia Quintas65318262016-07-29 13:43:38 -0700940static void lb_client_data_destroy(lb_client_data *lb_client) {
David Garcia Quintas740759e2016-08-01 14:49:49 -0700941 grpc_call_destroy(lb_client->lb_call);
David Garcia Quintas65318262016-07-29 13:43:38 -0700942 grpc_metadata_array_destroy(&lb_client->initial_metadata_recv);
943 grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv);
944
945 grpc_byte_buffer_destroy(lb_client->request_payload);
946
947 gpr_free(lb_client->status_details);
948 gpr_mu_destroy(&lb_client->mu);
949 gpr_free(lb_client);
950}
951static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) {
952 return lb_client->lb_call;
953}
954
David Garcia Quintas8d489112016-07-29 15:20:42 -0700955/*
956 * Auxiliary functions and LB client callbacks.
957 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700958static void query_for_backends(grpc_exec_ctx *exec_ctx,
959 glb_lb_policy *glb_policy) {
960 GPR_ASSERT(glb_policy->lb_channel != NULL);
961
962 glb_policy->lb_client = lb_client_data_create(glb_policy);
963 grpc_call_error call_error;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700964 grpc_op ops[1];
965 memset(ops, 0, sizeof(ops));
966 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -0700967 op->op = GRPC_OP_SEND_INITIAL_METADATA;
968 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700969 op->flags = 0;
970 op->reserved = NULL;
971 op++;
David Garcia Quintas65318262016-07-29 13:43:38 -0700972 call_error = grpc_call_start_batch_and_execute(
973 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
974 &glb_policy->lb_client->md_sent);
975 GPR_ASSERT(GRPC_CALL_OK == call_error);
976
977 op = ops;
978 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
979 op->data.recv_status_on_client.trailing_metadata =
980 &glb_policy->lb_client->trailing_metadata_recv;
981 op->data.recv_status_on_client.status = &glb_policy->lb_client->status;
982 op->data.recv_status_on_client.status_details =
983 &glb_policy->lb_client->status_details;
984 op->data.recv_status_on_client.status_details_capacity =
985 &glb_policy->lb_client->status_details_capacity;
986 op->flags = 0;
987 op->reserved = NULL;
988 op++;
989 call_error = grpc_call_start_batch_and_execute(
990 exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops),
991 &glb_policy->lb_client->srv_status_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700992 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700993}
994
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700995static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
996 lb_client_data *lb_client = arg;
997 GPR_ASSERT(lb_client->lb_call);
998 grpc_op ops[1];
999 memset(ops, 0, sizeof(ops));
1000 grpc_op *op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001001
1002 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001003 op->data.send_message = lb_client->request_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001004 op->flags = 0;
1005 op->reserved = NULL;
1006 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001007 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1008 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1009 &lb_client->req_sent);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001010 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001011}
1012
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001013static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001014 lb_client_data *lb_client = arg;
David Garcia Quintas601bb122016-08-18 15:03:59 -07001015 GPR_ASSERT(lb_client->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001016
David Garcia Quintas601bb122016-08-18 15:03:59 -07001017 grpc_op ops[2];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001018 memset(ops, 0, sizeof(ops));
1019 grpc_op *op = ops;
1020
David Garcia Quintas601bb122016-08-18 15:03:59 -07001021 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1022 op->data.recv_initial_metadata = &lb_client->initial_metadata_recv;
1023 op->flags = 0;
1024 op->reserved = NULL;
1025 op++;
1026
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001027 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001028 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001029 op->flags = 0;
1030 op->reserved = NULL;
1031 op++;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001032 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1033 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1034 &lb_client->res_rcvd);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001035 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001036}
1037
David Garcia Quintas65318262016-07-29 13:43:38 -07001038static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001039 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001040 grpc_op ops[2];
1041 memset(ops, 0, sizeof(ops));
1042 grpc_op *op = ops;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001043 if (lb_client->response_payload != NULL) {
1044 /* Received data from the LB server. Look inside
David Garcia Quintas601bb122016-08-18 15:03:59 -07001045 * lb_client->response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001046 grpc_byte_buffer_reader bbr;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001047 grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001048 gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001049 grpc_byte_buffer_destroy(lb_client->response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001050 grpc_grpclb_serverlist *serverlist =
1051 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001052 if (serverlist != NULL) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001053 gpr_slice_unref(response_slice);
1054 if (grpc_lb_glb_trace) {
1055 gpr_log(GPR_INFO, "Serverlist with %zu servers received",
1056 serverlist->num_servers);
1057 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001058
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001059 /* update serverlist */
1060 if (serverlist->num_servers > 0) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001061 if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist,
1062 serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001063 if (grpc_lb_glb_trace) {
1064 gpr_log(GPR_INFO,
1065 "Incoming server list identical to current, ignoring.");
1066 }
1067 } else { /* new serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001068 if (lb_client->glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001069 /* dispose of the old serverlist */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001070 grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001071 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001072 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001073 lb_client->glb_policy->serverlist = serverlist;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001074 }
David Garcia Quintas41bef452016-07-28 19:19:58 -07001075 if (lb_client->glb_policy->rr_policy == NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001076 /* initial "handover", in this case from a null RR policy, meaning
David Garcia Quintas43339842016-07-18 12:56:09 -07001077 * it'll just create the first RR policy instance */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001078 rr_handover(exec_ctx, lb_client->glb_policy, error);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001079 } else {
1080 /* unref the RR policy, eventually leading to its substitution with a
1081 * new one constructed from the received serverlist (see
David Garcia Quintas348cfdb2016-08-19 12:19:43 -07001082 * glb_rr_connectivity_changed) */
David Garcia Quintas41bef452016-07-28 19:19:58 -07001083 GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy,
David Garcia Quintasea11d162016-07-14 17:27:28 -07001084 "serverlist_received");
1085 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001086 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001087 if (grpc_lb_glb_trace) {
1088 gpr_log(GPR_INFO,
1089 "Received empty server list. Picks will stay pending until a "
1090 "response with > 0 servers is received");
1091 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001092 }
1093
David Garcia Quintasea11d162016-07-14 17:27:28 -07001094 /* keep listening for serverlist updates */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001095 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas41bef452016-07-28 19:19:58 -07001096 op->data.recv_message = &lb_client->response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001097 op->flags = 0;
1098 op->reserved = NULL;
1099 op++;
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001100 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -07001101 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1102 &lb_client->res_rcvd); /* loop */
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001103 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001104 return;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001105 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001106
1107 GPR_ASSERT(serverlist == NULL);
1108 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
1109 gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
1110 gpr_slice_unref(response_slice);
1111
1112 /* Disconnect from server returning invalid response. */
1113 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
1114 op->flags = 0;
1115 op->reserved = NULL;
1116 op++;
1117 grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas41bef452016-07-28 19:19:58 -07001118 exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops),
1119 &lb_client->close_sent);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001120 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001121 }
1122 /* empty payload: call cancelled by server. Cleanups happening in
1123 * srv_status_rcvd_cb */
1124}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001125
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001126static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
1127 grpc_error *error) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001128 if (grpc_lb_glb_trace) {
1129 gpr_log(GPR_INFO,
1130 "Close from LB client sent. Waiting from server status now");
1131 }
1132}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001133
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001134static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001135 grpc_error *error) {
David Garcia Quintas41bef452016-07-28 19:19:58 -07001136 lb_client_data *lb_client = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001137 if (grpc_lb_glb_trace) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001138 gpr_log(GPR_INFO,
1139 "status from lb server received. Status = %d, Details = '%s', "
1140 "Capaticy "
1141 "= %zu",
David Garcia Quintas41bef452016-07-28 19:19:58 -07001142 lb_client->status, lb_client->status_details,
1143 lb_client->status_details_capacity);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001144 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -07001145 /* TODO(dgq): deal with stream termination properly (fire up another one?
David Garcia Quintas8424fdc2016-09-15 08:35:59 -07001146 * fail the original call?) */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001147}
1148
David Garcia Quintas8d489112016-07-29 15:20:42 -07001149/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001150static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1151 glb_destroy, glb_shutdown, glb_pick,
1152 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1153 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1154
1155static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1156
1157static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1158
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001159static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1160 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1161
1162static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1163
1164grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1165 return &glb_lb_policy_factory;
1166}
1167
1168/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001169void grpc_lb_policy_grpclb_init() {
1170 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1171 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1172}
1173
1174void grpc_lb_policy_grpclb_shutdown() {}