blob: a85c16b881de6df4bf7e587bae021e93a6f8e6cd [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
David Garcia Quintas43339842016-07-18 12:56:09 -070036 * This policy takes as input a set of resolved addresses {a1..an} for which the
37 * LB set was set (it's the resolver's responsibility to ensure this). That is
38 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
David Garcia Quintas98da61b2016-10-29 08:46:31 +020046 * idle state, \a query_for_backends_locked() is called. This function sets up
47 * and initiates the internal communication with the LB server. In particular,
48 * it's responsible for instantiating the internal *streaming* call to the LB
49 * server (whichever address from {a1..an} pick-first chose). This call is
David Garcia Quintas7ec29132016-11-01 04:09:05 +010050 * serviced by two callbacks, \a lb_on_server_status_received and \a
51 * lb_on_response_received. The former will be called when the call to the LB
52 * server completes. This can happen if the LB server closes the connection or
53 * if this policy itself cancels the call (for example because it's shutting
David Garcia Quintas246c5642016-11-01 11:16:52 -070054 * down). If the internal call times out, the usual behavior of pick-first
David Garcia Quintas7ec29132016-11-01 04:09:05 +010055 * applies, continuing to pick from the list {a1..an}.
David Garcia Quintas43339842016-07-18 12:56:09 -070056 *
 * Upon success, the incoming \a LoadBalancingResponse is processed by \a
 * res_recv. An invalid one results in the termination of the streaming call. A
 * new streaming call should be created if possible, failing the original call
 * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
 * backends is extracted. A Round Robin policy will be created from this list.
62 * There are two possible scenarios:
David Garcia Quintas43339842016-07-18 12:56:09 -070063 *
64 * 1. This is the first server list received. There was no previous instance of
David Garcia Quintas90712d52016-10-13 19:33:04 -070065 * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
66 * policy and perform all the pending operations over it.
 * 2. There's already a RR policy instance active. We need to introduce the new
 *    one built from the new serverlist, but taking care not to disrupt the
 *    operations in progress over the old RR instance. This is done by
 *    decreasing the reference count on the old policy. The moment no more
 *    references are held on the old RR policy, it'll be destroyed and \a
 *    glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
 *    state. At this point we can transition to a new RR instance safely, which
 *    is done once again via \a rr_handover_locked().
David Garcia Quintas43339842016-07-18 12:56:09 -070075 *
76 *
 * Once a RR policy instance is in place (and getting updated as described),
 * calls for a pick, a ping or a cancellation will be serviced right away by
 * forwarding them to the RR instance. Any time there's no RR policy available
 * (i.e., right after the creation of the gRPCLB policy, if an empty serverlist
 * is received, etc.), pick/ping requests are added to a list of pending
 * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the
 * moment the RR policy instance becomes available.
David Garcia Quintas43339842016-07-18 12:56:09 -070084 *
85 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
86 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070087
88/* TODO(dgq):
89 * - Implement LB service forwarding (point 2c. in the doc's diagram).
90 */
91
murgatroid99085f9af2016-10-24 09:55:44 -070092/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
93 using that endpoint. Because of various transitive includes in uv.h,
94 including windows.h on Windows, uv.h must be included before other system
95 headers. Therefore, sockaddr.h must always be included first */
murgatroid997871f732016-09-23 13:49:05 -070096#include "src/core/lib/iomgr/sockaddr.h"
97
David Garcia Quintas8a81aa12016-08-22 15:06:49 -070098#include <errno.h>
99
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700100#include <string.h>
101
102#include <grpc/byte_buffer_reader.h>
103#include <grpc/grpc.h>
104#include <grpc/support/alloc.h>
105#include <grpc/support/host_port.h>
106#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -0700107#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700108
Mark D. Roth2137cd82016-09-14 09:04:00 -0700109#include "src/core/ext/client_channel/client_channel_factory.h"
Mark D. Roth15195742016-10-07 09:02:28 -0700110#include "src/core/ext/client_channel/lb_policy_factory.h"
Mark D. Roth2137cd82016-09-14 09:04:00 -0700111#include "src/core/ext/client_channel/lb_policy_registry.h"
112#include "src/core/ext/client_channel/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700113#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700114#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
Mark D. Roth046cf762016-09-26 11:13:51 -0700115#include "src/core/lib/channel/channel_args.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200116#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700117#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200118#include "src/core/lib/iomgr/timer.h"
Craig Tiller18b4ba32016-11-09 15:23:42 -0800119#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -0700120#include "src/core/lib/slice/slice_string_helpers.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200121#include "src/core/lib/support/backoff.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700122#include "src/core/lib/support/string.h"
123#include "src/core/lib/surface/call.h"
124#include "src/core/lib/surface/channel.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700125#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700126
/* Backoff tuning constants. NOTE(review): presumably used to initialize
 * glb_lb_policy.lb_call_backoff_state for LB call retries; the use site is not
 * visible from here -- confirm. */
#define BACKOFF_MULTIPLIER 1.6
#define BACKOFF_JITTER 0.2
#define BACKOFF_MIN_SECONDS 10
#define BACKOFF_MAX_SECONDS 60

/* Tracing flag: when non-zero, this file emits additional GPR_INFO log
 * lines. */
int grpc_lb_glb_trace = 0;
133
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700134/* add lb_token of selected subchannel (address) to the call's initial
135 * metadata */
136static void initial_metadata_add_lb_token(
137 grpc_metadata_batch *initial_metadata,
138 grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
139 GPR_ASSERT(lb_token_mdelem_storage != NULL);
140 GPR_ASSERT(lb_token != NULL);
141 grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
142 lb_token);
143}
144
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700145typedef struct wrapped_rr_closure_arg {
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700146 /* the closure instance using this struct as argument */
147 grpc_closure wrapper_closure;
148
David Garcia Quintas43339842016-07-18 12:56:09 -0700149 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
150 * calls against the internal RR instance, respectively. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700151 grpc_closure *wrapped_closure;
David Garcia Quintas43339842016-07-18 12:56:09 -0700152
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700153 /* the pick's initial metadata, kept in order to append the LB token for the
154 * pick */
155 grpc_metadata_batch *initial_metadata;
156
157 /* the picked target, used to determine which LB token to add to the pick's
158 * initial metadata */
159 grpc_connected_subchannel **target;
160
161 /* the LB token associated with the pick */
162 grpc_mdelem *lb_token;
163
164 /* storage for the lb token initial metadata mdelem */
165 grpc_linked_mdelem *lb_token_mdelem_storage;
166
David Garcia Quintas43339842016-07-18 12:56:09 -0700167 /* The RR instance related to the closure */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700168 grpc_lb_policy *rr_policy;
David Garcia Quintas43339842016-07-18 12:56:09 -0700169
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700170 /* heap memory to be freed upon closure execution. */
171 void *free_when_done;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700172} wrapped_rr_closure_arg;
173
174/* The \a on_complete closure passed as part of the pick requires keeping a
175 * reference to its associated round robin instance. We wrap this closure in
176 * order to unref the round robin instance upon its invocation */
177static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700178 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700179 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700180
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200181 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
182 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
183 NULL);
184
185 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700186 /* if target is NULL, no pick has been made by the RR policy (eg, all
187 * addresses failed to connect). There won't be any user_data/token
188 * available */
189 if (wc_arg->target != NULL) {
190 initial_metadata_add_lb_token(wc_arg->initial_metadata,
191 wc_arg->lb_token_mdelem_storage,
192 GRPC_MDELEM_REF(wc_arg->lb_token));
193 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200194 if (grpc_lb_glb_trace) {
195 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
196 (intptr_t)wc_arg->rr_policy);
197 }
198 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700199 }
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700200 GPR_ASSERT(wc_arg->free_when_done != NULL);
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700201 gpr_free(wc_arg->free_when_done);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700202}
203
David Garcia Quintasea11d162016-07-14 17:27:28 -0700204/* Linked list of pending pick requests. It stores all information needed to
205 * eventually call (Round Robin's) pick() on them. They mainly stay pending
206 * waiting for the RR policy to be created/updated.
207 *
208 * One particularity is the wrapping of the user-provided \a on_complete closure
209 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
210 * order to correctly unref the RR policy instance upon completion of the pick.
211 * See \a wrapped_rr_closure for details. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700212typedef struct pending_pick {
213 struct pending_pick *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700214
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700215 /* original pick()'s arguments */
216 grpc_lb_policy_pick_args pick_args;
David Garcia Quintas43339842016-07-18 12:56:09 -0700217
218 /* output argument where to store the pick()ed connected subchannel, or NULL
219 * upon error. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700220 grpc_connected_subchannel **target;
David Garcia Quintas43339842016-07-18 12:56:09 -0700221
David Garcia Quintas43339842016-07-18 12:56:09 -0700222 /* args for wrapped_on_complete */
223 wrapped_rr_closure_arg wrapped_on_complete_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700224} pending_pick;
225
David Garcia Quintas8aace512016-08-15 14:55:12 -0700226static void add_pending_pick(pending_pick **root,
227 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700228 grpc_connected_subchannel **target,
229 grpc_closure *on_complete) {
230 pending_pick *pp = gpr_malloc(sizeof(*pp));
231 memset(pp, 0, sizeof(pending_pick));
232 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
233 pp->next = *root;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700234 pp->pick_args = *pick_args;
David Garcia Quintas65318262016-07-29 13:43:38 -0700235 pp->target = target;
David Garcia Quintas65318262016-07-29 13:43:38 -0700236 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700237 pp->wrapped_on_complete_arg.target = target;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700238 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
239 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
240 pick_args->lb_token_mdelem_storage;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700241 pp->wrapped_on_complete_arg.free_when_done = pp;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700242 grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
243 wrapped_rr_closure, &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700244 *root = pp;
245}
246
David Garcia Quintasea11d162016-07-14 17:27:28 -0700247/* Same as the \a pending_pick struct but for ping operations */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700248typedef struct pending_ping {
249 struct pending_ping *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700250
David Garcia Quintas43339842016-07-18 12:56:09 -0700251 /* args for wrapped_notify */
252 wrapped_rr_closure_arg wrapped_notify_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700253} pending_ping;
254
David Garcia Quintas65318262016-07-29 13:43:38 -0700255static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
256 pending_ping *pping = gpr_malloc(sizeof(*pping));
257 memset(pping, 0, sizeof(pending_ping));
258 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
David Garcia Quintas65318262016-07-29 13:43:38 -0700259 pping->wrapped_notify_arg.wrapped_closure = notify;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700260 pping->wrapped_notify_arg.free_when_done = pping;
David Garcia Quintas65318262016-07-29 13:43:38 -0700261 pping->next = *root;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700262 grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
263 wrapped_rr_closure, &pping->wrapped_notify_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700264 *root = pping;
265}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700266
David Garcia Quintas8d489112016-07-29 15:20:42 -0700267/*
268 * glb_lb_policy
269 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700270typedef struct rr_connectivity_data rr_connectivity_data;
David Garcia Quintas65318262016-07-29 13:43:38 -0700271static const grpc_lb_policy_vtable glb_lb_policy_vtable;
272typedef struct glb_lb_policy {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700273 /** base policy: must be first */
274 grpc_lb_policy base;
275
276 /** mutex protecting remaining members */
277 gpr_mu mu;
278
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700279 /** who the client is trying to communicate with */
Mark D. Rothd1604af2016-09-22 11:20:27 -0700280 const char *server_name;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700281 grpc_client_channel_factory *cc_factory;
Mark D. Roth046cf762016-09-26 11:13:51 -0700282 grpc_channel_args *args;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700283
David Garcia Quintas5cf3c372016-10-03 14:30:03 -0700284 /** deadline for the LB's call */
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700285 gpr_timespec deadline;
286
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700287 /** for communicating with the LB server */
David Garcia Quintasea11d162016-07-14 17:27:28 -0700288 grpc_channel *lb_channel;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700289
290 /** the RR policy to use of the backend servers returned by the LB server */
291 grpc_lb_policy *rr_policy;
292
293 bool started_picking;
294
295 /** our connectivity state tracker */
296 grpc_connectivity_state_tracker state_tracker;
297
David Garcia Quintasea11d162016-07-14 17:27:28 -0700298 /** stores the deserialized response from the LB. May be NULL until one such
299 * response has arrived. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700300 grpc_grpclb_serverlist *serverlist;
301
David Garcia Quintasea11d162016-07-14 17:27:28 -0700302 /** list of picks that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700303 pending_pick *pending_picks;
304
David Garcia Quintasea11d162016-07-14 17:27:28 -0700305 /** list of pings that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700306 pending_ping *pending_pings;
307
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200308 bool shutting_down;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700309
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200310 /************************************************************/
311 /* client data associated with the LB server communication */
312 /************************************************************/
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100313 /* Status from the LB server has been received. This signals the end of the LB
314 * call. */
315 grpc_closure lb_on_server_status_received;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200316
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100317 /* A response from the LB server has been received. Process it */
318 grpc_closure lb_on_response_received;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200319
320 grpc_call *lb_call; /* streaming call to the LB server, */
321
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100322 grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
323 grpc_metadata_array
324 lb_trailing_metadata_recv; /* trailing MD from LB server */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200325
326 /* what's being sent to the LB server. Note that its value may vary if the LB
327 * server indicates a redirect. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100328 grpc_byte_buffer *lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200329
David Garcia Quintas246c5642016-11-01 11:16:52 -0700330 /* response the LB server, if any. Processed in lb_on_response_received() */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100331 grpc_byte_buffer *lb_response_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200332
David Garcia Quintas246c5642016-11-01 11:16:52 -0700333 /* call status code and details, set in lb_on_server_status_received() */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200334 grpc_status_code lb_call_status;
335 char *lb_call_status_details;
336 size_t lb_call_status_details_capacity;
337
338 /** LB call retry backoff state */
339 gpr_backoff lb_call_backoff_state;
340
341 /** LB call retry timer */
342 grpc_timer lb_call_retry_timer;
David Garcia Quintas65318262016-07-29 13:43:38 -0700343} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700344
David Garcia Quintas65318262016-07-29 13:43:38 -0700345/* Keeps track and reacts to changes in connectivity of the RR instance */
346struct rr_connectivity_data {
347 grpc_closure on_change;
348 grpc_connectivity_state state;
349 glb_lb_policy *glb_policy;
350};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700351
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700352static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
353 bool log) {
354 const grpc_grpclb_ip_address *ip = &server->ip_address;
355 if (server->port >> 16 != 0) {
356 if (log) {
357 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200358 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
359 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700360 }
361 return false;
362 }
363
364 if (ip->size != 4 && ip->size != 16) {
365 if (log) {
366 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200367 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700368 "serverlist. Ignoring",
Jan Tattermusch2b398082016-10-07 14:40:30 +0200369 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700370 }
371 return false;
372 }
373 return true;
374}
375
Mark D. Roth16883a32016-10-21 10:30:58 -0700376/* vtable for LB tokens in grpc_lb_addresses. */
/* "copy" entry of the LB token vtable: NULL stays NULL, anything else is
 * passed through GRPC_MDELEM_REF and the result returned. */
static void *lb_token_copy(void *token) {
  if (token == NULL) return NULL;
  return GRPC_MDELEM_REF(token);
}
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800380static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
381 if (token != NULL) GRPC_MDELEM_UNREF(exec_ctx, token);
Mark D. Roth16883a32016-10-21 10:30:58 -0700382}
/* "cmp" entry of the LB token vtable: three-way comparison of two tokens by
 * address. Returns 1 if \a token1 is greater than \a token2, -1 if it is
 * smaller and 0 if both are the same pointer. */
static int lb_token_cmp(void *token1, void *token2) {
  /* Compare the addresses as integers: applying < or > directly to pointers
   * that don't point into the same object is undefined behavior in C. */
  const uintptr_t lhs = (uintptr_t)token1;
  const uintptr_t rhs = (uintptr_t)token2;
  return (lhs > rhs) - (lhs < rhs);
}
388static const grpc_lb_user_data_vtable lb_token_vtable = {
389 lb_token_copy, lb_token_destroy, lb_token_cmp};
390
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100391static void parse_server(const grpc_grpclb_server *server,
392 grpc_resolved_address *addr) {
393 const uint16_t netorder_port = htons((uint16_t)server->port);
394 /* the addresses are given in binary format (a in(6)_addr struct) in
395 * server->ip_address.bytes. */
396 const grpc_grpclb_ip_address *ip = &server->ip_address;
397 memset(addr, 0, sizeof(*addr));
398 if (ip->size == 4) {
399 addr->len = sizeof(struct sockaddr_in);
400 struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
401 addr4->sin_family = AF_INET;
402 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
403 addr4->sin_port = netorder_port;
404 } else if (ip->size == 16) {
405 addr->len = sizeof(struct sockaddr_in6);
406 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
David Garcia Quintas107ca162016-11-02 18:17:03 -0700407 addr6->sin6_family = AF_INET6;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100408 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
409 addr6->sin6_port = netorder_port;
410 }
411}
412
Mark D. Roth7ce14d22016-09-16 13:03:46 -0700413/* Returns addresses extracted from \a serverlist. */
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700414static grpc_lb_addresses *process_serverlist(
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800415 grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700416 size_t num_valid = 0;
417 /* first pass: count how many are valid in order to allocate the necessary
418 * memory in a single block */
419 for (size_t i = 0; i < serverlist->num_servers; ++i) {
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700420 if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700421 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700422 if (num_valid == 0) return NULL;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700423
Mark D. Roth16883a32016-10-21 10:30:58 -0700424 grpc_lb_addresses *lb_addresses =
425 grpc_lb_addresses_create(num_valid, &lb_token_vtable);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700426
427 /* second pass: actually populate the addresses and LB tokens (aka user data
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700428 * to the outside world) to be read by the RR policy during its creation.
429 * Given that the validity tests are very cheap, they are performed again
430 * instead of marking the valid ones during the first pass, as this would
431 * incurr in an allocation due to the arbitrary number of server */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700432 size_t addr_idx = 0;
433 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
434 GPR_ASSERT(addr_idx < num_valid);
435 const grpc_grpclb_server *server = serverlist->servers[sl_idx];
436 if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700437
438 /* address processing */
Mark D. Rothc5c38782016-09-16 08:51:01 -0700439 grpc_resolved_address addr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100440 parse_server(server, &addr);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700441
442 /* lb token processing */
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700443 void *user_data;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700444 if (server->has_load_balance_token) {
David Garcia Quintas0baf1dc2016-10-28 04:44:01 +0200445 const size_t lb_token_max_length =
446 GPR_ARRAY_SIZE(server->load_balance_token);
447 const size_t lb_token_length =
448 strnlen(server->load_balance_token, lb_token_max_length);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700449 grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
David Garcia Quintas0baf1dc2016-10-28 04:44:01 +0200450 (uint8_t *)server->load_balance_token, lb_token_length);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800451 user_data = grpc_mdelem_from_metadata_strings(
452 exec_ctx, GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700453 } else {
454 gpr_log(GPR_ERROR,
455 "Missing LB token for backend address '%s'. The empty token will "
456 "be used instead",
murgatroid9908b0fab2016-09-23 14:35:49 -0700457 grpc_sockaddr_to_uri(&addr));
David Garcia Quintasa3654db2016-10-11 15:52:39 -0700458 user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700459 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700460
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700461 grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
462 false /* is_balancer */,
Mark D. Rothc5c38782016-09-16 08:51:01 -0700463 NULL /* balancer_name */, user_data);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700464 ++addr_idx;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700465 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700466 GPR_ASSERT(addr_idx == num_valid);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700467 return lb_addresses;
468}
469
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700470/* perform a pick over \a rr_policy. Given that a pick can return immediately
471 * (ignoring its completion callback) we need to perform the cleanups this
472 * callback would be otherwise resposible for */
David Garcia Quintas20359062016-10-15 15:22:51 -0700473static bool pick_from_internal_rr_locked(
474 grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
475 const grpc_lb_policy_pick_args *pick_args,
476 grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
477 GPR_ASSERT(rr_policy != NULL);
478 const bool pick_done =
479 grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
480 (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
481 if (pick_done) {
482 /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
483 if (grpc_lb_glb_trace) {
484 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
485 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700486 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200487 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700488
David Garcia Quintas20359062016-10-15 15:22:51 -0700489 /* add the load reporting initial metadata */
490 initial_metadata_add_lb_token(pick_args->initial_metadata,
491 pick_args->lb_token_mdelem_storage,
492 GRPC_MDELEM_REF(wc_arg->lb_token));
493
494 gpr_free(wc_arg);
495 }
496 /* else, the pending pick will be registered and taken care of by the
497 * pending pick list inside the RR policy (glb_policy->rr_policy).
498 * Eventually, wrapped_on_complete will be called, which will -among other
499 * things- add the LB token to the call's initial metadata */
David Garcia Quintas20359062016-10-15 15:22:51 -0700500 return pick_done;
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700501}
502
David Garcia Quintas90712d52016-10-13 19:33:04 -0700503static grpc_lb_policy *create_rr_locked(
504 grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
505 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700506 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700507
508 grpc_lb_policy_args args;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700509 memset(&args, 0, sizeof(args));
David Garcia Quintas65318262016-07-29 13:43:38 -0700510 args.client_channel_factory = glb_policy->cc_factory;
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800511 grpc_lb_addresses *addresses = process_serverlist(exec_ctx, serverlist);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700512
513 // Replace the LB addresses in the channel args that we pass down to
514 // the subchannel.
Mark D. Roth557c9902016-10-24 11:12:05 -0700515 static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200516 const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700517 args.args = grpc_channel_args_copy_and_add_and_remove(
518 glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
519 1);
David Garcia Quintas65318262016-07-29 13:43:38 -0700520
521 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200522 GPR_ASSERT(rr != NULL);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800523 grpc_lb_addresses_destroy(exec_ctx, addresses);
524 grpc_channel_args_destroy(exec_ctx, args.args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700525 return rr;
526}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700527
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200528static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
529 grpc_error *error);
530/* glb_policy->rr_policy may be NULL (initial handover) */
David Garcia Quintas90712d52016-10-13 19:33:04 -0700531static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
532 glb_lb_policy *glb_policy, grpc_error *error) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700533 GPR_ASSERT(glb_policy->serverlist != NULL &&
534 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700535
536 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200537 gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700538 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200539 if (glb_policy->rr_policy != NULL) {
540 /* if we are phasing out an existing RR instance, unref it. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100541 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200542 }
543
544 glb_policy->rr_policy =
545 create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
546 if (grpc_lb_glb_trace) {
547 gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy);
548 }
549
David Garcia Quintas65318262016-07-29 13:43:38 -0700550 GPR_ASSERT(glb_policy->rr_policy != NULL);
Yuchen Zengb4291642016-09-01 19:17:14 -0700551 grpc_pollset_set_add_pollset_set(exec_ctx,
552 glb_policy->rr_policy->interested_parties,
553 glb_policy->base.interested_parties);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200554
555 rr_connectivity_data *rr_connectivity =
556 gpr_malloc(sizeof(rr_connectivity_data));
557 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
558 grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
559 rr_connectivity);
560 rr_connectivity->glb_policy = glb_policy;
561 rr_connectivity->state = grpc_lb_policy_check_connectivity(
David Garcia Quintas65318262016-07-29 13:43:38 -0700562 exec_ctx, glb_policy->rr_policy, &error);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200563
David Garcia Quintas65318262016-07-29 13:43:38 -0700564 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200565 rr_connectivity->state, GRPC_ERROR_REF(error),
566 "rr_handover");
567 /* subscribe */
David Garcia Quintase224a762016-11-01 13:00:58 -0700568 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200569 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
570 &rr_connectivity->state,
571 &rr_connectivity->on_change);
David Garcia Quintas65318262016-07-29 13:43:38 -0700572 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
573
574 /* flush pending ops */
575 pending_pick *pp;
576 while ((pp = glb_policy->pending_picks)) {
577 glb_policy->pending_picks = pp->next;
578 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
579 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
580 if (grpc_lb_glb_trace) {
581 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
582 (intptr_t)glb_policy->rr_policy);
583 }
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700584 pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
585 &pp->pick_args, pp->target,
586 &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700587 }
588
589 pending_ping *pping;
590 while ((pping = glb_policy->pending_pings)) {
591 glb_policy->pending_pings = pping->next;
592 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
593 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
594 if (grpc_lb_glb_trace) {
595 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
596 (intptr_t)glb_policy->rr_policy);
597 }
598 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700599 &pping->wrapped_notify_arg.wrapper_closure);
David Garcia Quintas65318262016-07-29 13:43:38 -0700600 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700601}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700602
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700603static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
604 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200605 /* If shutdown or error free the arg. Rely on the rest of the code to set the
606 * right grpclb status. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700607 rr_connectivity_data *rr_conn_data = arg;
608 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700609
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100610 if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
611 !glb_policy->shutting_down) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200612 gpr_mu_lock(&glb_policy->mu);
613 /* RR not shutting down. Mimic the RR's policy state */
614 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
615 rr_conn_data->state, GRPC_ERROR_REF(error),
David Garcia Quintase224a762016-11-01 13:00:58 -0700616 "rr_connectivity_cb");
617 /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200618 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
619 &rr_conn_data->state,
620 &rr_conn_data->on_change);
621 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700622 } else {
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100623 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintase224a762016-11-01 13:00:58 -0700624 "rr_connectivity_cb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200625 gpr_free(rr_conn_data);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700626 }
627}
628
David Garcia Quintas65318262016-07-29 13:43:38 -0700629static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
630 grpc_lb_policy_factory *factory,
631 grpc_lb_policy_args *args) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700632 /* Get server name. */
Mark D. Roth557c9902016-10-24 11:12:05 -0700633 const grpc_arg *arg =
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700634 grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
Mark D. Roth557c9902016-10-24 11:12:05 -0700635 const char *server_name =
636 arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700637
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700638 /* Count the number of gRPC-LB addresses. There must be at least one.
639 * TODO(roth): For now, we ignore non-balancer addresses, but in the
640 * future, we may change the behavior such that we fall back to using
641 * the non-balancer addresses if we cannot reach any balancers. At that
642 * time, this should be changed to allow a list with no balancer addresses,
643 * since the resolver might fail to return a balancer address even when
644 * this is the right LB policy to use. */
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700645 arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
646 GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
Mark D. Roth557c9902016-10-24 11:12:05 -0700647 grpc_lb_addresses *addresses = arg->value.pointer.p;
Mark D. Rothf655c852016-09-06 10:40:38 -0700648 size_t num_grpclb_addrs = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700649 for (size_t i = 0; i < addresses->num_addresses; ++i) {
650 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
Mark D. Rothf655c852016-09-06 10:40:38 -0700651 }
652 if (num_grpclb_addrs == 0) return NULL;
653
David Garcia Quintas65318262016-07-29 13:43:38 -0700654 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
655 memset(glb_policy, 0, sizeof(*glb_policy));
656
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700657 /* All input addresses in addresses come from a resolver that claims
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700658 * they are LB services. It's the resolver's responsibility to make sure
659 * this
David Garcia Quintas65318262016-07-29 13:43:38 -0700660 * policy is only instantiated and used in that case.
661 *
662 * Create a client channel over them to communicate with a LB service */
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700663 glb_policy->server_name = gpr_strdup(server_name);
David Garcia Quintas65318262016-07-29 13:43:38 -0700664 glb_policy->cc_factory = args->client_channel_factory;
Mark D. Roth98abfd32016-10-21 08:10:51 -0700665 glb_policy->args = grpc_channel_args_copy(args->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700666 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700667
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700668 /* construct a target from the addresses in args, given in the form
David Garcia Quintas65318262016-07-29 13:43:38 -0700669 * ipvX://ip1:port1,ip2:port2,...
670 * TODO(dgq): support mixed ip version */
Mark D. Rothf655c852016-09-06 10:40:38 -0700671 char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700672 size_t addr_index = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700673 for (size_t i = 0; i < addresses->num_addresses; i++) {
674 if (addresses->addresses[i].user_data != NULL) {
David Garcia Quintas5ebb7af2016-09-15 10:02:16 -0700675 gpr_log(GPR_ERROR,
676 "This LB policy doesn't support user data. It will be ignored");
677 }
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700678 if (addresses->addresses[i].is_balancer) {
Mark D. Rothc5c38782016-09-16 08:51:01 -0700679 if (addr_index == 0) {
murgatroid99dedb9232016-09-26 13:54:04 -0700680 addr_strs[addr_index++] =
Mark D. Rothfb809b72016-10-26 09:12:25 -0700681 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700682 } else {
Mark D. Roth49f89f02016-10-26 11:16:59 -0700683 GPR_ASSERT(grpc_sockaddr_to_string(&addr_strs[addr_index++],
684 &addresses->addresses[i].address,
685 true) > 0);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700686 }
Mark D. Rothf655c852016-09-06 10:40:38 -0700687 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700688 }
689 size_t uri_path_len;
Mark D. Roth989cdcd2016-09-06 13:28:28 -0700690 char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
691 num_grpclb_addrs, ",", &uri_path_len);
David Garcia Quintas65318262016-07-29 13:43:38 -0700692
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700693 /* Create a channel to talk to the LBs.
694 *
695 * We strip out the channel arg for the LB policy name, since we want
696 * to use the default (pick_first) in this case.
697 *
698 * We also strip out the channel arg for the resolved addresses, since
699 * that will be generated by the name resolver used in the LB channel.
700 * Note that the LB channel will use the sockaddr resolver, so this
701 * won't actually generate a query to DNS (or some other name service).
702 * However, the addresses returned by the sockaddr resolver will have
703 * is_balancer=false, whereas our own addresses have is_balancer=true.
704 * We need the LB channel to return addresses with is_balancer=false
705 * so that it does not wind up recursively using the grpclb LB policy,
706 * as per the special case logic in client_channel.c.
707 */
Mark D. Roth557c9902016-10-24 11:12:05 -0700708 static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
709 GRPC_ARG_LB_ADDRESSES};
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700710 grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
711 args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
David Garcia Quintas65318262016-07-29 13:43:38 -0700712 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
713 exec_ctx, glb_policy->cc_factory, target_uri_str,
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700714 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800715 grpc_channel_args_destroy(exec_ctx, new_args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700716
717 gpr_free(target_uri_str);
Mark D. Rothf655c852016-09-06 10:40:38 -0700718 for (size_t i = 0; i < num_grpclb_addrs; i++) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700719 gpr_free(addr_strs[i]);
720 }
721 gpr_free(addr_strs);
722
723 if (glb_policy->lb_channel == NULL) {
724 gpr_free(glb_policy);
725 return NULL;
726 }
727
David Garcia Quintas65318262016-07-29 13:43:38 -0700728 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
729 gpr_mu_init(&glb_policy->mu);
730 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
731 "grpclb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200732
David Garcia Quintas65318262016-07-29 13:43:38 -0700733 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700734}
735
David Garcia Quintas65318262016-07-29 13:43:38 -0700736static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
737 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
738 GPR_ASSERT(glb_policy->pending_picks == NULL);
739 GPR_ASSERT(glb_policy->pending_pings == NULL);
Mark D. Rothd1604af2016-09-22 11:20:27 -0700740 gpr_free((void *)glb_policy->server_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800741 grpc_channel_args_destroy(exec_ctx, glb_policy->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700742 grpc_channel_destroy(glb_policy->lb_channel);
743 glb_policy->lb_channel = NULL;
744 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
745 if (glb_policy->serverlist != NULL) {
746 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
747 }
748 gpr_mu_destroy(&glb_policy->mu);
749 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700750}
751
David Garcia Quintas65318262016-07-29 13:43:38 -0700752static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
753 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
754 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200755 glb_policy->shutting_down = true;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700756
David Garcia Quintas65318262016-07-29 13:43:38 -0700757 pending_pick *pp = glb_policy->pending_picks;
758 glb_policy->pending_picks = NULL;
759 pending_ping *pping = glb_policy->pending_pings;
760 glb_policy->pending_pings = NULL;
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -0800761 if (glb_policy->rr_policy) {
762 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
763 }
764 if (glb_policy->started_picking) {
765 if (glb_policy->lb_call != NULL) {
766 grpc_call_cancel(glb_policy->lb_call, NULL);
767 /* lb_on_server_status_received will pick up the cancel and clean up */
768 }
769 }
770 grpc_connectivity_state_set(
771 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
772 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
David Garcia Quintas65318262016-07-29 13:43:38 -0700773 gpr_mu_unlock(&glb_policy->mu);
774
775 while (pp != NULL) {
776 pending_pick *next = pp->next;
777 *pp->target = NULL;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700778 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
779 GRPC_ERROR_NONE, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700780 pp = next;
781 }
782
783 while (pping != NULL) {
784 pending_ping *next = pping->next;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700785 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
786 GRPC_ERROR_NONE, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700787 pping = next;
788 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700789}
790
791static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
Mark D. Roth5f844002016-09-08 08:20:53 -0700792 grpc_connected_subchannel **target,
793 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700794 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
795 gpr_mu_lock(&glb_policy->mu);
796 pending_pick *pp = glb_policy->pending_picks;
797 glb_policy->pending_picks = NULL;
798 while (pp != NULL) {
799 pending_pick *next = pp->next;
800 if (pp->target == target) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700801 *target = NULL;
Mark D. Roth932b10c2016-09-09 08:44:30 -0700802 grpc_exec_ctx_sched(
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700803 exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
Mark D. Roth932b10c2016-09-09 08:44:30 -0700804 GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700805 } else {
806 pp->next = glb_policy->pending_picks;
807 glb_policy->pending_picks = pp;
808 }
809 pp = next;
810 }
811 gpr_mu_unlock(&glb_policy->mu);
Mark D. Roth5f844002016-09-08 08:20:53 -0700812 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700813}
814
David Garcia Quintas65318262016-07-29 13:43:38 -0700815static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
816 uint32_t initial_metadata_flags_mask,
Mark D. Rothe65ff112016-09-09 13:48:38 -0700817 uint32_t initial_metadata_flags_eq,
818 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700819 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
820 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas65318262016-07-29 13:43:38 -0700821 pending_pick *pp = glb_policy->pending_picks;
822 glb_policy->pending_picks = NULL;
823 while (pp != NULL) {
824 pending_pick *next = pp->next;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700825 if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
David Garcia Quintas65318262016-07-29 13:43:38 -0700826 initial_metadata_flags_eq) {
Mark D. Roth58f52b72016-09-09 13:55:18 -0700827 grpc_exec_ctx_sched(
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700828 exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
Mark D. Roth58f52b72016-09-09 13:55:18 -0700829 GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700830 } else {
831 pp->next = glb_policy->pending_picks;
832 glb_policy->pending_picks = pp;
833 }
834 pp = next;
835 }
836 gpr_mu_unlock(&glb_policy->mu);
Mark D. Rothe65ff112016-09-09 13:48:38 -0700837 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700838}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700839
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200840static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
841 glb_lb_policy *glb_policy);
842static void start_picking_locked(grpc_exec_ctx *exec_ctx,
843 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700844 glb_policy->started_picking = true;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200845 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
846 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700847}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700848
David Garcia Quintas65318262016-07-29 13:43:38 -0700849static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
850 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
851 gpr_mu_lock(&glb_policy->mu);
852 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200853 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700854 }
855 gpr_mu_unlock(&glb_policy->mu);
856}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700857
David Garcia Quintas65318262016-07-29 13:43:38 -0700858static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700859 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700860 grpc_connected_subchannel **target, void **user_data,
David Garcia Quintas65318262016-07-29 13:43:38 -0700861 grpc_closure *on_complete) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700862 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700863 *target = NULL;
David Garcia Quintas6cc44fc2016-09-12 23:04:35 -0700864 grpc_exec_ctx_sched(
865 exec_ctx, on_complete,
866 GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
867 "won't work without it. Failing"),
868 NULL);
Mark D. Roth1e5f6af2016-10-07 08:32:58 -0700869 return 0;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700870 }
871
David Garcia Quintas65318262016-07-29 13:43:38 -0700872 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
873 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas5cf3c372016-10-03 14:30:03 -0700874 glb_policy->deadline = pick_args->deadline;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700875 bool pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -0700876
877 if (glb_policy->rr_policy != NULL) {
878 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200879 gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
880 (void *)glb_policy, (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700881 }
882 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
David Garcia Quintas8aace512016-08-15 14:55:12 -0700883
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700884 wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
David Garcia Quintas90712d52016-10-13 19:33:04 -0700885 memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700886
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700887 grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
David Garcia Quintas90712d52016-10-13 19:33:04 -0700888 wc_arg->rr_policy = glb_policy->rr_policy;
889 wc_arg->target = target;
890 wc_arg->wrapped_closure = on_complete;
891 wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
892 wc_arg->initial_metadata = pick_args->initial_metadata;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700893 wc_arg->free_when_done = wc_arg;
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700894 pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
David Garcia Quintas20359062016-10-15 15:22:51 -0700895 pick_args, target, wc_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700896 } else {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200897 if (grpc_lb_glb_trace) {
898 gpr_log(GPR_DEBUG,
899 "No RR policy in grpclb instance %p. Adding to grpclb's pending "
900 "picks",
901 (void *)(glb_policy));
902 }
David Garcia Quintas8aace512016-08-15 14:55:12 -0700903 add_pending_pick(&glb_policy->pending_picks, pick_args, target,
904 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700905
906 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200907 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700908 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700909 pick_done = false;
David Garcia Quintas65318262016-07-29 13:43:38 -0700910 }
911 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700912 return pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -0700913}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700914
David Garcia Quintas65318262016-07-29 13:43:38 -0700915static grpc_connectivity_state glb_check_connectivity(
916 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
917 grpc_error **connectivity_error) {
918 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
919 grpc_connectivity_state st;
920 gpr_mu_lock(&glb_policy->mu);
921 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
922 connectivity_error);
923 gpr_mu_unlock(&glb_policy->mu);
924 return st;
925}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700926
David Garcia Quintas65318262016-07-29 13:43:38 -0700927static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
928 grpc_closure *closure) {
929 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
930 gpr_mu_lock(&glb_policy->mu);
931 if (glb_policy->rr_policy) {
932 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
933 } else {
934 add_pending_ping(&glb_policy->pending_pings, closure);
935 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200936 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700937 }
938 }
939 gpr_mu_unlock(&glb_policy->mu);
940}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700941
David Garcia Quintas65318262016-07-29 13:43:38 -0700942static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
943 grpc_lb_policy *pol,
944 grpc_connectivity_state *current,
945 grpc_closure *notify) {
946 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
947 gpr_mu_lock(&glb_policy->mu);
948 grpc_connectivity_state_notify_on_state_change(
949 exec_ctx, &glb_policy->state_tracker, current, notify);
950
951 gpr_mu_unlock(&glb_policy->mu);
952}
953
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100954static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
955 grpc_error *error);
956static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
957 grpc_error *error);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800958static void lb_call_init(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) {
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700959 GPR_ASSERT(glb_policy->server_name != NULL);
960 GPR_ASSERT(glb_policy->server_name[0] != '\0');
961
David Garcia Quintas15eba132016-08-09 15:20:48 -0700962 /* Note the following LB call progresses every time there's activity in \a
963 * glb_policy->base.interested_parties, which is comprised of the polling
Yuchen Zengf7c45ae2016-09-15 13:40:32 -0700964 * entities from \a client_channel. */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200965 glb_policy->lb_call = grpc_channel_create_pollset_set_call(
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800966 exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
David Garcia Quintas4543e5c2016-09-22 15:09:34 -0700967 glb_policy->base.interested_parties,
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700968 "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200969 glb_policy->deadline, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700970
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100971 grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
972 grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -0700973
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700974 grpc_grpclb_request *request =
975 grpc_grpclb_request_create(glb_policy->server_name);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700976 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100977 glb_policy->lb_request_payload =
David Garcia Quintas65318262016-07-29 13:43:38 -0700978 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
Craig Tiller18b4ba32016-11-09 15:23:42 -0800979 grpc_slice_unref_internal(exec_ctx, request_payload_slice);
David Garcia Quintas65318262016-07-29 13:43:38 -0700980 grpc_grpclb_request_destroy(request);
981
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200982 glb_policy->lb_call_status_details = NULL;
983 glb_policy->lb_call_status_details_capacity = 0;
984
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100985 grpc_closure_init(&glb_policy->lb_on_server_status_received,
986 lb_on_server_status_received, glb_policy);
987 grpc_closure_init(&glb_policy->lb_on_response_received,
988 lb_on_response_received, glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200989
990 gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER,
991 BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000,
992 BACKOFF_MAX_SECONDS * 1000);
David Garcia Quintas65318262016-07-29 13:43:38 -0700993}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700994
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -0800995static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200996 GPR_ASSERT(glb_policy->lb_call != NULL);
997 grpc_call_destroy(glb_policy->lb_call);
998 glb_policy->lb_call = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -0700999
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001000 grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
1001 grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -07001002
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001003 grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001004 gpr_free(glb_policy->lb_call_status_details);
David Garcia Quintas65318262016-07-29 13:43:38 -07001005}
1006
David Garcia Quintas8d489112016-07-29 15:20:42 -07001007/*
1008 * Auxiliary functions and LB client callbacks.
1009 */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001010static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
1011 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001012 GPR_ASSERT(glb_policy->lb_channel != NULL);
Craig Tiller87a7e1f2016-11-09 09:42:19 -08001013 lb_call_init(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001014
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001015 if (grpc_lb_glb_trace) {
1016 gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
1017 (void *)glb_policy, (void *)glb_policy->lb_call);
1018 }
1019 GPR_ASSERT(glb_policy->lb_call != NULL);
1020
David Garcia Quintas65318262016-07-29 13:43:38 -07001021 grpc_call_error call_error;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001022 grpc_op ops[4];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001023 memset(ops, 0, sizeof(ops));
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001024
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001025 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -07001026 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1027 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001028 op->flags = 0;
1029 op->reserved = NULL;
1030 op++;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001031
1032 op->op = GRPC_OP_RECV_INITIAL_METADATA;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001033 op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001034 op->flags = 0;
1035 op->reserved = NULL;
1036 op++;
1037
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001038 GPR_ASSERT(glb_policy->lb_request_payload != NULL);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001039 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001040 op->data.send_message = glb_policy->lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001041 op->flags = 0;
1042 op->reserved = NULL;
1043 op++;
1044
1045 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1046 op->data.recv_status_on_client.trailing_metadata =
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001047 &glb_policy->lb_trailing_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001048 op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
1049 op->data.recv_status_on_client.status_details =
1050 &glb_policy->lb_call_status_details;
1051 op->data.recv_status_on_client.status_details_capacity =
1052 &glb_policy->lb_call_status_details_capacity;
1053 op->flags = 0;
1054 op->reserved = NULL;
1055 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001056 /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
1057 * count goes to zero) to be unref'd in lb_on_server_status_received */
1058 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received");
David Garcia Quintas65318262016-07-29 13:43:38 -07001059 call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001060 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1061 &glb_policy->lb_on_server_status_received);
David Garcia Quintas65318262016-07-29 13:43:38 -07001062 GPR_ASSERT(GRPC_CALL_OK == call_error);
1063
1064 op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001065 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001066 op->data.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001067 op->flags = 0;
1068 op->reserved = NULL;
1069 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001070 /* take another weak ref to be unref'd in lb_on_response_received */
1071 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received");
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001072 call_error = grpc_call_start_batch_and_execute(
1073 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1074 &glb_policy->lb_on_response_received);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001075 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001076}
1077
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001078static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
1079 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001080 glb_lb_policy *glb_policy = arg;
1081
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001082 grpc_op ops[2];
1083 memset(ops, 0, sizeof(ops));
1084 grpc_op *op = ops;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001085 if (glb_policy->lb_response_payload != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001086 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001087 /* Received data from the LB server. Look inside
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001088 * glb_policy->lb_response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001089 grpc_byte_buffer_reader bbr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001090 grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
Craig Tillerd41a4a72016-10-26 16:16:06 -07001091 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001092 grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001093 grpc_grpclb_serverlist *serverlist =
1094 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001095 if (serverlist != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001096 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tiller18b4ba32016-11-09 15:23:42 -08001097 grpc_slice_unref_internal(exec_ctx, response_slice);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001098 if (grpc_lb_glb_trace) {
Jan Tattermusch2b398082016-10-07 14:40:30 +02001099 gpr_log(GPR_INFO, "Serverlist with %lu servers received",
1100 (unsigned long)serverlist->num_servers);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001101 for (size_t i = 0; i < serverlist->num_servers; ++i) {
1102 grpc_resolved_address addr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001103 parse_server(serverlist->servers[i], &addr);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001104 char *ipport;
1105 grpc_sockaddr_to_string(&ipport, &addr, false);
1106 gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
1107 gpr_free(ipport);
1108 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001109 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001110
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001111 /* update serverlist */
1112 if (serverlist->num_servers > 0) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001113 gpr_mu_lock(&glb_policy->mu);
1114 if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001115 if (grpc_lb_glb_trace) {
1116 gpr_log(GPR_INFO,
1117 "Incoming server list identical to current, ignoring.");
1118 }
1119 } else { /* new serverlist */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001120 if (glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001121 /* dispose of the old serverlist */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001122 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001123 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001124 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001125 glb_policy->serverlist = serverlist;
1126
1127 rr_handover_locked(exec_ctx, glb_policy, error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001128 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001129 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001130 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001131 if (grpc_lb_glb_trace) {
1132 gpr_log(GPR_INFO,
1133 "Received empty server list. Picks will stay pending until a "
1134 "response with > 0 servers is received");
1135 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001136 }
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001137 } else { /* serverlist == NULL */
1138 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
Craig Tiller32df4672016-11-04 08:21:56 -07001139 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
Craig Tiller18b4ba32016-11-09 15:23:42 -08001140 grpc_slice_unref_internal(exec_ctx, response_slice);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001141 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001142
David Garcia Quintas246c5642016-11-01 11:16:52 -07001143 if (!glb_policy->shutting_down) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001144 /* keep listening for serverlist updates */
1145 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas246c5642016-11-01 11:16:52 -07001146 op->data.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001147 op->flags = 0;
1148 op->reserved = NULL;
1149 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001150 /* reuse the "lb_on_response_received" weak ref taken in
1151 * query_for_backends_locked() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001152 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas246c5642016-11-01 11:16:52 -07001153 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1154 &glb_policy->lb_on_response_received); /* loop */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001155 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001156 }
David Garcia Quintase224a762016-11-01 13:00:58 -07001157 } else { /* empty payload: call cancelled. */
1158 /* dispose of the "lb_on_response_received" weak ref taken in
1159 * query_for_backends_locked() and reused in every reception loop */
1160 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1161 "lb_on_response_received_empty_payload");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001162 }
1163}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001164
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001165static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
1166 grpc_error *error) {
1167 glb_lb_policy *glb_policy = arg;
1168 gpr_mu_lock(&glb_policy->mu);
1169
1170 if (!glb_policy->shutting_down) {
1171 if (grpc_lb_glb_trace) {
1172 gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
1173 (void *)glb_policy);
1174 }
1175 GPR_ASSERT(glb_policy->lb_call == NULL);
1176 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001177 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001178 gpr_mu_unlock(&glb_policy->mu);
1179
1180 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1181 "grpclb_on_retry_timer");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001182}
1183
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001184static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
1185 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001186 glb_lb_policy *glb_policy = arg;
1187 gpr_mu_lock(&glb_policy->mu);
1188
1189 GPR_ASSERT(glb_policy->lb_call != NULL);
1190
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001191 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001192 gpr_log(GPR_DEBUG,
1193 "Status from LB server received. Status = %d, Details = '%s', "
1194 "(call: %p)",
1195 glb_policy->lb_call_status, glb_policy->lb_call_status_details,
1196 (void *)glb_policy->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001197 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001198
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001199 /* We need to performe cleanups no matter what. */
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -08001200 lb_call_destroy_locked(glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001201
1202 if (!glb_policy->shutting_down) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001203 /* if we aren't shutting down, restart the LB client call after some time */
1204 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1205 gpr_timespec next_try =
1206 gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
1207 if (grpc_lb_glb_trace) {
1208 gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
1209 (void *)glb_policy);
1210 gpr_timespec timeout = gpr_time_sub(next_try, now);
1211 if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
1212 gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.",
1213 timeout.tv_sec, timeout.tv_nsec);
1214 } else {
1215 gpr_log(GPR_DEBUG, "... retrying immediately.");
1216 }
1217 }
1218 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
1219 grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
1220 lb_call_on_retry_timer, glb_policy, now);
1221 }
1222 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001223 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1224 "lb_on_server_status_received");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001225}
1226
David Garcia Quintas8d489112016-07-29 15:20:42 -07001227/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001228static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1229 glb_destroy, glb_shutdown, glb_pick,
1230 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1231 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1232
1233static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1234
1235static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1236
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001237static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1238 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1239
1240static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1241
1242grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1243 return &glb_lb_policy_factory;
1244}
1245
1246/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001247void grpc_lb_policy_grpclb_init() {
1248 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1249 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1250}
1251
/* Plugin shutdown hook: there is nothing to tear down. */
void grpc_lb_policy_grpclb_shutdown() {
}