blob: be05be9abed488f78d8182756312b22afa66ac3b [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070034/** Implementation of the gRPC LB policy.
35 *
 * This policy takes as input a set of resolved addresses {a1..an} for which the
 * LB flag was set (it's the resolver's responsibility to ensure this). That is
 * to say, {a1..an} represent a collection of LB servers.
39 *
40 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
41 * This channel behaves just like a regular channel. In particular, the
42 * constructed URI over the addresses a1..an will use the default pick first
43 * policy to select from this list of LB server backends.
44 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070045 * The first time the policy gets a request for a pick, a ping, or to exit the
David Garcia Quintas98da61b2016-10-29 08:46:31 +020046 * idle state, \a query_for_backends_locked() is called. This function sets up
47 * and initiates the internal communication with the LB server. In particular,
48 * it's responsible for instantiating the internal *streaming* call to the LB
49 * server (whichever address from {a1..an} pick-first chose). This call is
David Garcia Quintas7ec29132016-11-01 04:09:05 +010050 * serviced by two callbacks, \a lb_on_server_status_received and \a
51 * lb_on_response_received. The former will be called when the call to the LB
52 * server completes. This can happen if the LB server closes the connection or
53 * if this policy itself cancels the call (for example because it's shutting
David Garcia Quintas246c5642016-11-01 11:16:52 -070054 * down). If the internal call times out, the usual behavior of pick-first
David Garcia Quintas7ec29132016-11-01 04:09:05 +010055 * applies, continuing to pick from the list {a1..an}.
David Garcia Quintas43339842016-07-18 12:56:09 -070056 *
 * Upon success, the incoming \a LoadBalancingResponse is processed by \a
 * lb_on_response_received. An invalid one results in the termination of the
 * streaming call. A new streaming call should be created if possible, failing
 * the original call otherwise. For a valid \a LoadBalancingResponse, the server
 * list of actual backends is extracted. A Round Robin policy will be created
 * from this list. There are two possible scenarios:
David Garcia Quintas43339842016-07-18 12:56:09 -070063 *
64 * 1. This is the first server list received. There was no previous instance of
David Garcia Quintas90712d52016-10-13 19:33:04 -070065 * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
66 * policy and perform all the pending operations over it.
David Garcia Quintas43339842016-07-18 12:56:09 -070067 * 2. There's already a RR policy instance active. We need to introduce the new
 *    one built from the new serverlist, but taking care not to disrupt the
69 * operations in progress over the old RR instance. This is done by
70 * decreasing the reference count on the old policy. The moment no more
71 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070072 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
73 * state. At this point we can transition to a new RR instance safely, which
David Garcia Quintas90712d52016-10-13 19:33:04 -070074 * is done once again via \a rr_handover_locked().
David Garcia Quintas43339842016-07-18 12:56:09 -070075 *
76 *
 * Once a RR policy instance is in place (and getting updated as described),
 * calls for a pick, a ping or a cancellation will be serviced right away by
 * forwarding them to the RR instance. Any time there's no RR policy available
 * (i.e., right after the creation of the gRPCLB policy, if an empty serverlist
 * is received, etc), pick/ping requests are added to a list of pending
 * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the
 * moment the RR policy instance becomes available.
David Garcia Quintas43339842016-07-18 12:56:09 -070084 *
85 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
86 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070087
88/* TODO(dgq):
89 * - Implement LB service forwarding (point 2c. in the doc's diagram).
90 */
91
murgatroid99085f9af2016-10-24 09:55:44 -070092/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
93 using that endpoint. Because of various transitive includes in uv.h,
94 including windows.h on Windows, uv.h must be included before other system
95 headers. Therefore, sockaddr.h must always be included first */
murgatroid997871f732016-09-23 13:49:05 -070096#include "src/core/lib/iomgr/sockaddr.h"
97
David Garcia Quintas8a81aa12016-08-22 15:06:49 -070098#include <errno.h>
99
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700100#include <string.h>
101
102#include <grpc/byte_buffer_reader.h>
103#include <grpc/grpc.h>
104#include <grpc/support/alloc.h>
105#include <grpc/support/host_port.h>
106#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -0700107#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -0700108
Mark D. Roth2137cd82016-09-14 09:04:00 -0700109#include "src/core/ext/client_channel/client_channel_factory.h"
Mark D. Roth15195742016-10-07 09:02:28 -0700110#include "src/core/ext/client_channel/lb_policy_factory.h"
Mark D. Roth2137cd82016-09-14 09:04:00 -0700111#include "src/core/ext/client_channel/lb_policy_registry.h"
112#include "src/core/ext/client_channel/parse_address.h"
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700113#include "src/core/ext/lb_policy/grpclb/grpclb.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700114#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
Mark D. Roth046cf762016-09-26 11:13:51 -0700115#include "src/core/lib/channel/channel_args.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200116#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700117#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200118#include "src/core/lib/iomgr/timer.h"
Craig Tiller0f310802016-10-26 16:25:56 -0700119#include "src/core/lib/slice/slice_string_helpers.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200120#include "src/core/lib/support/backoff.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700121#include "src/core/lib/support/string.h"
122#include "src/core/lib/surface/call.h"
123#include "src/core/lib/surface/channel.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700124#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700125
/* Retry backoff parameters. NOTE(review): presumably used to configure
 * glb_lb_policy.lb_call_backoff_state when re-establishing the LB call --
 * TODO confirm against the backoff initialization site. */
#define BACKOFF_MULTIPLIER 1.6
#define BACKOFF_JITTER 0.2
#define BACKOFF_MIN_SECONDS 10
#define BACKOFF_MAX_SECONDS 60

/* When non-zero, enables the GPR_INFO trace logs guarded by this flag. */
int grpc_lb_glb_trace = 0;
132
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700133/* add lb_token of selected subchannel (address) to the call's initial
134 * metadata */
135static void initial_metadata_add_lb_token(
136 grpc_metadata_batch *initial_metadata,
137 grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem *lb_token) {
138 GPR_ASSERT(lb_token_mdelem_storage != NULL);
139 GPR_ASSERT(lb_token != NULL);
140 grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
141 lb_token);
142}
143
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700144typedef struct wrapped_rr_closure_arg {
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700145 /* the closure instance using this struct as argument */
146 grpc_closure wrapper_closure;
147
David Garcia Quintas43339842016-07-18 12:56:09 -0700148 /* the original closure. Usually a on_complete/notify cb for pick() and ping()
149 * calls against the internal RR instance, respectively. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700150 grpc_closure *wrapped_closure;
David Garcia Quintas43339842016-07-18 12:56:09 -0700151
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700152 /* the pick's initial metadata, kept in order to append the LB token for the
153 * pick */
154 grpc_metadata_batch *initial_metadata;
155
156 /* the picked target, used to determine which LB token to add to the pick's
157 * initial metadata */
158 grpc_connected_subchannel **target;
159
160 /* the LB token associated with the pick */
161 grpc_mdelem *lb_token;
162
163 /* storage for the lb token initial metadata mdelem */
164 grpc_linked_mdelem *lb_token_mdelem_storage;
165
David Garcia Quintas43339842016-07-18 12:56:09 -0700166 /* The RR instance related to the closure */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700167 grpc_lb_policy *rr_policy;
David Garcia Quintas43339842016-07-18 12:56:09 -0700168
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700169 /* heap memory to be freed upon closure execution. */
170 void *free_when_done;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700171} wrapped_rr_closure_arg;
172
173/* The \a on_complete closure passed as part of the pick requires keeping a
174 * reference to its associated round robin instance. We wrap this closure in
175 * order to unref the round robin instance upon its invocation */
176static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
David Garcia Quintas280fd2a2016-06-20 22:04:48 -0700177 grpc_error *error) {
David Garcia Quintas43339842016-07-18 12:56:09 -0700178 wrapped_rr_closure_arg *wc_arg = arg;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700179
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200180 GPR_ASSERT(wc_arg->wrapped_closure != NULL);
181 grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error),
182 NULL);
183
184 if (wc_arg->rr_policy != NULL) {
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700185 /* if target is NULL, no pick has been made by the RR policy (eg, all
186 * addresses failed to connect). There won't be any user_data/token
187 * available */
188 if (wc_arg->target != NULL) {
David Garcia Quintas850cbaa2016-11-15 15:13:35 -0800189 if (wc_arg->lb_token != NULL) {
190 initial_metadata_add_lb_token(wc_arg->initial_metadata,
191 wc_arg->lb_token_mdelem_storage,
192 GRPC_MDELEM_REF(wc_arg->lb_token));
193 } else {
194 gpr_log(GPR_ERROR,
195 "No LB token for connected subchannel pick %p (from RR "
196 "instance %p).",
197 (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
198 abort();
199 }
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700200 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200201 if (grpc_lb_glb_trace) {
David Garcia Quintas850cbaa2016-11-15 15:13:35 -0800202 gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200203 }
204 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700205 }
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700206 GPR_ASSERT(wc_arg->free_when_done != NULL);
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700207 gpr_free(wc_arg->free_when_done);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700208}
209
David Garcia Quintasea11d162016-07-14 17:27:28 -0700210/* Linked list of pending pick requests. It stores all information needed to
211 * eventually call (Round Robin's) pick() on them. They mainly stay pending
212 * waiting for the RR policy to be created/updated.
213 *
214 * One particularity is the wrapping of the user-provided \a on_complete closure
215 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
216 * order to correctly unref the RR policy instance upon completion of the pick.
217 * See \a wrapped_rr_closure for details. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700218typedef struct pending_pick {
219 struct pending_pick *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700220
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700221 /* original pick()'s arguments */
222 grpc_lb_policy_pick_args pick_args;
David Garcia Quintas43339842016-07-18 12:56:09 -0700223
224 /* output argument where to store the pick()ed connected subchannel, or NULL
225 * upon error. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700226 grpc_connected_subchannel **target;
David Garcia Quintas43339842016-07-18 12:56:09 -0700227
David Garcia Quintas43339842016-07-18 12:56:09 -0700228 /* args for wrapped_on_complete */
229 wrapped_rr_closure_arg wrapped_on_complete_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700230} pending_pick;
231
David Garcia Quintas8aace512016-08-15 14:55:12 -0700232static void add_pending_pick(pending_pick **root,
233 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700234 grpc_connected_subchannel **target,
235 grpc_closure *on_complete) {
236 pending_pick *pp = gpr_malloc(sizeof(*pp));
237 memset(pp, 0, sizeof(pending_pick));
238 memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
239 pp->next = *root;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700240 pp->pick_args = *pick_args;
David Garcia Quintas65318262016-07-29 13:43:38 -0700241 pp->target = target;
David Garcia Quintas65318262016-07-29 13:43:38 -0700242 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700243 pp->wrapped_on_complete_arg.target = target;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700244 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
245 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
246 pick_args->lb_token_mdelem_storage;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700247 pp->wrapped_on_complete_arg.free_when_done = pp;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700248 grpc_closure_init(&pp->wrapped_on_complete_arg.wrapper_closure,
249 wrapped_rr_closure, &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700250 *root = pp;
251}
252
David Garcia Quintasea11d162016-07-14 17:27:28 -0700253/* Same as the \a pending_pick struct but for ping operations */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700254typedef struct pending_ping {
255 struct pending_ping *next;
David Garcia Quintas43339842016-07-18 12:56:09 -0700256
David Garcia Quintas43339842016-07-18 12:56:09 -0700257 /* args for wrapped_notify */
258 wrapped_rr_closure_arg wrapped_notify_arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700259} pending_ping;
260
David Garcia Quintas65318262016-07-29 13:43:38 -0700261static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
262 pending_ping *pping = gpr_malloc(sizeof(*pping));
263 memset(pping, 0, sizeof(pending_ping));
264 memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
David Garcia Quintas65318262016-07-29 13:43:38 -0700265 pping->wrapped_notify_arg.wrapped_closure = notify;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700266 pping->wrapped_notify_arg.free_when_done = pping;
David Garcia Quintas65318262016-07-29 13:43:38 -0700267 pping->next = *root;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700268 grpc_closure_init(&pping->wrapped_notify_arg.wrapper_closure,
269 wrapped_rr_closure, &pping->wrapped_notify_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700270 *root = pping;
271}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700272
David Garcia Quintas8d489112016-07-29 15:20:42 -0700273/*
274 * glb_lb_policy
275 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700276typedef struct rr_connectivity_data rr_connectivity_data;
David Garcia Quintas65318262016-07-29 13:43:38 -0700277static const grpc_lb_policy_vtable glb_lb_policy_vtable;
278typedef struct glb_lb_policy {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700279 /** base policy: must be first */
280 grpc_lb_policy base;
281
282 /** mutex protecting remaining members */
283 gpr_mu mu;
284
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700285 /** who the client is trying to communicate with */
Mark D. Rothd1604af2016-09-22 11:20:27 -0700286 const char *server_name;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700287 grpc_client_channel_factory *cc_factory;
Mark D. Roth046cf762016-09-26 11:13:51 -0700288 grpc_channel_args *args;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700289
David Garcia Quintas5cf3c372016-10-03 14:30:03 -0700290 /** deadline for the LB's call */
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700291 gpr_timespec deadline;
292
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700293 /** for communicating with the LB server */
David Garcia Quintasea11d162016-07-14 17:27:28 -0700294 grpc_channel *lb_channel;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700295
296 /** the RR policy to use of the backend servers returned by the LB server */
297 grpc_lb_policy *rr_policy;
298
299 bool started_picking;
300
301 /** our connectivity state tracker */
302 grpc_connectivity_state_tracker state_tracker;
303
David Garcia Quintasea11d162016-07-14 17:27:28 -0700304 /** stores the deserialized response from the LB. May be NULL until one such
305 * response has arrived. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700306 grpc_grpclb_serverlist *serverlist;
307
David Garcia Quintasea11d162016-07-14 17:27:28 -0700308 /** list of picks that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700309 pending_pick *pending_picks;
310
David Garcia Quintasea11d162016-07-14 17:27:28 -0700311 /** list of pings that are waiting on RR's policy connectivity */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700312 pending_ping *pending_pings;
313
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200314 bool shutting_down;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700315
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200316 /************************************************************/
317 /* client data associated with the LB server communication */
318 /************************************************************/
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100319 /* Status from the LB server has been received. This signals the end of the LB
320 * call. */
321 grpc_closure lb_on_server_status_received;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200322
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100323 /* A response from the LB server has been received. Process it */
324 grpc_closure lb_on_response_received;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200325
326 grpc_call *lb_call; /* streaming call to the LB server, */
327
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100328 grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
329 grpc_metadata_array
330 lb_trailing_metadata_recv; /* trailing MD from LB server */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200331
332 /* what's being sent to the LB server. Note that its value may vary if the LB
333 * server indicates a redirect. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100334 grpc_byte_buffer *lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200335
David Garcia Quintas246c5642016-11-01 11:16:52 -0700336 /* response the LB server, if any. Processed in lb_on_response_received() */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100337 grpc_byte_buffer *lb_response_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200338
David Garcia Quintas246c5642016-11-01 11:16:52 -0700339 /* call status code and details, set in lb_on_server_status_received() */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200340 grpc_status_code lb_call_status;
341 char *lb_call_status_details;
342 size_t lb_call_status_details_capacity;
343
344 /** LB call retry backoff state */
345 gpr_backoff lb_call_backoff_state;
346
347 /** LB call retry timer */
348 grpc_timer lb_call_retry_timer;
David Garcia Quintas65318262016-07-29 13:43:38 -0700349} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700350
David Garcia Quintas65318262016-07-29 13:43:38 -0700351/* Keeps track and reacts to changes in connectivity of the RR instance */
352struct rr_connectivity_data {
353 grpc_closure on_change;
354 grpc_connectivity_state state;
355 glb_lb_policy *glb_policy;
356};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700357
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700358static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
359 bool log) {
360 const grpc_grpclb_ip_address *ip = &server->ip_address;
361 if (server->port >> 16 != 0) {
362 if (log) {
363 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200364 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
365 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700366 }
367 return false;
368 }
369
370 if (ip->size != 4 && ip->size != 16) {
371 if (log) {
372 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200373 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700374 "serverlist. Ignoring",
Jan Tattermusch2b398082016-10-07 14:40:30 +0200375 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700376 }
377 return false;
378 }
379 return true;
380}
381
/* User-data hooks for the LB tokens (grpc_mdelem *) stored in
 * grpc_lb_addresses. A NULL token is tolerated and left untouched. */
static void *lb_token_copy(void *token) {
  if (token == NULL) return NULL;
  return GRPC_MDELEM_REF(token);
}
static void lb_token_destroy(void *token) {
  if (token == NULL) return;
  GRPC_MDELEM_UNREF(token);
}
/* Orders LB tokens by identity (address): returns 1, -1 or 0 when \a token1 is
 * greater than, less than or equal to \a token2. The pointers are converted to
 * uintptr_t first because relational comparison (<, >) of pointers to
 * unrelated objects is undefined behavior in C. */
static int lb_token_cmp(void *token1, void *token2) {
  const uintptr_t a = (uintptr_t)token1;
  const uintptr_t b = (uintptr_t)token2;
  if (a > b) return 1;
  if (a < b) return -1;
  return 0;
}
/* User-data vtable handed to grpc_lb_addresses_create() for the backend
 * addresses: LB tokens are ref'd on copy, unref'd on destruction and compared
 * by identity. */
static const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};
396
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100397static void parse_server(const grpc_grpclb_server *server,
398 grpc_resolved_address *addr) {
399 const uint16_t netorder_port = htons((uint16_t)server->port);
400 /* the addresses are given in binary format (a in(6)_addr struct) in
401 * server->ip_address.bytes. */
402 const grpc_grpclb_ip_address *ip = &server->ip_address;
403 memset(addr, 0, sizeof(*addr));
404 if (ip->size == 4) {
405 addr->len = sizeof(struct sockaddr_in);
406 struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
407 addr4->sin_family = AF_INET;
408 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
409 addr4->sin_port = netorder_port;
410 } else if (ip->size == 16) {
411 addr->len = sizeof(struct sockaddr_in6);
412 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
David Garcia Quintas107ca162016-11-02 18:17:03 -0700413 addr6->sin6_family = AF_INET6;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100414 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
415 addr6->sin6_port = netorder_port;
416 }
417}
418
Mark D. Roth7ce14d22016-09-16 13:03:46 -0700419/* Returns addresses extracted from \a serverlist. */
David Garcia Quintas850cbaa2016-11-15 15:13:35 -0800420static grpc_lb_addresses *process_serverlist_locked(
Mark D. Rothc5c38782016-09-16 08:51:01 -0700421 const grpc_grpclb_serverlist *serverlist) {
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700422 size_t num_valid = 0;
423 /* first pass: count how many are valid in order to allocate the necessary
424 * memory in a single block */
425 for (size_t i = 0; i < serverlist->num_servers; ++i) {
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700426 if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
David Garcia Quintasb8b384a2016-08-23 21:10:29 -0700427 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700428 if (num_valid == 0) return NULL;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700429
Mark D. Roth16883a32016-10-21 10:30:58 -0700430 grpc_lb_addresses *lb_addresses =
431 grpc_lb_addresses_create(num_valid, &lb_token_vtable);
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700432
433 /* second pass: actually populate the addresses and LB tokens (aka user data
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700434 * to the outside world) to be read by the RR policy during its creation.
435 * Given that the validity tests are very cheap, they are performed again
436 * instead of marking the valid ones during the first pass, as this would
437 * incurr in an allocation due to the arbitrary number of server */
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700438 size_t addr_idx = 0;
439 for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
440 GPR_ASSERT(addr_idx < num_valid);
441 const grpc_grpclb_server *server = serverlist->servers[sl_idx];
442 if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700443
444 /* address processing */
Mark D. Rothc5c38782016-09-16 08:51:01 -0700445 grpc_resolved_address addr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100446 parse_server(server, &addr);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700447
448 /* lb token processing */
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700449 void *user_data;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700450 if (server->has_load_balance_token) {
David Garcia Quintas0baf1dc2016-10-28 04:44:01 +0200451 const size_t lb_token_max_length =
452 GPR_ARRAY_SIZE(server->load_balance_token);
453 const size_t lb_token_length =
454 strnlen(server->load_balance_token, lb_token_max_length);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700455 grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
David Garcia Quintas0baf1dc2016-10-28 04:44:01 +0200456 (uint8_t *)server->load_balance_token, lb_token_length);
David Garcia Quintasa3654db2016-10-11 15:52:39 -0700457 user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
458 lb_token_mdstr);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700459 } else {
David Garcia Quintas850cbaa2016-11-15 15:13:35 -0800460 char *uri = grpc_sockaddr_to_uri(&addr);
461 gpr_log(GPR_INFO,
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700462 "Missing LB token for backend address '%s'. The empty token will "
463 "be used instead",
David Garcia Quintas850cbaa2016-11-15 15:13:35 -0800464 uri);
465 gpr_free(uri);
David Garcia Quintasa3654db2016-10-11 15:52:39 -0700466 user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700467 }
Mark D. Rothc5c38782016-09-16 08:51:01 -0700468
Mark D. Roth64f1f8d2016-09-16 09:00:09 -0700469 grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
470 false /* is_balancer */,
Mark D. Rothc5c38782016-09-16 08:51:01 -0700471 NULL /* balancer_name */, user_data);
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700472 ++addr_idx;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700473 }
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700474 GPR_ASSERT(addr_idx == num_valid);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700475 return lb_addresses;
476}
477
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700478/* perform a pick over \a rr_policy. Given that a pick can return immediately
479 * (ignoring its completion callback) we need to perform the cleanups this
480 * callback would be otherwise resposible for */
David Garcia Quintas20359062016-10-15 15:22:51 -0700481static bool pick_from_internal_rr_locked(
482 grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy,
483 const grpc_lb_policy_pick_args *pick_args,
484 grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
485 GPR_ASSERT(rr_policy != NULL);
486 const bool pick_done =
487 grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target,
488 (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
489 if (pick_done) {
490 /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
491 if (grpc_lb_glb_trace) {
492 gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
493 (intptr_t)wc_arg->rr_policy);
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700494 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200495 GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700496
David Garcia Quintas20359062016-10-15 15:22:51 -0700497 /* add the load reporting initial metadata */
498 initial_metadata_add_lb_token(pick_args->initial_metadata,
499 pick_args->lb_token_mdelem_storage,
500 GRPC_MDELEM_REF(wc_arg->lb_token));
501
502 gpr_free(wc_arg);
503 }
504 /* else, the pending pick will be registered and taken care of by the
505 * pending pick list inside the RR policy (glb_policy->rr_policy).
506 * Eventually, wrapped_on_complete will be called, which will -among other
507 * things- add the LB token to the call's initial metadata */
David Garcia Quintas20359062016-10-15 15:22:51 -0700508 return pick_done;
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700509}
510
David Garcia Quintas90712d52016-10-13 19:33:04 -0700511static grpc_lb_policy *create_rr_locked(
512 grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist,
513 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700514 GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700515
516 grpc_lb_policy_args args;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700517 memset(&args, 0, sizeof(args));
David Garcia Quintas65318262016-07-29 13:43:38 -0700518 args.client_channel_factory = glb_policy->cc_factory;
David Garcia Quintas850cbaa2016-11-15 15:13:35 -0800519 grpc_lb_addresses *addresses = process_serverlist_locked(serverlist);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700520
521 // Replace the LB addresses in the channel args that we pass down to
522 // the subchannel.
Mark D. Roth557c9902016-10-24 11:12:05 -0700523 static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200524 const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700525 args.args = grpc_channel_args_copy_and_add_and_remove(
526 glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
527 1);
David Garcia Quintas65318262016-07-29 13:43:38 -0700528
529 grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200530 GPR_ASSERT(rr != NULL);
531 grpc_lb_addresses_destroy(addresses);
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700532 grpc_channel_args_destroy(args.args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700533 return rr;
534}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700535
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200536static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
537 grpc_error *error);
538/* glb_policy->rr_policy may be NULL (initial handover) */
David Garcia Quintas90712d52016-10-13 19:33:04 -0700539static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
540 glb_lb_policy *glb_policy, grpc_error *error) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700541 GPR_ASSERT(glb_policy->serverlist != NULL &&
542 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas65318262016-07-29 13:43:38 -0700543
544 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200545 gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700546 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200547 if (glb_policy->rr_policy != NULL) {
548 /* if we are phasing out an existing RR instance, unref it. */
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100549 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200550 }
551
552 glb_policy->rr_policy =
553 create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
554 if (grpc_lb_glb_trace) {
555 gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy);
556 }
557
David Garcia Quintas65318262016-07-29 13:43:38 -0700558 GPR_ASSERT(glb_policy->rr_policy != NULL);
Yuchen Zengb4291642016-09-01 19:17:14 -0700559 grpc_pollset_set_add_pollset_set(exec_ctx,
560 glb_policy->rr_policy->interested_parties,
561 glb_policy->base.interested_parties);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200562
563 rr_connectivity_data *rr_connectivity =
564 gpr_malloc(sizeof(rr_connectivity_data));
565 memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
566 grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
567 rr_connectivity);
568 rr_connectivity->glb_policy = glb_policy;
569 rr_connectivity->state = grpc_lb_policy_check_connectivity(
David Garcia Quintas65318262016-07-29 13:43:38 -0700570 exec_ctx, glb_policy->rr_policy, &error);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200571
David Garcia Quintas65318262016-07-29 13:43:38 -0700572 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200573 rr_connectivity->state, GRPC_ERROR_REF(error),
574 "rr_handover");
575 /* subscribe */
David Garcia Quintase224a762016-11-01 13:00:58 -0700576 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200577 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
578 &rr_connectivity->state,
579 &rr_connectivity->on_change);
David Garcia Quintas65318262016-07-29 13:43:38 -0700580 grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
581
582 /* flush pending ops */
583 pending_pick *pp;
584 while ((pp = glb_policy->pending_picks)) {
585 glb_policy->pending_picks = pp->next;
586 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
587 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
588 if (grpc_lb_glb_trace) {
589 gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "",
590 (intptr_t)glb_policy->rr_policy);
591 }
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700592 pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
593 &pp->pick_args, pp->target,
594 &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700595 }
596
597 pending_ping *pping;
598 while ((pping = glb_policy->pending_pings)) {
599 glb_policy->pending_pings = pping->next;
600 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
601 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
602 if (grpc_lb_glb_trace) {
603 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
604 (intptr_t)glb_policy->rr_policy);
605 }
606 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy,
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700607 &pping->wrapped_notify_arg.wrapper_closure);
David Garcia Quintas65318262016-07-29 13:43:38 -0700608 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700609}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700610
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700611static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
612 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200613 /* If shutdown or error free the arg. Rely on the rest of the code to set the
614 * right grpclb status. */
David Garcia Quintas41bef452016-07-28 19:19:58 -0700615 rr_connectivity_data *rr_conn_data = arg;
616 glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
David Garcia Quintasf9532952016-11-08 14:14:25 -0800617 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas348cfdb2016-08-19 12:19:43 -0700618
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100619 if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
620 !glb_policy->shutting_down) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200621 /* RR not shutting down. Mimic the RR's policy state */
622 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
623 rr_conn_data->state, GRPC_ERROR_REF(error),
David Garcia Quintase224a762016-11-01 13:00:58 -0700624 "rr_connectivity_cb");
625 /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200626 grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
627 &rr_conn_data->state,
628 &rr_conn_data->on_change);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700629 } else {
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100630 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintase224a762016-11-01 13:00:58 -0700631 "rr_connectivity_cb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200632 gpr_free(rr_conn_data);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700633 }
David Garcia Quintasf9532952016-11-08 14:14:25 -0800634 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700635}
636
David Garcia Quintas65318262016-07-29 13:43:38 -0700637static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
638 grpc_lb_policy_factory *factory,
639 grpc_lb_policy_args *args) {
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700640 /* Get server name. */
Mark D. Roth557c9902016-10-24 11:12:05 -0700641 const grpc_arg *arg =
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700642 grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
Mark D. Roth557c9902016-10-24 11:12:05 -0700643 const char *server_name =
644 arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700645
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700646 /* Count the number of gRPC-LB addresses. There must be at least one.
647 * TODO(roth): For now, we ignore non-balancer addresses, but in the
648 * future, we may change the behavior such that we fall back to using
649 * the non-balancer addresses if we cannot reach any balancers. At that
650 * time, this should be changed to allow a list with no balancer addresses,
651 * since the resolver might fail to return a balancer address even when
652 * this is the right LB policy to use. */
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700653 arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
654 GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
Mark D. Roth557c9902016-10-24 11:12:05 -0700655 grpc_lb_addresses *addresses = arg->value.pointer.p;
Mark D. Rothf655c852016-09-06 10:40:38 -0700656 size_t num_grpclb_addrs = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700657 for (size_t i = 0; i < addresses->num_addresses; ++i) {
658 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
Mark D. Rothf655c852016-09-06 10:40:38 -0700659 }
660 if (num_grpclb_addrs == 0) return NULL;
661
David Garcia Quintas65318262016-07-29 13:43:38 -0700662 glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
663 memset(glb_policy, 0, sizeof(*glb_policy));
664
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700665 /* All input addresses in addresses come from a resolver that claims
David Garcia Quintasf47d6fb2016-09-14 12:59:17 -0700666 * they are LB services. It's the resolver's responsibility to make sure
667 * this
David Garcia Quintas65318262016-07-29 13:43:38 -0700668 * policy is only instantiated and used in that case.
669 *
670 * Create a client channel over them to communicate with a LB service */
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700671 glb_policy->server_name = gpr_strdup(server_name);
David Garcia Quintas65318262016-07-29 13:43:38 -0700672 glb_policy->cc_factory = args->client_channel_factory;
Mark D. Roth98abfd32016-10-21 08:10:51 -0700673 glb_policy->args = grpc_channel_args_copy(args->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700674 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700675
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700676 /* construct a target from the addresses in args, given in the form
David Garcia Quintas65318262016-07-29 13:43:38 -0700677 * ipvX://ip1:port1,ip2:port2,...
678 * TODO(dgq): support mixed ip version */
Mark D. Rothf655c852016-09-06 10:40:38 -0700679 char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700680 size_t addr_index = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700681 for (size_t i = 0; i < addresses->num_addresses; i++) {
682 if (addresses->addresses[i].user_data != NULL) {
David Garcia Quintas5ebb7af2016-09-15 10:02:16 -0700683 gpr_log(GPR_ERROR,
684 "This LB policy doesn't support user data. It will be ignored");
685 }
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700686 if (addresses->addresses[i].is_balancer) {
Mark D. Rothc5c38782016-09-16 08:51:01 -0700687 if (addr_index == 0) {
murgatroid99dedb9232016-09-26 13:54:04 -0700688 addr_strs[addr_index++] =
Mark D. Rothfb809b72016-10-26 09:12:25 -0700689 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700690 } else {
Mark D. Roth49f89f02016-10-26 11:16:59 -0700691 GPR_ASSERT(grpc_sockaddr_to_string(&addr_strs[addr_index++],
692 &addresses->addresses[i].address,
693 true) > 0);
Mark D. Rothc5c38782016-09-16 08:51:01 -0700694 }
Mark D. Rothf655c852016-09-06 10:40:38 -0700695 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700696 }
697 size_t uri_path_len;
Mark D. Roth989cdcd2016-09-06 13:28:28 -0700698 char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
699 num_grpclb_addrs, ",", &uri_path_len);
David Garcia Quintas65318262016-07-29 13:43:38 -0700700
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700701 /* Create a channel to talk to the LBs.
702 *
703 * We strip out the channel arg for the LB policy name, since we want
704 * to use the default (pick_first) in this case.
705 *
706 * We also strip out the channel arg for the resolved addresses, since
707 * that will be generated by the name resolver used in the LB channel.
708 * Note that the LB channel will use the sockaddr resolver, so this
709 * won't actually generate a query to DNS (or some other name service).
710 * However, the addresses returned by the sockaddr resolver will have
711 * is_balancer=false, whereas our own addresses have is_balancer=true.
712 * We need the LB channel to return addresses with is_balancer=false
713 * so that it does not wind up recursively using the grpclb LB policy,
714 * as per the special case logic in client_channel.c.
715 */
Mark D. Roth557c9902016-10-24 11:12:05 -0700716 static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
717 GRPC_ARG_LB_ADDRESSES};
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700718 grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
719 args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
David Garcia Quintas65318262016-07-29 13:43:38 -0700720 glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
721 exec_ctx, glb_policy->cc_factory, target_uri_str,
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700722 GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
723 grpc_channel_args_destroy(new_args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700724
725 gpr_free(target_uri_str);
Mark D. Rothf655c852016-09-06 10:40:38 -0700726 for (size_t i = 0; i < num_grpclb_addrs; i++) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700727 gpr_free(addr_strs[i]);
728 }
729 gpr_free(addr_strs);
730
731 if (glb_policy->lb_channel == NULL) {
732 gpr_free(glb_policy);
733 return NULL;
734 }
735
David Garcia Quintas65318262016-07-29 13:43:38 -0700736 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
737 gpr_mu_init(&glb_policy->mu);
738 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
739 "grpclb");
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200740
David Garcia Quintas65318262016-07-29 13:43:38 -0700741 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700742}
743
David Garcia Quintas65318262016-07-29 13:43:38 -0700744static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
745 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
746 GPR_ASSERT(glb_policy->pending_picks == NULL);
747 GPR_ASSERT(glb_policy->pending_pings == NULL);
Mark D. Rothd1604af2016-09-22 11:20:27 -0700748 gpr_free((void *)glb_policy->server_name);
Mark D. Roth046cf762016-09-26 11:13:51 -0700749 grpc_channel_args_destroy(glb_policy->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700750 grpc_channel_destroy(glb_policy->lb_channel);
751 glb_policy->lb_channel = NULL;
752 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
753 if (glb_policy->serverlist != NULL) {
754 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
755 }
756 gpr_mu_destroy(&glb_policy->mu);
757 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700758}
759
David Garcia Quintas65318262016-07-29 13:43:38 -0700760static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
761 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
762 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200763 glb_policy->shutting_down = true;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700764
David Garcia Quintas65318262016-07-29 13:43:38 -0700765 pending_pick *pp = glb_policy->pending_picks;
766 glb_policy->pending_picks = NULL;
767 pending_ping *pping = glb_policy->pending_pings;
768 glb_policy->pending_pings = NULL;
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -0800769 if (glb_policy->rr_policy) {
770 GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
771 }
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -0800772 grpc_connectivity_state_set(
773 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
774 GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown");
David Garcia Quintasa74b2462016-11-11 14:07:27 -0800775 /* We need a copy of the lb_call pointer because we can't cancell the call
776 * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
777 * the cancel, needs to acquire that same lock */
778 grpc_call *lb_call = glb_policy->lb_call;
David Garcia Quintas3ddebf42016-11-14 13:18:53 -0800779 glb_policy->lb_call = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -0700780 gpr_mu_unlock(&glb_policy->mu);
781
David Garcia Quintasa74b2462016-11-11 14:07:27 -0800782 /* glb_policy->lb_call and this local lb_call must be consistent at this point
783 * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
784 * of query_for_backends_locked, which can only be invoked while
785 * glb_policy->shutting_down is false. */
786 if (lb_call != NULL) {
787 grpc_call_cancel(lb_call, NULL);
788 /* lb_on_server_status_received will pick up the cancel and clean up */
789 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700790 while (pp != NULL) {
791 pending_pick *next = pp->next;
792 *pp->target = NULL;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700793 grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
794 GRPC_ERROR_NONE, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700795 pp = next;
796 }
797
798 while (pping != NULL) {
799 pending_ping *next = pping->next;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700800 grpc_exec_ctx_sched(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
801 GRPC_ERROR_NONE, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700802 pping = next;
803 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700804}
805
806static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
Mark D. Roth5f844002016-09-08 08:20:53 -0700807 grpc_connected_subchannel **target,
808 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700809 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
810 gpr_mu_lock(&glb_policy->mu);
811 pending_pick *pp = glb_policy->pending_picks;
812 glb_policy->pending_picks = NULL;
813 while (pp != NULL) {
814 pending_pick *next = pp->next;
815 if (pp->target == target) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700816 *target = NULL;
Mark D. Roth932b10c2016-09-09 08:44:30 -0700817 grpc_exec_ctx_sched(
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700818 exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
Mark D. Roth932b10c2016-09-09 08:44:30 -0700819 GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700820 } else {
821 pp->next = glb_policy->pending_picks;
822 glb_policy->pending_picks = pp;
823 }
824 pp = next;
825 }
826 gpr_mu_unlock(&glb_policy->mu);
Mark D. Roth5f844002016-09-08 08:20:53 -0700827 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700828}
829
David Garcia Quintas65318262016-07-29 13:43:38 -0700830static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
831 uint32_t initial_metadata_flags_mask,
Mark D. Rothe65ff112016-09-09 13:48:38 -0700832 uint32_t initial_metadata_flags_eq,
833 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700834 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
835 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas65318262016-07-29 13:43:38 -0700836 pending_pick *pp = glb_policy->pending_picks;
837 glb_policy->pending_picks = NULL;
838 while (pp != NULL) {
839 pending_pick *next = pp->next;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700840 if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
David Garcia Quintas65318262016-07-29 13:43:38 -0700841 initial_metadata_flags_eq) {
Mark D. Roth58f52b72016-09-09 13:55:18 -0700842 grpc_exec_ctx_sched(
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700843 exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
Mark D. Roth58f52b72016-09-09 13:55:18 -0700844 GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700845 } else {
846 pp->next = glb_policy->pending_picks;
847 glb_policy->pending_picks = pp;
848 }
849 pp = next;
850 }
851 gpr_mu_unlock(&glb_policy->mu);
Mark D. Rothe65ff112016-09-09 13:48:38 -0700852 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700853}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700854
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200855static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
856 glb_lb_policy *glb_policy);
857static void start_picking_locked(grpc_exec_ctx *exec_ctx,
858 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700859 glb_policy->started_picking = true;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200860 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
861 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700862}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700863
David Garcia Quintas65318262016-07-29 13:43:38 -0700864static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
865 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
866 gpr_mu_lock(&glb_policy->mu);
867 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200868 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700869 }
870 gpr_mu_unlock(&glb_policy->mu);
871}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700872
David Garcia Quintas65318262016-07-29 13:43:38 -0700873static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
David Garcia Quintas8aace512016-08-15 14:55:12 -0700874 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700875 grpc_connected_subchannel **target, void **user_data,
David Garcia Quintas65318262016-07-29 13:43:38 -0700876 grpc_closure *on_complete) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700877 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700878 *target = NULL;
David Garcia Quintas6cc44fc2016-09-12 23:04:35 -0700879 grpc_exec_ctx_sched(
880 exec_ctx, on_complete,
881 GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting "
882 "won't work without it. Failing"),
883 NULL);
Mark D. Roth1e5f6af2016-10-07 08:32:58 -0700884 return 0;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700885 }
886
David Garcia Quintas65318262016-07-29 13:43:38 -0700887 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
888 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas5cf3c372016-10-03 14:30:03 -0700889 glb_policy->deadline = pick_args->deadline;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700890 bool pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -0700891
892 if (glb_policy->rr_policy != NULL) {
893 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200894 gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
895 (void *)glb_policy, (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700896 }
897 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
David Garcia Quintas8aace512016-08-15 14:55:12 -0700898
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700899 wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
David Garcia Quintas90712d52016-10-13 19:33:04 -0700900 memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg));
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700901
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700902 grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg);
David Garcia Quintas90712d52016-10-13 19:33:04 -0700903 wc_arg->rr_policy = glb_policy->rr_policy;
904 wc_arg->target = target;
905 wc_arg->wrapped_closure = on_complete;
906 wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
907 wc_arg->initial_metadata = pick_args->initial_metadata;
David Garcia Quintas97ba6422016-10-14 13:06:45 -0700908 wc_arg->free_when_done = wc_arg;
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700909 pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy,
David Garcia Quintas20359062016-10-15 15:22:51 -0700910 pick_args, target, wc_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700911 } else {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200912 if (grpc_lb_glb_trace) {
913 gpr_log(GPR_DEBUG,
914 "No RR policy in grpclb instance %p. Adding to grpclb's pending "
915 "picks",
916 (void *)(glb_policy));
917 }
David Garcia Quintas8aace512016-08-15 14:55:12 -0700918 add_pending_pick(&glb_policy->pending_picks, pick_args, target,
919 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -0700920
921 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200922 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700923 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700924 pick_done = false;
David Garcia Quintas65318262016-07-29 13:43:38 -0700925 }
926 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700927 return pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -0700928}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700929
David Garcia Quintas65318262016-07-29 13:43:38 -0700930static grpc_connectivity_state glb_check_connectivity(
931 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
932 grpc_error **connectivity_error) {
933 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
934 grpc_connectivity_state st;
935 gpr_mu_lock(&glb_policy->mu);
936 st = grpc_connectivity_state_check(&glb_policy->state_tracker,
937 connectivity_error);
938 gpr_mu_unlock(&glb_policy->mu);
939 return st;
940}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700941
David Garcia Quintas65318262016-07-29 13:43:38 -0700942static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
943 grpc_closure *closure) {
944 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
945 gpr_mu_lock(&glb_policy->mu);
946 if (glb_policy->rr_policy) {
947 grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure);
948 } else {
949 add_pending_ping(&glb_policy->pending_pings, closure);
950 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200951 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700952 }
953 }
954 gpr_mu_unlock(&glb_policy->mu);
955}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700956
David Garcia Quintas65318262016-07-29 13:43:38 -0700957static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
958 grpc_lb_policy *pol,
959 grpc_connectivity_state *current,
960 grpc_closure *notify) {
961 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
962 gpr_mu_lock(&glb_policy->mu);
963 grpc_connectivity_state_notify_on_state_change(
964 exec_ctx, &glb_policy->state_tracker, current, notify);
965
966 gpr_mu_unlock(&glb_policy->mu);
967}
968
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100969static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
970 grpc_error *error);
971static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
972 grpc_error *error);
David Garcia Quintasa74b2462016-11-11 14:07:27 -0800973static void lb_call_init_locked(glb_lb_policy *glb_policy) {
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700974 GPR_ASSERT(glb_policy->server_name != NULL);
975 GPR_ASSERT(glb_policy->server_name[0] != '\0');
David Garcia Quintasa74b2462016-11-11 14:07:27 -0800976 GPR_ASSERT(!glb_policy->shutting_down);
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700977
David Garcia Quintas15eba132016-08-09 15:20:48 -0700978 /* Note the following LB call progresses every time there's activity in \a
979 * glb_policy->base.interested_parties, which is comprised of the polling
Yuchen Zengf7c45ae2016-09-15 13:40:32 -0700980 * entities from \a client_channel. */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200981 glb_policy->lb_call = grpc_channel_create_pollset_set_call(
David Garcia Quintas65318262016-07-29 13:43:38 -0700982 glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
David Garcia Quintas4543e5c2016-09-22 15:09:34 -0700983 glb_policy->base.interested_parties,
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700984 "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name,
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200985 glb_policy->deadline, NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700986
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100987 grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
988 grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -0700989
David Garcia Quintas55ba14a2016-09-27 18:45:30 -0700990 grpc_grpclb_request *request =
991 grpc_grpclb_request_create(glb_policy->server_name);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700992 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100993 glb_policy->lb_request_payload =
David Garcia Quintas65318262016-07-29 13:43:38 -0700994 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700995 grpc_slice_unref(request_payload_slice);
David Garcia Quintas65318262016-07-29 13:43:38 -0700996 grpc_grpclb_request_destroy(request);
997
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200998 glb_policy->lb_call_status_details = NULL;
999 glb_policy->lb_call_status_details_capacity = 0;
1000
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001001 grpc_closure_init(&glb_policy->lb_on_server_status_received,
1002 lb_on_server_status_received, glb_policy);
1003 grpc_closure_init(&glb_policy->lb_on_response_received,
1004 lb_on_response_received, glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001005
1006 gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER,
1007 BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000,
1008 BACKOFF_MAX_SECONDS * 1000);
David Garcia Quintas65318262016-07-29 13:43:38 -07001009}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001010
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -08001011static void lb_call_destroy_locked(glb_lb_policy *glb_policy) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001012 GPR_ASSERT(glb_policy->lb_call != NULL);
1013 grpc_call_destroy(glb_policy->lb_call);
1014 glb_policy->lb_call = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -07001015
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001016 grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
1017 grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -07001018
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001019 grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001020 gpr_free(glb_policy->lb_call_status_details);
David Garcia Quintas65318262016-07-29 13:43:38 -07001021}
1022
David Garcia Quintas8d489112016-07-29 15:20:42 -07001023/*
1024 * Auxiliary functions and LB client callbacks.
1025 */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001026static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
1027 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001028 GPR_ASSERT(glb_policy->lb_channel != NULL);
David Garcia Quintasa74b2462016-11-11 14:07:27 -08001029 if (glb_policy->shutting_down) return;
1030
1031 lb_call_init_locked(glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001032
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001033 if (grpc_lb_glb_trace) {
1034 gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)",
1035 (void *)glb_policy, (void *)glb_policy->lb_call);
1036 }
1037 GPR_ASSERT(glb_policy->lb_call != NULL);
1038
David Garcia Quintas65318262016-07-29 13:43:38 -07001039 grpc_call_error call_error;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001040 grpc_op ops[4];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001041 memset(ops, 0, sizeof(ops));
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001042
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001043 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -07001044 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1045 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001046 op->flags = 0;
1047 op->reserved = NULL;
1048 op++;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001049
1050 op->op = GRPC_OP_RECV_INITIAL_METADATA;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001051 op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001052 op->flags = 0;
1053 op->reserved = NULL;
1054 op++;
1055
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001056 GPR_ASSERT(glb_policy->lb_request_payload != NULL);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001057 op->op = GRPC_OP_SEND_MESSAGE;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001058 op->data.send_message = glb_policy->lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001059 op->flags = 0;
1060 op->reserved = NULL;
1061 op++;
1062
1063 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1064 op->data.recv_status_on_client.trailing_metadata =
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001065 &glb_policy->lb_trailing_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001066 op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
1067 op->data.recv_status_on_client.status_details =
1068 &glb_policy->lb_call_status_details;
1069 op->data.recv_status_on_client.status_details_capacity =
1070 &glb_policy->lb_call_status_details_capacity;
1071 op->flags = 0;
1072 op->reserved = NULL;
1073 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001074 /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
1075 * count goes to zero) to be unref'd in lb_on_server_status_received */
1076 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received");
David Garcia Quintas65318262016-07-29 13:43:38 -07001077 call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001078 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1079 &glb_policy->lb_on_server_status_received);
David Garcia Quintas65318262016-07-29 13:43:38 -07001080 GPR_ASSERT(GRPC_CALL_OK == call_error);
1081
1082 op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001083 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001084 op->data.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001085 op->flags = 0;
1086 op->reserved = NULL;
1087 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001088 /* take another weak ref to be unref'd in lb_on_response_received */
1089 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received");
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001090 call_error = grpc_call_start_batch_and_execute(
1091 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1092 &glb_policy->lb_on_response_received);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001093 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001094}
1095
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001096static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
1097 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001098 glb_lb_policy *glb_policy = arg;
1099
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001100 grpc_op ops[2];
1101 memset(ops, 0, sizeof(ops));
1102 grpc_op *op = ops;
David Garcia Quintasf9532952016-11-08 14:14:25 -08001103 gpr_mu_lock(&glb_policy->mu);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001104 if (glb_policy->lb_response_payload != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001105 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001106 /* Received data from the LB server. Look inside
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001107 * glb_policy->lb_response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001108 grpc_byte_buffer_reader bbr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001109 grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
Craig Tillerd41a4a72016-10-26 16:16:06 -07001110 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001111 grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001112 grpc_grpclb_serverlist *serverlist =
1113 grpc_grpclb_response_parse_serverlist(response_slice);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001114 if (serverlist != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001115 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tillerd41a4a72016-10-26 16:16:06 -07001116 grpc_slice_unref(response_slice);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001117 if (grpc_lb_glb_trace) {
Jan Tattermusch2b398082016-10-07 14:40:30 +02001118 gpr_log(GPR_INFO, "Serverlist with %lu servers received",
1119 (unsigned long)serverlist->num_servers);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001120 for (size_t i = 0; i < serverlist->num_servers; ++i) {
1121 grpc_resolved_address addr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001122 parse_server(serverlist->servers[i], &addr);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001123 char *ipport;
1124 grpc_sockaddr_to_string(&ipport, &addr, false);
1125 gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
1126 gpr_free(ipport);
1127 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001128 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001129
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001130 /* update serverlist */
1131 if (serverlist->num_servers > 0) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001132 if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001133 if (grpc_lb_glb_trace) {
1134 gpr_log(GPR_INFO,
1135 "Incoming server list identical to current, ignoring.");
1136 }
1137 } else { /* new serverlist */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001138 if (glb_policy->serverlist != NULL) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001139 /* dispose of the old serverlist */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001140 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001141 }
David Garcia Quintasea11d162016-07-14 17:27:28 -07001142 /* and update the copy in the glb_lb_policy instance */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001143 glb_policy->serverlist = serverlist;
1144
1145 rr_handover_locked(exec_ctx, glb_policy, error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001146 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001147 } else {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001148 if (grpc_lb_glb_trace) {
1149 gpr_log(GPR_INFO,
1150 "Received empty server list. Picks will stay pending until a "
1151 "response with > 0 servers is received");
1152 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001153 }
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001154 } else { /* serverlist == NULL */
1155 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
Craig Tiller32df4672016-11-04 08:21:56 -07001156 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
1157 grpc_slice_unref(response_slice);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001158 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001159
David Garcia Quintas246c5642016-11-01 11:16:52 -07001160 if (!glb_policy->shutting_down) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001161 /* keep listening for serverlist updates */
1162 op->op = GRPC_OP_RECV_MESSAGE;
David Garcia Quintas246c5642016-11-01 11:16:52 -07001163 op->data.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001164 op->flags = 0;
1165 op->reserved = NULL;
1166 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001167 /* reuse the "lb_on_response_received" weak ref taken in
1168 * query_for_backends_locked() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001169 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas246c5642016-11-01 11:16:52 -07001170 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1171 &glb_policy->lb_on_response_received); /* loop */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001172 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001173 }
David Garcia Quintasf9532952016-11-08 14:14:25 -08001174 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintase224a762016-11-01 13:00:58 -07001175 } else { /* empty payload: call cancelled. */
murgatroid992e012342016-11-10 18:24:08 -08001176 /* dispose of the "lb_on_response_received" weak ref taken in
1177 * query_for_backends_locked() and reused in every reception loop */
David Garcia Quintasd4d2ece2016-11-08 14:38:12 -08001178 gpr_mu_unlock(&glb_policy->mu);
1179 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1180 "lb_on_response_received_empty_payload");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001181 }
1182}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001183
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001184static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
1185 grpc_error *error) {
1186 glb_lb_policy *glb_policy = arg;
1187 gpr_mu_lock(&glb_policy->mu);
1188
1189 if (!glb_policy->shutting_down) {
1190 if (grpc_lb_glb_trace) {
1191 gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
1192 (void *)glb_policy);
1193 }
1194 GPR_ASSERT(glb_policy->lb_call == NULL);
1195 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001196 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001197 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001198 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1199 "grpclb_on_retry_timer");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001200}
1201
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001202static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg,
1203 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001204 glb_lb_policy *glb_policy = arg;
1205 gpr_mu_lock(&glb_policy->mu);
1206
1207 GPR_ASSERT(glb_policy->lb_call != NULL);
1208
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001209 if (grpc_lb_glb_trace) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001210 gpr_log(GPR_DEBUG,
1211 "Status from LB server received. Status = %d, Details = '%s', "
1212 "(call: %p)",
1213 glb_policy->lb_call_status, glb_policy->lb_call_status_details,
1214 (void *)glb_policy->lb_call);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001215 }
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001216
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001217 /* We need to performe cleanups no matter what. */
David Garcia Quintasaa24e9a2016-11-07 11:05:50 -08001218 lb_call_destroy_locked(glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001219
1220 if (!glb_policy->shutting_down) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001221 /* if we aren't shutting down, restart the LB client call after some time */
1222 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1223 gpr_timespec next_try =
1224 gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
1225 if (grpc_lb_glb_trace) {
1226 gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
1227 (void *)glb_policy);
1228 gpr_timespec timeout = gpr_time_sub(next_try, now);
1229 if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
1230 gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.",
1231 timeout.tv_sec, timeout.tv_nsec);
1232 } else {
1233 gpr_log(GPR_DEBUG, "... retrying immediately.");
1234 }
1235 }
1236 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
1237 grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
1238 lb_call_on_retry_timer, glb_policy, now);
1239 }
1240 gpr_mu_unlock(&glb_policy->mu);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001241 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1242 "lb_on_server_status_received");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001243}
1244
David Garcia Quintas8d489112016-07-29 15:20:42 -07001245/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001246static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
1247 glb_destroy, glb_shutdown, glb_pick,
1248 glb_cancel_pick, glb_cancel_picks, glb_ping_one,
1249 glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change};
1250
1251static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1252
1253static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1254
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001255static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1256 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1257
1258static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1259
1260grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
1261 return &glb_lb_policy_factory;
1262}
1263
1264/* Plugin registration */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001265void grpc_lb_policy_grpclb_init() {
1266 grpc_register_lb_policy(grpc_glb_lb_factory_create());
1267 grpc_register_tracer("glb", &grpc_lb_glb_trace);
1268}
1269
/* Plugin shutdown hook: nothing to release, since the factory is static. */
void grpc_lb_policy_grpclb_shutdown(void) {}