blob: 9a608fd47760ada8706cefaad4b51b868655eeda [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
David Garcia Quintas98da61b2016-10-29 08:46:31 +020046 * idle state, \a query_for_backends_locked() is called. This function sets up
47 * and initiates the internal communication with the LB server. In particular,
48 * it's responsible for instantiating the internal *streaming* call to the LB
49 * server (whichever address from {a1..an} pick-first chose). This call is
David Garcia Quintas7ec29132016-11-01 04:09:05 +010050 * serviced by two callbacks, \a lb_on_server_status_received and \a
51 * lb_on_response_received. The former will be called when the call to the LB
52 * server completes. This can happen if the LB server closes the connection or
53 * if this policy itself cancels the call (for example because it's shutting
54 * down).If the internal call times out, the usual behavior of pick-first
55 * applies, continuing to pick from the list {a1..an}.
David Garcia Quintas43339842016-07-18 12:56:09 -070056 *
David Garcia Quintas98da61b2016-10-29 08:46:31 +020057 * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a
58 * res_recv. An invalid one results in the termination of the streaming call. A
59 * new streaming call should be created if possible, failing the original call
60 * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
61 * backends is extracted. A Round Robin policy will be created from this list.
62 * There are two possible scenarios:
David Garcia Quintas43339842016-07-18 12:56:09 -070063 *
64 * 1. This is the first server list received. There was no previous instance of
David Garcia Quintas90712d52016-10-13 19:33:04 -070065 * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
66 * policy and perform all the pending operations over it.
David Garcia Quintas43339842016-07-18 12:56:09 -070067 * 2. There's already a RR policy instance active. We need to introduce the new
68 * one build from the new serverlist, but taking care not to disrupt the
69 * operations in progress over the old RR instance. This is done by
70 * decreasing the reference count on the old policy. The moment no more
71 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070072 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
73 * state. At this point we can transition to a new RR instance safely, which
David Garcia Quintas90712d52016-10-13 19:33:04 -070074 * is done once again via \a rr_handover_locked().
David Garcia Quintas43339842016-07-18 12:56:09 -070075 *
76 *
77 * Once a RR policy instance is in place (and getting updated as described),
78 * calls to for a pick, a ping or a cancellation will be serviced right away by
79 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintas7ec29132016-11-01 04:09:05 +010080 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is
81 * received, etc), pick/ping requests are added to a list of pending picks/pings
82 * to be flushed and serviced as part of \a rr_handover_locked() the moment the
83 * RR policy instance becomes available.
David Garcia Quintas43339842016-07-18 12:56:09 -070084 *
85 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
86 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070087
88/* TODO(dgq):
89 * - Implement LB service forwarding (point 2c. in the doc's diagram).
90 */
91
murgatroid99085f9af2016-10-24 09:55:44 -070092/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
93 using that endpoint. Because of various transitive includes in uv.h,
94 including windows.h on Windows, uv.h must be included before other system
95 headers. Therefore, sockaddr.h must always be included first */
murgatroid997871f732016-09-23 13:49:05 -070096#include "src/core/lib/iomgr/sockaddr.h"
97
David Garcia Quintas8a81aa12016-08-22 15:06:49 -070098#include <errno.h>
99
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700100#include <string.h>
101
102#include <grpc/byte_buffer_reader.h>
103#include <grpc/grpc.h>
104#include <grpc/support/alloc.h>
105#include <grpc/support/host_port.h>
106#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -0700107#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700108
Mark D. Roth2137cd82016-09-14 09:04:00 -0700109#include "src/core/ext/client_channel/client_channel_factory.h"
Mark D. Roth15195742016-10-07 09:02:28 -0700110#include "src/core/ext/client_channel/lb_policy_factory.h"
Mark D. Roth2137cd82016-09-14 09:04:00 -0700111#include "src/core/ext/client_channel/lb_policy_registry.h"
112#include "src/core/ext/client_channel/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700113#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700114#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
Mark D. Roth046cf762016-09-26 11:13:51 -0700115#include "src/core/lib/channel/channel_args.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200116#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700117#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200118#include "src/core/lib/iomgr/timer.h"
119#include "src/core/lib/support/backoff.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700120#include "src/core/lib/support/string.h"
121#include "src/core/lib/surface/call.h"
122#include "src/core/lib/surface/channel.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700123#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700124
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200125#define BACKOFF_MULTIPLIER 1.6
126#define BACKOFF_JITTER 0.2
127#define BACKOFF_MIN_SECONDS 10
128#define BACKOFF_MAX_SECONDS 60
129
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700130int grpc_lb_glb_trace = 0;
131
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700132/* add lb_token of selected subchannel (address) to the call's initial
133 * metadata */
134static void initial_metadata_add_lb_token(
135 grpc_metadata_batch *initial_metadata,
136 grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
137 GPR_ASSERT(lb_token_mdelem_storage != NULL);
138 GPR_ASSERT(lb_token != NULL);
139 grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
140 lb_token);
141}
142
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700143typedef struct wrapped_rr_closure_arg {
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700144 /* the closure instance using this struct as argument */
145 grpc_closure wrapper_closure;
146
David Garcia Quintas43339842016-07-18 12:56:09 -0700147 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
148 * calls against the internal RR instance, respectively. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700149 grpc_closure *wrapped_closure;
David Garcia Quintas43339842016-07-18 12:56:09 -0700150
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700151 /* the pick's initial metadata, kept in order to append the LB token for the
152 * pick */
153 grpc_metadata_batch *initial_metadata;
154
155 /* the picked target, used to determine which LB token to add to the pick's
156 * initial metadata */
157 grpc_connected_subchannel **target;
158
159 /* the LB token associated with the pick */
160 grpc_mdelem *lb_token;
161
162 /* storage for the lb token initial metadata mdelem */
163 grpc_linked_mdelem *lb_token_mdelem_storage;
164
David Garcia Quintas43339842016-07-18 12:56:09 -0700165 /* The RR instance related to the closure */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700166 grpc_lb_policy *rr_policy;
David Garcia Quintas43339842016-07-18 12:56:09 -0700167
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700168 /* heap memory to be freed upon closure execution. */
169 void *free_when_done;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700170} wrapped_rr_closure_arg;
171
172/* The \a on_complete closure passed as part of the pick requires keeping a
173 * reference to its associated round robin instance. We wrap this closure in
174 * order to unref the round robin instance upon its invocation */
175static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700176 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700177 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700178
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200179 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
180 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
181 NULL);
182
183 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700184 /* if target is NULL, no pick has been made by the RR policy (eg, all
185 * addresses failed to connect). There won't be any user_data/token
186 * available */
187 if (wc_arg->target != NULL) {
188 initial_metadata_add_lb_token(wc_arg->initial_metadata,
189 wc_arg->lb_token_mdelem_storage,
190 GRPC_MDELEM_REF(wc_arg->lb_token));
191 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200192 if (grpc_lb_glb_trace) {
193 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
194 (intptr_t)wc_arg->rr_policy);
195 }
196 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700197 }
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700198 GPR_ASSERT(wc_arg->free_when_done != NULL);
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700199 gpr_free(wc_arg->free_when_done);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700200}
201
David Garcia Quintasea11d162016-07-14 17:27:28 -0700202/* Linked list of pending pick requests. It stores all information needed to
203 * eventually call (Round Robin's) pick() on them. They mainly stay pending
204 * waiting for the RR policy to be created/updated.
205 *
206 * One particularity is the wrapping of the user-provided \a on_complete closure
207 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
208 * order to correctly unref the RR policy instance upon completion of the pick.
209 * See \a wrapped_rr_closure for details. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700210typedef struct pending_pick {
211 struct pending_pick *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700212
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700213 /* original pick()'s arguments */
214 grpc_lb_policy_pick_args pick_args;
David Garcia Quintas43339842016-07-18 12:56:09 -0700215
216 /* output argument where to store the pick()ed connected subchannel, or NULL
217 * upon error. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700218 grpc_connected_subchannel **target;
David Garcia Quintas43339842016-07-18 12:56:09 -0700219
David Garcia Quintas43339842016-07-18 12:56:09 -0700220 /* args for wrapped_on_complete */
221 wrapped_rr_closure_arg wrapped_on_complete_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700222} pending_pick;
223
David Garcia Quintas8aace512016-08-15 14:55:12 -0700224static void add_pending_pick(pending_pick **root,
225 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700226 grpc_connected_subchannel **target,
227 grpc_closure *on_complete) {
228 pending_pick *pp = gpr_malloc(sizeof(*pp));
229 memset(pp, 0, sizeof(pending_pick));
230 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
231 pp->next = *root;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700232 pp->pick_args = *pick_args;
David Garcia Quintas65318262016-07-29 13:43:38 -0700233 pp->target = target;
David Garcia Quintas65318262016-07-29 13:43:38 -0700234 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700235 pp->wrapped_on_complete_arg.target = target;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700236 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
237 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
238 pick_args->lb_token_mdelem_storage;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700239 pp->wrapped_on_complete_arg.free_when_done = pp;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700240 grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
241 wrapped_rr_closure, &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700242 *root = pp;
243}
244
David Garcia Quintasea11d162016-07-14 17:27:28 -0700245/* Same as the \a pending_pick struct but for ping operations */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700246typedef struct pending_ping {
247 struct pending_ping *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700248
David Garcia Quintas43339842016-07-18 12:56:09 -0700249 /* args for wrapped_notify */
250 wrapped_rr_closure_arg wrapped_notify_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700251} pending_ping;
252
David Garcia Quintas65318262016-07-29 13:43:38 -0700253static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
254 pending_ping *pping = gpr_malloc(sizeof(*pping));
255 memset(pping, 0, sizeof(pending_ping));
256 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
David Garcia Quintas65318262016-07-29 13:43:38 -0700257 pping->wrapped_notify_arg.wrapped_closure = notify;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700258 pping->wrapped_notify_arg.free_when_done = pping;
David Garcia Quintas65318262016-07-29 13:43:38 -0700259 pping->next = *root;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700260 grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
261 wrapped_rr_closure, &pping->wrapped_notify_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700262 *root = pping;
263}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700264
David Garcia Quintas8d489112016-07-29 15:20:42 -0700265/*
266 * glb_lb_policy
267 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700268typedef struct rr_connectivity_data rr_connectivity_data;
David Garcia Quintas65318262016-07-29 13:43:38 -0700269static const grpc_lb_policy_vtable glb_lb_policy_vtable;
270typedef struct glb_lb_policy {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700271 /** base policy: must be first */
272 grpc_lb_policy base;
273
274 /** mutex protecting remaining members */
275 gpr_mu mu;
276
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700277 /** who the client is trying to communicate with */
Mark D. Rothd1604af2016-09-22 11:20:27 -0700278 const char *server_name;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700279 grpc_client_channel_factory *cc_factory;
Mark D. Roth046cf762016-09-26 11:13:51 -0700280 grpc_channel_args *args;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700281
David Garcia Quintas5cf3c372016-10-03 14:30:03 -0700282 /** deadline for the LB's call */
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700283 gpr_timespec deadline;
284
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700285 /** for communicating with the LB server */
David Garcia Quintasea11d162016-07-14 17:27:28 -0700286 grpc_channel *lb_channel;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700287
288 /** the RR policy to use of the backend servers returned by the LB server */
289 grpc_lb_policy *rr_policy;
290
291 bool started_picking;
292
293 /** our connectivity state tracker */
294 grpc_connectivity_state_tracker state_tracker;
295
David Garcia Quintasea11d162016-07-14 17:27:28 -0700296 /** stores the deserialized response from the LB. May be NULL until one such
297 * response has arrived. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700298 grpc_grpclb_serverlist *serverlist;
299
David Garcia Quintasea11d162016-07-14 17:27:28 -0700300 /** list of picks that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700301 pending_pick *pending_picks;
302
David Garcia Quintasea11d162016-07-14 17:27:28 -0700303 /** list of pings that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700304 pending_ping *pending_pings;
305
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200306 bool shutting_down;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700307
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200308 /************************************************************/
309 /* client data associated with the LB server communication */
310 /************************************************************/
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100311 /* Status from the LB server has been received. This signals the end of the LB
312 * call. */
313 grpc_closure lb_on_server_status_received;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200314
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100315 /* A response from the LB server has been received. Process it */
316 grpc_closure lb_on_response_received;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200317
318 grpc_call *lb_call; /* streaming call to the LB server, */
319
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100320 grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
321 grpc_metadata_array
322 lb_trailing_metadata_recv; /* trailing MD from LB server */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200323
324 /* what's being sent to the LB server. Note that its value may vary if the LB
325 * server indicates a redirect. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100326 grpc_byte_buffer *lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200327
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100328 /* response from the LB server, if any. Processed in lb_on_response_received()
329 */
330 grpc_byte_buffer *lb_response_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200331
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100332 /* the call's status and status detailset in lb_on_server_status_received() */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200333 grpc_status_code lb_call_status;
334 char *lb_call_status_details;
335 size_t lb_call_status_details_capacity;
336
337 /** LB call retry backoff state */
338 gpr_backoff lb_call_backoff_state;
339
340 /** LB call retry timer */
341 grpc_timer lb_call_retry_timer;
David Garcia Quintas65318262016-07-29 13:43:38 -0700342} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700343
David Garcia Quintas65318262016-07-29 13:43:38 -0700344/* Keeps track and reacts to changes in connectivity of the RR instance */
345struct rr_connectivity_data {
346 grpc_closure on_change;
347 grpc_connectivity_state state;
348 glb_lb_policy *glb_policy;
349};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700350
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700351static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
352 bool log) {
353 const grpc_grpclb_ip_address *ip = &server->ip_address;
354 if (server->port >> 16 != 0) {
355 if (log) {
356 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200357 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
358 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700359 }
360 return false;
361 }
362
363 if (ip->size != 4 && ip->size != 16) {
364 if (log) {
365 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200366 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700367 "serverlist. Ignoring",
Jan Tattermusch2b398082016-10-07 14:40:30 +0200368 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700369 }
370 return false;
371 }
372 return true;
373}
374
Mark D. Roth16883a32016-10-21 10:30:58 -0700375/* vtable for LB tokens in grpc_lb_addresses. */
Mark D. Roth557c9902016-10-24 11:12:05 -0700376static void *lb_token_copy(void *token) {
Mark D. Roth16883a32016-10-21 10:30:58 -0700377 return token == NULL ? NULL : GRPC_MDELEM_REF(token);
378}
379static void lb_token_destroy(void *token) {
380 if (token != NULL) GRPC_MDELEM_UNREF(token);
381}
Mark D. Roth557c9902016-10-24 11:12:05 -0700382static int lb_token_cmp(void *token1, void *token2) {
Mark D. Roth16883a32016-10-21 10:30:58 -0700383 if (token1 > token2) return 1;
384 if (token1 < token2) return -1;
385 return 0;
386}
387static const grpc_lb_user_data_vtable lb_token_vtable = {
388 lb_token_copy, lb_token_destroy, lb_token_cmp};
389
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100390static void parse_server(const grpc_grpclb_server *server,
391 grpc_resolved_address *addr) {
392 const uint16_t netorder_port = htons((uint16_t)server->port);
393 /* the addresses are given in binary format (a in(6)_addr struct) in
394 * server->ip_address.bytes. */
395 const grpc_grpclb_ip_address *ip = &server->ip_address;
396 memset(addr, 0, sizeof(*addr));
397 if (ip->size == 4) {
398 addr->len = sizeof(struct sockaddr_in);
399 struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
400 addr4->sin_family = AF_INET;
401 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
402 addr4->sin_port = netorder_port;
403 } else if (ip->size == 16) {
404 addr->len = sizeof(struct sockaddr_in6);
405 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
406 addr6->sin6_family = AF_INET;
407 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
408 addr6->sin6_port = netorder_port;
409 }
410}
411
Mark D. Roth7ce14d22016-09-16 13:03:46 -0700412/* Returns addresses extracted from \a serverlist. */
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700413static grpc_lb_addresses *process_serverlist(
Mark D. Rothc5c38782016-09-16 08:51:01 -0700414 const grpc_grpclb_serverlist *serverlist) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700415 size_t num_valid = 0;
416 /* first pass: count how many are valid in order to allocate the necessary
417 * memory in a single block */
418 for (size_t i = 0; i < serverlist->num_servers; ++i) {
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700419 if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700420 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700421 if (num_valid == 0) return NULL;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700422
Mark D. Roth16883a32016-10-21 10:30:58 -0700423 grpc_lb_addresses *lb_addresses =
424 grpc_lb_addresses_create(num_valid, &lb_token_vtable);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700425
426 /* second pass: actually populate the addresses and LB tokens (aka user data
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700427 * to the outside world) to be read by the RR policy during its creation.
428 * Given that the validity tests are very cheap, they are performed again
429 * instead of marking the valid ones during the first pass, as this would
430 * incurr in an allocation due to the arbitrary number of server */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700431 size_t addr_idx = 0;
432 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
433 GPR_ASSERT(addr_idx < num_valid);
434 const grpc_grpclb_server *server = serverlist->servers[sl_idx];
435 if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700436
437 /* address processing */
Mark D. Rothc5c38782016-09-16 08:51:01 -0700438 grpc_resolved_address addr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100439 parse_server(server, &addr);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700440
441 /* lb token processing */
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700442 void *user_data;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700443 if (server->has_load_balance_token) {
444 const size_t lb_token_size =
445 GPR_ARRAY_SIZE(server->load_balance_token) - 1;
446 grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
447 (uint8_t *)server->load_balance_token, lb_token_size);
David Garcia Quintasa3654db2016-10-11 15:52:39 -0700448 user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
449 lb_token_mdstr);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700450 } else {
451 gpr_log(GPR_ERROR,
452 "Missing LB token for backend address '%s'. The empty token will "
453 "be used instead",
murgatroid9908b0fab2016-09-23 14:35:49 -0700454 grpc_sockaddr_to_uri(&addr));
David Garcia Quintasa3654db2016-10-11 15:52:39 -0700455 user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700456 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700457
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700458 grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
459 false /* is_balancer */,
Mark D. Rothc5c38782016-09-16 08:51:01 -0700460 NULL /* balancer_name */, user_data);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700461 ++addr_idx;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700462 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700463 GPR_ASSERT(addr_idx == num_valid);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700464 return lb_addresses;
465}
466
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700467/* perform a pick over \a rr_policy. Given that a pick can return immediately
468 * (ignoring its completion callback) we need to perform the cleanups this
469 * callback would be otherwise resposible for */
David Garcia Quintas20359062016-10-15 15:22:51 -0700470static bool pick_from_internal_rr_locked(
471 grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
472 const grpc_lb_policy_pick_args *pick_args,
473 grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
474 GPR_ASSERT(rr_policy != NULL);
475 const bool pick_done =
476 grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
477 (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
478 if (pick_done) {
479 /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
480 if (grpc_lb_glb_trace) {
481 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
482 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700483 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200484 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700485
David Garcia Quintas20359062016-10-15 15:22:51 -0700486 /* add the load reporting initial metadata */
487 initial_metadata_add_lb_token(pick_args->initial_metadata,
488 pick_args->lb_token_mdelem_storage,
489 GRPC_MDELEM_REF(wc_arg->lb_token));
490
491 gpr_free(wc_arg);
492 }
493 /* else, the pending pick will be registered and taken care of by the
494 * pending pick list inside the RR policy (glb_policy->rr_policy).
495 * Eventually, wrapped_on_complete will be called, which will -among other
496 * things- add the LB token to the call's initial metadata */
David Garcia Quintas20359062016-10-15 15:22:51 -0700497 return pick_done;
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700498}
499
David Garcia Quintas90712d52016-10-13 19:33:04 -0700500static grpc_lb_policy *create_rr_locked(
501 grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
502 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700503 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700504
505 grpc_lb_policy_args args;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700506 memset(&args, 0, sizeof(args));
David Garcia Quintas65318262016-07-29 13:43:38 -0700507 args.client_channel_factory = glb_policy->cc_factory;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200508 grpc_lb_addresses *addresses = process_serverlist(serverlist);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700509
510 // Replace the LB addresses in the channel args that we pass down to
511 // the subchannel.
Mark D. Roth557c9902016-10-24 11:12:05 -0700512 static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200513 const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700514 args.args = grpc_channel_args_copy_and_add_and_remove(
515 glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
516 1);
David Garcia Quintas65318262016-07-29 13:43:38 -0700517
518 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200519 GPR_ASSERT(rr != NULL);
520 grpc_lb_addresses_destroy(addresses);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700521 grpc_channel_args_destroy(args.args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700522 return rr;
523}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700524
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200525static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
526 grpc_error *error);
527/* glb_policy->rr_policy may be NULL (initial handover) */
David Garcia Quintas90712d52016-10-13 19:33:04 -0700528static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
529 glb_lb_policy *glb_policy, grpc_error *error) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700530 GPR_ASSERT(glb_policy->serverlist != NULL &&
531 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700532
533 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200534 gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700535 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200536 if (glb_policy->rr_policy != NULL) {
537 /* if we are phasing out an existing RR instance, unref it. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100538 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200539 }
540
541 glb_policy->rr_policy =
542 create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
543 if (grpc_lb_glb_trace) {
544 gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy);
545 }
546
David Garcia Quintas65318262016-07-29 13:43:38 -0700547 GPR_ASSERT(glb_policy->rr_policy != NULL);
Yuchen Zengb4291642016-09-01 19:17:14 -0700548 grpc_pollset_set_add_pollset_set(exec_ctx,
549 glb_policy->rr_policy->interested_parties,
550 glb_policy->base.interested_parties);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200551
552 rr_connectivity_data *rr_connectivity =
553 gpr_malloc(sizeof(rr_connectivity_data));
554 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
555 grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
556 rr_connectivity);
557 rr_connectivity->glb_policy = glb_policy;
558 rr_connectivity->state = grpc_lb_policy_check_connectivity(
David Garcia Quintas65318262016-07-29 13:43:38 -0700559 exec_ctx, glb_policy->rr_policy, &error);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200560
David Garcia Quintas65318262016-07-29 13:43:38 -0700561 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200562 rr_connectivity->state, GRPC_ERROR_REF(error),
563 "rr_handover");
564 /* subscribe */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100565 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectiviby_cb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200566 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
567 &rr_connectivity->state,
568 &rr_connectivity->on_change);
David Garcia Quintas65318262016-07-29 13:43:38 -0700569 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
570
571 /* flush pending ops */
572 pending_pick *pp;
573 while ((pp = glb_policy->pending_picks)) {
574 glb_policy->pending_picks = pp->next;
575 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
576 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
577 if (grpc_lb_glb_trace) {
578 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
579 (intptr_t)glb_policy->rr_policy);
580 }
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700581 pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
582 &pp->pick_args, pp->target,
583 &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700584 }
585
586 pending_ping *pping;
587 while ((pping = glb_policy->pending_pings)) {
588 glb_policy->pending_pings = pping->next;
589 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
590 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
591 if (grpc_lb_glb_trace) {
592 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
593 (intptr_t)glb_policy->rr_policy);
594 }
595 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700596 &pping->wrapped_notify_arg.wrapper_closure);
David Garcia Quintas65318262016-07-29 13:43:38 -0700597 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700598}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700599
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700600static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
601 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200602 /* If shutdown or error free the arg. Rely on the rest of the code to set the
603 * right grpclb status. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700604 rr_connectivity_data *rr_conn_data = arg;
605 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700606
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100607 if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
608 !glb_policy->shutting_down) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200609 gpr_mu_lock(&glb_policy->mu);
610 /* RR not shutting down. Mimic the RR's policy state */
611 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
612 rr_conn_data->state, GRPC_ERROR_REF(error),
613 "glb_rr_connectivity_changed");
614 /* resubscribe */
615 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
616 &rr_conn_data->state,
617 &rr_conn_data->on_change);
618 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700619 } else {
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100620 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
621 "rr_connectiviby_cb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200622 gpr_free(rr_conn_data);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700623 }
624}
625
David Garcia Quintas65318262016-07-29 13:43:38 -0700626static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
627 grpc_lb_policy_factory *factory,
628 grpc_lb_policy_args *args) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700629 /* Get server name. */
Mark D. Roth557c9902016-10-24 11:12:05 -0700630 const grpc_arg *arg =
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700631 grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
Mark D. Roth557c9902016-10-24 11:12:05 -0700632 const char *server_name =
633 arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700634
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700635 /* Count the number of gRPC-LB addresses. There must be at least one.
636 * TODO(roth): For now, we ignore non-balancer addresses, but in the
637 * future, we may change the behavior such that we fall back to using
638 * the non-balancer addresses if we cannot reach any balancers. At that
639 * time, this should be changed to allow a list with no balancer addresses,
640 * since the resolver might fail to return a balancer address even when
641 * this is the right LB policy to use. */
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700642 arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
643 GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
Mark D. Roth557c9902016-10-24 11:12:05 -0700644 grpc_lb_addresses *addresses = arg->value.pointer.p;
Mark D. Rothf655c852016-09-06 10:40:38 -0700645 size_t num_grpclb_addrs = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700646 for (size_t i = 0; i < addresses->num_addresses; ++i) {
647 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
Mark D. Rothf655c852016-09-06 10:40:38 -0700648 }
649 if (num_grpclb_addrs == 0) return NULL;
650
David Garcia Quintas65318262016-07-29 13:43:38 -0700651 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
652 memset(glb_policy, 0, sizeof(*glb_policy));
653
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700654 /* All input addresses in addresses come from a resolver that claims
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700655 * they are LB services. It's the resolver's responsibility to make sure
656 * this
David Garcia Quintas65318262016-07-29 13:43:38 -0700657 * policy is only instantiated and used in that case.
658 *
659 * Create a client channel over them to communicate with a LB service */
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700660 glb_policy->server_name = gpr_strdup(server_name);
David Garcia Quintas65318262016-07-29 13:43:38 -0700661 glb_policy->cc_factory = args->client_channel_factory;
Mark D. Roth98abfd32016-10-21 08:10:51 -0700662 glb_policy->args = grpc_channel_args_copy(args->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700663 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700664
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700665 /* construct a target from the addresses in args, given in the form
David Garcia Quintas65318262016-07-29 13:43:38 -0700666 * ipvX://ip1:port1,ip2:port2,...
667 * TODO(dgq): support mixed ip version */
Mark D. Rothf655c852016-09-06 10:40:38 -0700668 char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700669 size_t addr_index = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700670 for (size_t i = 0; i < addresses->num_addresses; i++) {
671 if (addresses->addresses[i].user_data != NULL) {
David Garcia Quintas5ebb7af2016-09-15 10:02:16 -0700672 gpr_log(GPR_ERROR,
673 "This LB policy doesn't support user data. It will be ignored");
674 }
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700675 if (addresses->addresses[i].is_balancer) {
Mark D. Rothc5c38782016-09-16 08:51:01 -0700676 if (addr_index == 0) {
murgatroid99dedb9232016-09-26 13:54:04 -0700677 addr_strs[addr_index++] =
Mark D. Rothfb809b72016-10-26 09:12:25 -0700678 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700679 } else {
Mark D. Roth49f89f02016-10-26 11:16:59 -0700680 GPR_ASSERT(grpc_sockaddr_to_string(&addr_strs[addr_index++],
681 &addresses->addresses[i].address,
682 true) > 0);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700683 }
Mark D. Rothf655c852016-09-06 10:40:38 -0700684 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700685 }
686 size_t uri_path_len;
Mark D. Roth989cdcd2016-09-06 13:28:28 -0700687 char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
688 num_grpclb_addrs, ",", &uri_path_len);
David Garcia Quintas65318262016-07-29 13:43:38 -0700689
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700690 /* Create a channel to talk to the LBs.
691 *
692 * We strip out the channel arg for the LB policy name, since we want
693 * to use the default (pick_first) in this case.
694 *
695 * We also strip out the channel arg for the resolved addresses, since
696 * that will be generated by the name resolver used in the LB channel.
697 * Note that the LB channel will use the sockaddr resolver, so this
698 * won't actually generate a query to DNS (or some other name service).
699 * However, the addresses returned by the sockaddr resolver will have
700 * is_balancer=false, whereas our own addresses have is_balancer=true.
701 * We need the LB channel to return addresses with is_balancer=false
702 * so that it does not wind up recursively using the grpclb LB policy,
703 * as per the special case logic in client_channel.c.
704 */
Mark D. Roth557c9902016-10-24 11:12:05 -0700705 static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
706 GRPC_ARG_LB_ADDRESSES};
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700707 grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
708 args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
David Garcia Quintas65318262016-07-29 13:43:38 -0700709 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
710 exec_ctx, glb_policy->cc_factory, target_uri_str,
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700711 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
712 grpc_channel_args_destroy(new_args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700713
714 gpr_free(target_uri_str);
Mark D. Rothf655c852016-09-06 10:40:38 -0700715 for (size_t i = 0; i < num_grpclb_addrs; i++) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700716 gpr_free(addr_strs[i]);
717 }
718 gpr_free(addr_strs);
719
720 if (glb_policy->lb_channel == NULL) {
721 gpr_free(glb_policy);
722 return NULL;
723 }
724
David Garcia Quintas65318262016-07-29 13:43:38 -0700725 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
726 gpr_mu_init(&glb_policy->mu);
727 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
728 "grpclb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200729
David Garcia Quintas65318262016-07-29 13:43:38 -0700730 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700731}
732
David Garcia Quintas65318262016-07-29 13:43:38 -0700733static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
734 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
735 GPR_ASSERT(glb_policy->pending_picks == NULL);
736 GPR_ASSERT(glb_policy->pending_pings == NULL);
Mark D. Rothd1604af2016-09-22 11:20:27 -0700737 gpr_free((void *)glb_policy->server_name);
Mark D. Roth046cf762016-09-26 11:13:51 -0700738 grpc_channel_args_destroy(glb_policy->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700739 grpc_channel_destroy(glb_policy->lb_channel);
740 glb_policy->lb_channel = NULL;
741 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
742 if (glb_policy->serverlist != NULL) {
743 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
744 }
745 gpr_mu_destroy(&glb_policy->mu);
746 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700747}
748
David Garcia Quintas65318262016-07-29 13:43:38 -0700749static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
750 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
751 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200752 glb_policy->shutting_down = true;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700753
David Garcia Quintas65318262016-07-29 13:43:38 -0700754 pending_pick *pp = glb_policy->pending_picks;
755 glb_policy->pending_picks = NULL;
756 pending_ping *pping = glb_policy->pending_pings;
757 glb_policy->pending_pings = NULL;
758 gpr_mu_unlock(&glb_policy->mu);
759
760 while (pp != NULL) {
761 pending_pick *next = pp->next;
762 *pp->target = NULL;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700763 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
764 GRPC_ERROR_NONE, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700765 pp = next;
766 }
767
768 while (pping != NULL) {
769 pending_ping *next = pping->next;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700770 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
771 GRPC_ERROR_NONE, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700772 pping = next;
773 }
774
775 if (glb_policy->rr_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700776 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
777 }
778
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200779 if (glb_policy->started_picking) {
780 if (glb_policy->lb_call != NULL) {
781 grpc_call_cancel(glb_policy->lb_call, NULL);
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100782 /* lb_on_server_status_received will pick up the cancellation and clean up
783 */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200784 }
785 }
David Garcia Quintas740759e2016-08-01 14:49:49 -0700786
David Garcia Quintas65318262016-07-29 13:43:38 -0700787 grpc_connectivity_state_set(
788 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
789 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
790}
791
792static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
Mark D. Roth5f844002016-09-08 08:20:53 -0700793 grpc_connected_subchannel **target,
794 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700795 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
796 gpr_mu_lock(&glb_policy->mu);
797 pending_pick *pp = glb_policy->pending_picks;
798 glb_policy->pending_picks = NULL;
799 while (pp != NULL) {
800 pending_pick *next = pp->next;
801 if (pp->target == target) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700802 *target = NULL;
Mark D. Roth932b10c2016-09-09 08:44:30 -0700803 grpc_exec_ctx_sched(
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700804 exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
Mark D. Roth932b10c2016-09-09 08:44:30 -0700805 GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700806 } else {
807 pp->next = glb_policy->pending_picks;
808 glb_policy->pending_picks = pp;
809 }
810 pp = next;
811 }
812 gpr_mu_unlock(&glb_policy->mu);
Mark D. Roth5f844002016-09-08 08:20:53 -0700813 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700814}
815
David Garcia Quintas65318262016-07-29 13:43:38 -0700816static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
817 uint32_t initial_metadata_flags_mask,
Mark D. Rothe65ff112016-09-09 13:48:38 -0700818 uint32_t initial_metadata_flags_eq,
819 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700820 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
821 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas65318262016-07-29 13:43:38 -0700822 pending_pick *pp = glb_policy->pending_picks;
823 glb_policy->pending_picks = NULL;
824 while (pp != NULL) {
825 pending_pick *next = pp->next;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700826 if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
David Garcia Quintas65318262016-07-29 13:43:38 -0700827 initial_metadata_flags_eq) {
Mark D. Roth58f52b72016-09-09 13:55:18 -0700828 grpc_exec_ctx_sched(
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700829 exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
Mark D. Roth58f52b72016-09-09 13:55:18 -0700830 GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700831 } else {
832 pp->next = glb_policy->pending_picks;
833 glb_policy->pending_picks = pp;
834 }
835 pp = next;
836 }
837 gpr_mu_unlock(&glb_policy->mu);
Mark D. Rothe65ff112016-09-09 13:48:38 -0700838 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700839}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700840
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200841static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
842 glb_lb_policy *glb_policy);
843static void start_picking_locked(grpc_exec_ctx *exec_ctx,
844 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700845 glb_policy->started_picking = true;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200846 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
847 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700848}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700849
David Garcia Quintas65318262016-07-29 13:43:38 -0700850static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
851 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
852 gpr_mu_lock(&glb_policy->mu);
853 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200854 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700855 }
856 gpr_mu_unlock(&glb_policy->mu);
857}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700858
David Garcia Quintas65318262016-07-29 13:43:38 -0700859static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700860 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700861 grpc_connected_subchannel **target, void **user_data,
David Garcia Quintas65318262016-07-29 13:43:38 -0700862 grpc_closure *on_complete) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700863 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700864 *target = NULL;
David Garcia Quintas6cc44fc2016-09-12 23:04:35 -0700865 grpc_exec_ctx_sched(
866 exec_ctx, on_complete,
867 GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
868 "won't work without it. Failing"),
869 NULL);
Mark D. Roth1e5f6af2016-10-07 08:32:58 -0700870 return 0;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700871 }
872
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700873 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -0700874 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas5cf3c372016-10-03 14:30:03 -0700875 glb_policy->deadline = pick_args->deadline;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700876 bool pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -0700877
878 if (glb_policy->rr_policy != NULL) {
879 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200880 gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
881 (void *)glb_policy, (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700882 }
883 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
David Garcia Quintas8aace512016-08-15 14:55:12 -0700884
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700885 wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
David Garcia Quintas90712d52016-10-13 19:33:04 -0700886 memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700887
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700888 grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
David Garcia Quintas90712d52016-10-13 19:33:04 -0700889 wc_arg->rr_policy = glb_policy->rr_policy;
890 wc_arg->target = target;
891 wc_arg->wrapped_closure = on_complete;
892 wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
893 wc_arg->initial_metadata = pick_args->initial_metadata;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700894 wc_arg->free_when_done = wc_arg;
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700895 pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
David Garcia Quintas20359062016-10-15 15:22:51 -0700896 pick_args, target, wc_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700897 } else {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200898 if (grpc_lb_glb_trace) {
899 gpr_log(GPR_DEBUG,
900 "No RR policy in grpclb instance %p. Adding to grpclb's pending "
901 "picks",
902 (void *)(glb_policy));
903 }
David Garcia Quintas8aace512016-08-15 14:55:12 -0700904 add_pending_pick(&glb_policy->pending_picks, pick_args, target,
905 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700906
907 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200908 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700909 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700910 pick_done = false;
David Garcia Quintas65318262016-07-29 13:43:38 -0700911 }
912 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700913 return pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -0700914}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700915
David Garcia Quintas65318262016-07-29 13:43:38 -0700916static grpc_connectivity_state glb_check_connectivity(
917 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
918 grpc_error **connectivity_error) {
919 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
920 grpc_connectivity_state st;
921 gpr_mu_lock(&glb_policy->mu);
922 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
923 connectivity_error);
924 gpr_mu_unlock(&glb_policy->mu);
925 return st;
926}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700927
David Garcia Quintas65318262016-07-29 13:43:38 -0700928static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
929 grpc_closure *closure) {
930 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
931 gpr_mu_lock(&glb_policy->mu);
932 if (glb_policy->rr_policy) {
933 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
934 } else {
935 add_pending_ping(&glb_policy->pending_pings, closure);
936 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200937 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700938 }
939 }
940 gpr_mu_unlock(&glb_policy->mu);
941}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700942
David Garcia Quintas65318262016-07-29 13:43:38 -0700943static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
944 grpc_lb_policy *pol,
945 grpc_connectivity_state *current,
946 grpc_closure *notify) {
947 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
948 gpr_mu_lock(&glb_policy->mu);
949 grpc_connectivity_state_notify_on_state_change(
950 exec_ctx, &glb_policy->state_tracker, current, notify);
951
952 gpr_mu_unlock(&glb_policy->mu);
953}
954
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100955static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
956 grpc_error *error);
957static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
958 grpc_error *error);
959static void lb_call_init(glb_lb_policy *glb_policy) {
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700960 GPR_ASSERT(glb_policy->server_name != NULL);
961 GPR_ASSERT(glb_policy->server_name[0] != '\0');
962
David Garcia Quintas15eba132016-08-09 15:20:48 -0700963 /* Note the following LB call progresses every time there's activity in \a
964 * glb_policy->base.interested_parties, which is comprised of the polling
Yuchen Zengf7c45ae2016-09-15 13:40:32 -0700965 * entities from \a client_channel. */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200966 glb_policy->lb_call = grpc_channel_create_pollset_set_call(
David Garcia Quintas65318262016-07-29 13:43:38 -0700967 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
David Garcia Quintas4543e5c2016-09-22 15:09:34 -0700968 glb_policy->base.interested_parties,
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700969 "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200970 glb_policy->deadline, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700971
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100972 grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
973 grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -0700974
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700975 grpc_grpclb_request *request =
976 grpc_grpclb_request_create(glb_policy->server_name);
David Garcia Quintas65318262016-07-29 13:43:38 -0700977 gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100978 glb_policy->lb_request_payload =
David Garcia Quintas65318262016-07-29 13:43:38 -0700979 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
980 gpr_slice_unref(request_payload_slice);
981 grpc_grpclb_request_destroy(request);
982
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200983 glb_policy->lb_call_status_details = NULL;
984 glb_policy->lb_call_status_details_capacity = 0;
985
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100986 grpc_closure_init(&glb_policy->lb_on_server_status_received,
987 lb_on_server_status_received, glb_policy);
988 grpc_closure_init(&glb_policy->lb_on_response_received,
989 lb_on_response_received, glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200990
991 gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER,
992 BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000,
993 BACKOFF_MAX_SECONDS * 1000);
David Garcia Quintas65318262016-07-29 13:43:38 -0700994}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700995
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100996static void lb_call_destroy(glb_lb_policy *glb_policy) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200997 GPR_ASSERT(glb_policy->lb_call != NULL);
998 grpc_call_destroy(glb_policy->lb_call);
999 glb_policy->lb_call = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -07001000
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001001 grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
1002 grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -07001003
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001004 grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001005 gpr_free(glb_policy->lb_call_status_details);
David Garcia Quintas65318262016-07-29 13:43:38 -07001006}
1007
David Garcia Quintas8d489112016-07-29 15:20:42 -07001008/*
1009 * Auxiliary functions and LB client callbacks.
1010 */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001011static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
1012 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001013 GPR_ASSERT(glb_policy->lb_channel != NULL);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001014 /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
1015 * count goes to zero) to be unref'd in lb_on_server_status_received */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001016 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "query_for_backends_locked");
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001017 lb_call_init(glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001018
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001019 if (grpc_lb_glb_trace) {
1020 gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
1021 (void *)glb_policy, (void *)glb_policy->lb_call);
1022 }
1023 GPR_ASSERT(glb_policy->lb_call != NULL);
1024
David Garcia Quintas65318262016-07-29 13:43:38 -07001025 grpc_call_error call_error;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001026 grpc_op ops[4];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001027 memset(ops, 0, sizeof(ops));
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001028
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001029 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -07001030 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1031 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001032 op->flags = 0;
1033 op->reserved = NULL;
1034 op++;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001035
1036 op->op = GRPC_OP_RECV_INITIAL_METADATA;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001037 op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001038 op->flags = 0;
1039 op->reserved = NULL;
1040 op++;
1041
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001042 GPR_ASSERT(glb_policy->lb_request_payload != NULL);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001043 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001044 op->data.send_message = glb_policy->lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001045 op->flags = 0;
1046 op->reserved = NULL;
1047 op++;
1048
1049 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1050 op->data.recv_status_on_client.trailing_metadata =
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001051 &glb_policy->lb_trailing_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001052 op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
1053 op->data.recv_status_on_client.status_details =
1054 &glb_policy->lb_call_status_details;
1055 op->data.recv_status_on_client.status_details_capacity =
1056 &glb_policy->lb_call_status_details_capacity;
1057 op->flags = 0;
1058 op->reserved = NULL;
1059 op++;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001060 call_error = grpc_call_start_batch_and_execute(
1061 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1062 &glb_policy->lb_on_server_status_received);
David Garcia Quintas65318262016-07-29 13:43:38 -07001063 GPR_ASSERT(GRPC_CALL_OK == call_error);
1064
1065 op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001066 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001067 op->data.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001068 op->flags = 0;
1069 op->reserved = NULL;
1070 op++;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001071 call_error = grpc_call_start_batch_and_execute(
1072 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1073 &glb_policy->lb_on_response_received);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001074 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001075}
1076
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001077static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
1078 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001079 glb_lb_policy *glb_policy = arg;
1080
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001081 grpc_op ops[2];
1082 memset(ops, 0, sizeof(ops));
1083 grpc_op *op = ops;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001084 if (glb_policy->lb_response_payload != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001085 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001086 /* Received data from the LB server. Look inside
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001087 * glb_policy->lb_response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001088 grpc_byte_buffer_reader bbr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001089 grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001090 gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001091 grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001092 grpc_grpclb_serverlist *serverlist =
1093 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001094 if (serverlist != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001095 GPR_ASSERT(glb_policy->lb_call != NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001096 gpr_slice_unref(response_slice);
1097 if (grpc_lb_glb_trace) {
Jan Tattermusch2b398082016-10-07 14:40:30 +02001098 gpr_log(GPR_INFO, "Serverlist with %lu servers received",
1099 (unsigned long)serverlist->num_servers);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001100 for (size_t i = 0; i < serverlist->num_servers; ++i) {
1101 grpc_resolved_address addr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001102 parse_server(serverlist->servers[i], &addr);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001103 char *ipport;
1104 grpc_sockaddr_to_string(&ipport, &addr, false);
1105 gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
1106 gpr_free(ipport);
1107 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001108 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001109
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001110 /* update serverlist */
1111 if (serverlist->num_servers > 0) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001112 gpr_mu_lock(&glb_policy->mu);
1113 if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001114 if (grpc_lb_glb_trace) {
1115 gpr_log(GPR_INFO,
1116 "Incoming server list identical to current, ignoring.");
1117 }
1118 } else { /* new serverlist */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001119 if (glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001120 /* dispose of the old serverlist */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001121 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001122 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001123 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001124 glb_policy->serverlist = serverlist;
1125
1126 rr_handover_locked(exec_ctx, glb_policy, error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001127 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001128 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001129 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001130 if (grpc_lb_glb_trace) {
1131 gpr_log(GPR_INFO,
1132 "Received empty server list. Picks will stay pending until a "
1133 "response with > 0 servers is received");
1134 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001135 }
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001136 } else { /* serverlist == NULL */
1137 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
1138 gpr_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
1139 gpr_slice_unref(response_slice);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001140 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001141
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001142 /* keep listening for serverlist updates */
1143 op->op = GRPC_OP_RECV_MESSAGE;
1144 op->data.recv_message = &glb_policy->lb_response_payload;
1145 op->flags = 0;
1146 op->reserved = NULL;
1147 op++;
1148 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1149 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1150 &glb_policy->lb_on_response_received); /* loop */
1151 GPR_ASSERT(GRPC_CALL_OK == call_error);
1152 return;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001153 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001154 /* else, empty payload: call cancelled by server. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001155}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001156
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001157static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
1158 grpc_error *error) {
1159 glb_lb_policy *glb_policy = arg;
1160 gpr_mu_lock(&glb_policy->mu);
1161
1162 if (!glb_policy->shutting_down) {
1163 if (grpc_lb_glb_trace) {
1164 gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
1165 (void *)glb_policy);
1166 }
1167 GPR_ASSERT(glb_policy->lb_call == NULL);
1168 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001169 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001170 gpr_mu_unlock(&glb_policy->mu);
1171
1172 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1173 "grpclb_on_retry_timer");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001174}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001175
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001176static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
1177 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001178 glb_lb_policy *glb_policy = arg;
1179 gpr_mu_lock(&glb_policy->mu);
1180
1181 GPR_ASSERT(glb_policy->lb_call != NULL);
1182
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001183 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001184 gpr_log(GPR_DEBUG,
1185 "Status from LB server received. Status = %d, Details = '%s', "
1186 "(call: %p)",
1187 glb_policy->lb_call_status, glb_policy->lb_call_status_details,
1188 (void *)glb_policy->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001189 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001190
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001191 const bool was_cancelled =
1192 (glb_policy->lb_call_status == GRPC_STATUS_CANCELLED);
1193
1194 /* We need to performe cleanups no matter what. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001195 lb_call_destroy(glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001196
1197 if (!glb_policy->shutting_down) {
1198 GPR_ASSERT(!was_cancelled);
1199 /* if we aren't shutting down, restart the LB client call after some time */
1200 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1201 gpr_timespec next_try =
1202 gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
1203 if (grpc_lb_glb_trace) {
1204 gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
1205 (void *)glb_policy);
1206 gpr_timespec timeout = gpr_time_sub(next_try, now);
1207 if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
1208 gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.",
1209 timeout.tv_sec, timeout.tv_nsec);
1210 } else {
1211 gpr_log(GPR_DEBUG, "... retrying immediately.");
1212 }
1213 }
1214 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
1215 grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
1216 lb_call_on_retry_timer, glb_policy, now);
1217 }
1218 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001219 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1220 "lb_on_server_status_received");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001221}
1222
David Garcia Quintas8d489112016-07-29 15:20:42 -07001223/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001224static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1225 glb_destroy, glb_shutdown, glb_pick,
1226 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1227 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1228
1229static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1230
1231static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1232
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001233static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1234 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1235
1236static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1237
1238grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1239 return &glb_lb_policy_factory;
1240}
1241
1242/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001243void grpc_lb_policy_grpclb_init() {
1244 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1245 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1246}
1247
1248void grpc_lb_policy_grpclb_shutdown() {}