blob: 05a106f214d58d7376ab07ddc5efb629bec9814b [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2016 gRPC authors.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070016 *
17 */
18
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070019/** Implementation of the gRPC LB policy.
20 *
David Garcia Quintas43339842016-07-18 12:56:09 -070021 * This policy takes as input a set of resolved addresses {a1..an} for which the
22 * LB set was set (it's the resolver's responsibility to ensure this). That is
23 * to say, {a1..an} represent a collection of LB servers.
24 *
25 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
26 * This channel behaves just like a regular channel. In particular, the
27 * constructed URI over the addresses a1..an will use the default pick first
28 * policy to select from this list of LB server backends.
29 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070030 * The first time the policy gets a request for a pick, a ping, or to exit the
David Garcia Quintas98da61b2016-10-29 08:46:31 +020031 * idle state, \a query_for_backends_locked() is called. This function sets up
32 * and initiates the internal communication with the LB server. In particular,
33 * it's responsible for instantiating the internal *streaming* call to the LB
34 * server (whichever address from {a1..an} pick-first chose). This call is
David Garcia Quintas7ec29132016-11-01 04:09:05 +010035 * serviced by two callbacks, \a lb_on_server_status_received and \a
36 * lb_on_response_received. The former will be called when the call to the LB
37 * server completes. This can happen if the LB server closes the connection or
38 * if this policy itself cancels the call (for example because it's shutting
David Garcia Quintas246c5642016-11-01 11:16:52 -070039 * down). If the internal call times out, the usual behavior of pick-first
David Garcia Quintas7ec29132016-11-01 04:09:05 +010040 * applies, continuing to pick from the list {a1..an}.
David Garcia Quintas43339842016-07-18 12:56:09 -070041 *
David Garcia Quintas98da61b2016-10-29 08:46:31 +020042 * Upon success, the incoming \a LoadBalancingResponse is processed by \a
43 * res_recv. An invalid one results in the termination of the streaming call. A
44 * new streaming call should be created if possible, failing the original call
45 * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
46 * backends is extracted. A Round Robin policy will be created from this list.
47 * There are two possible scenarios:
David Garcia Quintas43339842016-07-18 12:56:09 -070048 *
49 * 1. This is the first server list received. There was no previous instance of
David Garcia Quintas90712d52016-10-13 19:33:04 -070050 * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
51 * policy and perform all the pending operations over it.
David Garcia Quintas43339842016-07-18 12:56:09 -070052 * 2. There's already a RR policy instance active. We need to introduce the new
 53 * one built from the new serverlist, but taking care not to disrupt the
54 * operations in progress over the old RR instance. This is done by
55 * decreasing the reference count on the old policy. The moment no more
56 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070057 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
58 * state. At this point we can transition to a new RR instance safely, which
David Garcia Quintas90712d52016-10-13 19:33:04 -070059 * is done once again via \a rr_handover_locked().
David Garcia Quintas43339842016-07-18 12:56:09 -070060 *
61 *
62 * Once a RR policy instance is in place (and getting updated as described),
 63 * calls for a pick, a ping or a cancellation will be serviced right away by
64 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintas7ec29132016-11-01 04:09:05 +010065 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is
66 * received, etc), pick/ping requests are added to a list of pending picks/pings
67 * to be flushed and serviced as part of \a rr_handover_locked() the moment the
68 * RR policy instance becomes available.
David Garcia Quintas43339842016-07-18 12:56:09 -070069 *
70 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
71 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070072
73/* TODO(dgq):
74 * - Implement LB service forwarding (point 2c. in the doc's diagram).
75 */
76
murgatroid99085f9af2016-10-24 09:55:44 -070077/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
78 using that endpoint. Because of various transitive includes in uv.h,
79 including windows.h on Windows, uv.h must be included before other system
80 headers. Therefore, sockaddr.h must always be included first */
murgatroid997871f732016-09-23 13:49:05 -070081#include "src/core/lib/iomgr/sockaddr.h"
82
Mark D. Roth64d922a2017-05-03 12:52:04 -070083#include <limits.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070084#include <string.h>
85
86#include <grpc/byte_buffer_reader.h>
87#include <grpc/grpc.h>
88#include <grpc/support/alloc.h>
89#include <grpc/support/host_port.h>
90#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -070091#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070092
Craig Tiller9eb0fde2017-03-31 16:59:30 -070093#include "src/core/ext/filters/client_channel/client_channel.h"
94#include "src/core/ext/filters/client_channel/client_channel_factory.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070095#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070096#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
97#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070098#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070099#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
Craig Tillerd52e22f2017-04-02 16:22:52 -0700100#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
101#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
102#include "src/core/ext/filters/client_channel/parse_address.h"
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700103#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
Juanli Shen6502ecc2017-09-13 13:10:54 -0700104#include "src/core/ext/filters/client_channel/subchannel_index.h"
Mark D. Roth046cf762016-09-26 11:13:51 -0700105#include "src/core/lib/channel/channel_args.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700106#include "src/core/lib/channel/channel_stack.h"
Craig Tiller2400bf52017-02-09 16:25:19 -0800107#include "src/core/lib/iomgr/combiner.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200108#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700109#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200110#include "src/core/lib/iomgr/timer.h"
David Garcia Quintas01291502017-02-07 13:26:41 -0800111#include "src/core/lib/slice/slice_hash_table.h"
Craig Tiller18b4ba32016-11-09 15:23:42 -0800112#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -0700113#include "src/core/lib/slice/slice_string_helpers.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200114#include "src/core/lib/support/backoff.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700115#include "src/core/lib/support/string.h"
116#include "src/core/lib/surface/call.h"
117#include "src/core/lib/surface/channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700118#include "src/core/lib/surface/channel_init.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700119#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700120
David Garcia Quintas1edfb952016-11-22 17:15:34 -0800121#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
122#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
123#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
124#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
125#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200126
ncteisen06bce6e2017-07-10 07:58:49 -0700127grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700128
/* Adds the LB token of the selected subchannel (address) to the call's
 * initial metadata, using caller-provided storage for the linked mdelem.
 * Returns the error (if any) from appending to the metadata batch. */
static grpc_error *initial_metadata_add_lb_token(
    grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata,
    grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) {
  /* the caller must supply storage for the mdelem link and a non-null token
   * (callers pass a fresh GRPC_MDELEM_REF; the batch keeps it) */
  GPR_ASSERT(lb_token_mdelem_storage != NULL);
  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
  return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
                                      lb_token_mdelem_storage, lb_token);
}
139
Mark D. Roth09e458c2017-05-02 08:13:26 -0700140static void destroy_client_stats(void *arg) {
Yash Tibrewalbc130da2017-09-12 22:44:08 -0700141 grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg);
Mark D. Roth09e458c2017-05-02 08:13:26 -0700142}
143
/* Argument bundle for \a wrapped_rr_closure: everything needed to complete a
 * pick/ping against the internal RR policy and then release resources. */
typedef struct wrapped_rr_closure_arg {
  /* the closure instance using this struct as argument */
  grpc_closure wrapper_closure;

  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
   * calls against the internal RR instance, respectively. */
  grpc_closure *wrapped_closure;

  /* the pick's initial metadata, kept in order to append the LB token for the
   * pick */
  grpc_metadata_batch *initial_metadata;

  /* the picked target, used to determine which LB token to add to the pick's
   * initial metadata */
  grpc_connected_subchannel **target;

  /* the context to be populated for the subchannel call */
  grpc_call_context_element *context;

  /* Stats for client-side load reporting. Note that this holds a
   * reference, which must be either passed on via context or unreffed. */
  grpc_grpclb_client_stats *client_stats;

  /* the LB token associated with the pick */
  grpc_mdelem lb_token;

  /* storage for the lb token initial metadata mdelem */
  grpc_linked_mdelem *lb_token_mdelem_storage;

  /* The RR instance related to the closure */
  grpc_lb_policy *rr_policy;

  /* heap memory to be freed upon closure execution (usually the enclosing
   * pending_pick/pending_ping node). */
  void *free_when_done;
} wrapped_rr_closure_arg;
179
/* The \a on_complete closure passed as part of the pick requires keeping a
 * reference to its associated round robin instance. We wrap this closure in
 * order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
                               grpc_error *error) {
  wrapped_rr_closure_arg *wc_arg = (wrapped_rr_closure_arg *)arg;

  /* always forward the result to the original caller first */
  GPR_ASSERT(wc_arg->wrapped_closure != NULL);
  GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));

  if (wc_arg->rr_policy != NULL) {
    /* if *target is NULL, no pick has been made by the RR policy (eg, all
     * addresses failed to connect). There won't be any user_data/token
     * available */
    if (*wc_arg->target != NULL) {
      if (!GRPC_MDISNULL(wc_arg->lb_token)) {
        /* attach the LB token (takes a new ref) to the pick's metadata */
        initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata,
                                      wc_arg->lb_token_mdelem_storage,
                                      GRPC_MDELEM_REF(wc_arg->lb_token));
      } else {
        /* a connected subchannel without an LB token violates an invariant */
        gpr_log(GPR_ERROR,
                "No LB token for connected subchannel pick %p (from RR "
                "instance %p).",
                (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
        abort();
      }
      // Pass on client stats via context. Passes ownership of the reference.
      GPR_ASSERT(wc_arg->client_stats != NULL);
      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
    } else {
      /* no pick was made: drop the stats ref we were holding */
      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
    }
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
  }
  /* free the heap node (pending_pick/pending_ping) that owns this arg */
  GPR_ASSERT(wc_arg->free_when_done != NULL);
  gpr_free(wc_arg->free_when_done);
}
221
/* Linked list of pending pick requests. It stores all information needed to
 * eventually call (Round Robin's) pick() on them. They mainly stay pending
 * waiting for the RR policy to be created/updated.
 *
 * One particularity is the wrapping of the user-provided \a on_complete closure
 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
 * order to correctly unref the RR policy instance upon completion of the pick.
 * See \a wrapped_rr_closure for details. */
typedef struct pending_pick {
  struct pending_pick *next;

  /* original pick()'s arguments */
  grpc_lb_policy_pick_args pick_args;

  /* output argument where to store the pick()ed connected subchannel, or NULL
   * upon error. */
  grpc_connected_subchannel **target;

  /* args for wrapped_on_complete */
  wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
243
David Garcia Quintas8aace512016-08-15 14:55:12 -0700244static void add_pending_pick(pending_pick **root,
245 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700246 grpc_connected_subchannel **target,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700247 grpc_call_context_element *context,
David Garcia Quintas65318262016-07-29 13:43:38 -0700248 grpc_closure *on_complete) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700249 pending_pick *pp = (pending_pick *)gpr_zalloc(sizeof(*pp));
David Garcia Quintas65318262016-07-29 13:43:38 -0700250 pp->next = *root;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700251 pp->pick_args = *pick_args;
David Garcia Quintas65318262016-07-29 13:43:38 -0700252 pp->target = target;
David Garcia Quintas65318262016-07-29 13:43:38 -0700253 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700254 pp->wrapped_on_complete_arg.target = target;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700255 pp->wrapped_on_complete_arg.context = context;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700256 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
257 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
258 pick_args->lb_token_mdelem_storage;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700259 pp->wrapped_on_complete_arg.free_when_done = pp;
ncteisen969b46e2017-06-08 14:57:11 -0700260 GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
Craig Tiller91031da2016-12-28 15:44:25 -0800261 wrapped_rr_closure, &pp->wrapped_on_complete_arg,
262 grpc_schedule_on_exec_ctx);
David Garcia Quintas65318262016-07-29 13:43:38 -0700263 *root = pp;
264}
265
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
  struct pending_ping *next;

  /* args for wrapped_notify */
  wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
273
David Garcia Quintas65318262016-07-29 13:43:38 -0700274static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700275 pending_ping *pping = (pending_ping *)gpr_zalloc(sizeof(*pping));
David Garcia Quintas65318262016-07-29 13:43:38 -0700276 pping->wrapped_notify_arg.wrapped_closure = notify;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700277 pping->wrapped_notify_arg.free_when_done = pping;
David Garcia Quintas65318262016-07-29 13:43:38 -0700278 pping->next = *root;
ncteisen969b46e2017-06-08 14:57:11 -0700279 GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
Craig Tiller91031da2016-12-28 15:44:25 -0800280 wrapped_rr_closure, &pping->wrapped_notify_arg,
281 grpc_schedule_on_exec_ctx);
David Garcia Quintas65318262016-07-29 13:43:38 -0700282 *root = pping;
283}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700284
David Garcia Quintas8d489112016-07-29 15:20:42 -0700285/*
286 * glb_lb_policy
287 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700288typedef struct rr_connectivity_data rr_connectivity_data;
Yash Tibrewalbc130da2017-09-12 22:44:08 -0700289
typedef struct glb_lb_policy {
  /** base policy: must be first */
  grpc_lb_policy base;

  /** who the client is trying to communicate with */
  const char *server_name;
  grpc_client_channel_factory *cc_factory;
  grpc_channel_args *args;

  /** timeout in milliseconds for the LB call. 0 means no deadline. */
  int lb_call_timeout_ms;

  /** for communicating with the LB server */
  grpc_channel *lb_channel;

  /** response generator to inject address updates into \a lb_channel */
  grpc_fake_resolver_response_generator *response_generator;

  /** the RR policy to use for the backend servers returned by the LB server */
  grpc_lb_policy *rr_policy;

  /** true once the first pick/ping/exit-idle has triggered the LB query */
  bool started_picking;

  /** our connectivity state tracker */
  grpc_connectivity_state_tracker state_tracker;

  /** connectivity state of the LB channel */
  grpc_connectivity_state lb_channel_connectivity;

  /** stores the deserialized response from the LB. May be NULL until one such
   * response has arrived. */
  grpc_grpclb_serverlist *serverlist;

  /** Index into serverlist for next pick.
   * If the server at this index is a drop, we return a drop.
   * Otherwise, we delegate to the RR policy. */
  size_t serverlist_index;

  /** list of picks that are waiting on RR's policy connectivity */
  pending_pick *pending_picks;

  /** list of pings that are waiting on RR's policy connectivity */
  pending_ping *pending_pings;

  /** set when shutdown begins; suppresses further LB activity */
  bool shutting_down;

  /** are we currently updating lb_call? */
  bool updating_lb_call;

  /** are we currently updating lb_channel? */
  bool updating_lb_channel;

  /** are we already watching the LB channel's connectivity? */
  bool watching_lb_channel;

  /** is \a lb_call_retry_timer active? */
  bool retry_timer_active;

  /** called upon changes to the LB channel's connectivity. */
  grpc_closure lb_channel_on_connectivity_changed;

  /** args from the latest update received while already updating, or NULL */
  grpc_lb_policy_args *pending_update_args;

  /************************************************************/
  /* client data associated with the LB server communication */
  /************************************************************/
  /* Status from the LB server has been received. This signals the end of the LB
   * call. */
  grpc_closure lb_on_server_status_received;

  /* A response from the LB server has been received. Process it */
  grpc_closure lb_on_response_received;

  /* LB call retry timer callback. */
  grpc_closure lb_on_call_retry;

  grpc_call *lb_call; /* streaming call to the LB server */

  grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
  grpc_metadata_array
      lb_trailing_metadata_recv; /* trailing MD from LB server */

  /* what's being sent to the LB server. Note that its value may vary if the LB
   * server indicates a redirect. */
  grpc_byte_buffer *lb_request_payload;

  /* response from the LB server, if any. Processed in
   * lb_on_response_received() */
  grpc_byte_buffer *lb_response_payload;

  /* call status code and details, set in lb_on_server_status_received() */
  grpc_status_code lb_call_status;
  grpc_slice lb_call_status_details;

  /** LB call retry backoff state */
  gpr_backoff lb_call_backoff_state;

  /** LB call retry timer */
  grpc_timer lb_call_retry_timer;

  /* true once the first (initial) LB response has been seen on lb_call */
  bool seen_initial_response;

  /* Stats for client-side load reporting. Should be unreffed and
   * recreated whenever lb_call is replaced. */
  grpc_grpclb_client_stats *client_stats;
  /* Interval and timer for next client load report. */
  gpr_timespec client_stats_report_interval;
  grpc_timer client_load_report_timer;
  bool client_load_report_timer_pending;
  bool last_client_load_report_counters_were_zero;
  /* Closure used for either the load report timer or the callback for
   * completion of sending the load report. */
  grpc_closure client_load_report_closure;
  /* Client load report message payload. */
  grpc_byte_buffer *client_load_report_payload;
} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700406
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
  /* callback invoked when the RR policy's connectivity changes */
  grpc_closure on_change;
  /* last connectivity state observed for the RR policy */
  grpc_connectivity_state state;
  /* the grpclb policy that owns the RR instance being watched */
  glb_lb_policy *glb_policy;
};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700413
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700414static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
415 bool log) {
Mark D. Rothe7751802017-07-27 12:31:45 -0700416 if (server->drop) return false;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700417 const grpc_grpclb_ip_address *ip = &server->ip_address;
418 if (server->port >> 16 != 0) {
419 if (log) {
420 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200421 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
422 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700423 }
424 return false;
425 }
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700426 if (ip->size != 4 && ip->size != 16) {
427 if (log) {
428 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200429 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700430 "serverlist. Ignoring",
Jan Tattermusch2b398082016-10-07 14:40:30 +0200431 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700432 }
433 return false;
434 }
435 return true;
436}
437
Mark D. Roth16883a32016-10-21 10:30:58 -0700438/* vtable for LB tokens in grpc_lb_addresses. */
Mark D. Roth557c9902016-10-24 11:12:05 -0700439static void *lb_token_copy(void *token) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800440 return token == NULL
441 ? NULL
442 : (void *)GRPC_MDELEM_REF((grpc_mdelem){(uintptr_t)token}).payload;
Mark D. Roth16883a32016-10-21 10:30:58 -0700443}
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800444static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800445 if (token != NULL) {
446 GRPC_MDELEM_UNREF(exec_ctx, (grpc_mdelem){(uintptr_t)token});
447 }
Mark D. Roth16883a32016-10-21 10:30:58 -0700448}
/* Three-way comparison of two LB tokens by identity.
 * The pointers are converted to uintptr_t before comparing: relational
 * comparison (<, >) of pointers into unrelated objects is undefined behavior
 * in C (C11 6.5.8p5), whereas integer comparison is always well-defined.
 * Returns 1, -1 or 0 as token1 is greater than, less than or equal to
 * token2. */
static int lb_token_cmp(void *token1, void *token2) {
  const uintptr_t a = (uintptr_t)token1;
  const uintptr_t b = (uintptr_t)token2;
  if (a > b) return 1;
  if (a < b) return -1;
  return 0;
}
/* user-data vtable registered with grpc_lb_addresses so LB tokens (interned
 * mdelems) are correctly ref'd, unref'd and compared */
static const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};
456
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100457static void parse_server(const grpc_grpclb_server *server,
458 grpc_resolved_address *addr) {
Mark D. Rothd7389b42017-05-17 12:22:17 -0700459 memset(addr, 0, sizeof(*addr));
Mark D. Rothe7751802017-07-27 12:31:45 -0700460 if (server->drop) return;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100461 const uint16_t netorder_port = htons((uint16_t)server->port);
462 /* the addresses are given in binary format (a in(6)_addr struct) in
463 * server->ip_address.bytes. */
464 const grpc_grpclb_ip_address *ip = &server->ip_address;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100465 if (ip->size == 4) {
466 addr->len = sizeof(struct sockaddr_in);
467 struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
468 addr4->sin_family = AF_INET;
469 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
470 addr4->sin_port = netorder_port;
471 } else if (ip->size == 16) {
472 addr->len = sizeof(struct sockaddr_in6);
473 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
David Garcia Quintas107ca162016-11-02 18:17:03 -0700474 addr6->sin6_family = AF_INET6;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100475 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
476 addr6->sin6_port = netorder_port;
477 }
478}
479
/* Returns addresses extracted from \a serverlist.
 *
 * The serverlist is converted into a grpc_lb_addresses instance suitable for
 * consumption by the round_robin child policy. Each valid server entry is
 * turned into one (non-balancer) address whose user_data carries the LB token
 * mdelem payload for that backend. Caller owns the returned instance. */
static grpc_lb_addresses *process_serverlist_locked(
    grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
  size_t num_valid = 0;
  /* first pass: count how many are valid in order to allocate the necessary
   * memory in a single block */
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
  }
  grpc_lb_addresses *lb_addresses =
      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
  /* second pass: actually populate the addresses and LB tokens (aka user data
   * to the outside world) to be read by the RR policy during its creation.
   * Given that the validity tests are very cheap, they are performed again
   * instead of marking the valid ones during the first pass, as this would
   * incur an allocation due to the arbitrary number of servers */
  size_t addr_idx = 0;
  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    const grpc_grpclb_server *server = serverlist->servers[sl_idx];
    if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    GPR_ASSERT(addr_idx < num_valid);
    /* address processing */
    grpc_resolved_address addr;
    parse_server(server, &addr);
    /* lb token processing: tokens arrive in a fixed-size char array that is
     * not guaranteed to be NUL-terminated, hence the strnlen bound. */
    void *user_data;
    if (server->has_load_balance_token) {
      const size_t lb_token_max_length =
          GPR_ARRAY_SIZE(server->load_balance_token);
      const size_t lb_token_length =
          strnlen(server->load_balance_token, lb_token_max_length);
      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
          server->load_balance_token, lb_token_length);
      user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
                                                  lb_token_mdstr)
                      .payload;
    } else {
      /* Tokenless backends get the empty LB token; log so that misconfigured
       * balancers are visible. */
      char *uri = grpc_sockaddr_to_uri(&addr);
      gpr_log(GPR_INFO,
              "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
              uri);
      gpr_free(uri);
      user_data = (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    }

    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                  false /* is_balancer */,
                                  NULL /* balancer_name */, user_data);
    ++addr_idx;
  }
  GPR_ASSERT(addr_idx == num_valid);
  return lb_addresses;
}
534
/* Propagates the connectivity state reported by the RR child policy
 * (\a rr_state, with its associated \a rr_state_error) into grpclb's own
 * connectivity state tracker, per the table below. Takes ownership of
 * \a rr_state_error (handed to grpc_connectivity_state_set). */
static void update_lb_connectivity_status_locked(
    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
    grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
  const grpc_connectivity_state curr_glb_state =
      grpc_connectivity_state_check(&glb_policy->state_tracker);

  /* The new connectivity status is a function of the previous one and the new
   * input coming from the status of the RR policy.
   *
   *  current state (grpclb's)
   *  |
   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
   *  ===++====+=====+=====+======+======+
   *   I || I  |  C  |  R  | [I]  | [I]  |
   *  ---++----+-----+-----+------+------+
   *   C || I  |  C  |  R  | [C]  | [C]  |
   *  ---++----+-----+-----+------+------+
   *   R || I  |  C  |  R  | [R]  | [R]  |
   *  ---++----+-----+-----+------+------+
   *  TF || I  |  C  |  R  | [TF] | [TF] |
   *  ---++----+-----+-----+------+------+
   *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
   *  ---++----+-----+-----+------+------+
   *
   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
   * is the current state of grpclb, which is left untouched.
   *
   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
   * the previous RR instance.
   *
   * Note that the status is never updated to SHUTDOWN as a result of calling
   * this function. Only glb_shutdown() has the power to set that state.
   *
   * (*) This function mustn't be called during shutting down. */
  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);

  /* An error is only expected (and required) alongside the two failure
   * states; for any healthy state it must be GRPC_ERROR_NONE. */
  switch (rr_state) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
      break;
    case GRPC_CHANNEL_INIT:
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
  }

  if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
    gpr_log(
        GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
        grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
  }
  grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
                              rr_state_error,
                              "update_lb_connectivity_status_locked");
}
592
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
 * immediately (ignoring its completion callback), we need to perform the
 * cleanups this callback would otherwise be responsible for.
 * If \a force_async is true, then we will manually schedule the
 * completion callback even if the pick is available immediately.
 *
 * Returns true iff the pick was satisfied synchronously (and therefore the
 * caller's completion closure will NOT fire later); always returns false when
 * \a force_async is true, since the closure is scheduled here instead. */
static bool pick_from_internal_rr_locked(
    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
    const grpc_lb_policy_pick_args *pick_args, bool force_async,
    grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
  // Look at the index into the serverlist to see if we should drop this call.
  // The index advances round-robin over the serverlist on every pick.
  grpc_grpclb_server *server =
      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
    glb_policy->serverlist_index = 0;  // Wrap-around.
  }
  if (server->drop) {
    // Not using the RR policy, so unref it.
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
              (intptr_t)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
    // Update client load reporting stats to indicate the number of
    // dropped calls.  Note that we have to do this here instead of in
    // the client_load_reporting filter, because we do not create a
    // subchannel call (and therefore no client_load_reporting filter)
    // for dropped calls.
    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
                                                     wc_arg->client_stats);
    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
    if (force_async) {
      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
      gpr_free(wc_arg->free_when_done);
      return false;
    }
    gpr_free(wc_arg->free_when_done);
    return true;
  }
  // Pick via the RR policy.
  const bool pick_done = grpc_lb_policy_pick_locked(
      exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
      (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
  if (pick_done) {
    /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
              (intptr_t)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
    /* add the load reporting initial metadata */
    initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
                                  pick_args->lb_token_mdelem_storage,
                                  GRPC_MDELEM_REF(wc_arg->lb_token));
    // Pass on client stats via context. Passes ownership of the reference.
    GPR_ASSERT(wc_arg->client_stats != NULL);
    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
    if (force_async) {
      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
      gpr_free(wc_arg->free_when_done);
      return false;
    }
    gpr_free(wc_arg->free_when_done);
  }
  /* else, the pending pick will be registered and taken care of by the
   * pending pick list inside the RR policy (glb_policy->rr_policy).
   * Eventually, wrapped_on_complete will be called, which will -among other
   * things- add the LB token to the call's initial metadata */
  return pick_done;
}
665
/* Builds the args used to create/update the round_robin child policy from the
 * most recent serverlist. The returned args embed a copy of grpclb's channel
 * args with GRPC_ARG_LB_ADDRESSES replaced by the backends extracted from
 * glb_policy->serverlist. Caller must release with lb_policy_args_destroy(). */
static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
                                                  glb_lb_policy *glb_policy) {
  grpc_lb_addresses *addresses =
      process_serverlist_locked(exec_ctx, glb_policy->serverlist);
  GPR_ASSERT(addresses != NULL);
  grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
  args->client_channel_factory = glb_policy->cc_factory;
  args->combiner = glb_policy->base.combiner;
  // Replace the LB addresses in the channel args that we pass down to
  // the subchannel.
  static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
  const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
  args->args = grpc_channel_args_copy_and_add_and_remove(
      glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
      1);
  /* the channel arg holds its own copy of the addresses */
  grpc_lb_addresses_destroy(exec_ctx, addresses);
  return args;
}
684
685static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
686 grpc_lb_policy_args *args) {
687 grpc_channel_args_destroy(exec_ctx, args->args);
688 gpr_free(args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700689}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700690
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error);
/* Creates the round_robin child policy from \a args, wires it into grpclb
 * (connectivity tracking, pollset_sets) and flushes any picks/pings that were
 * pending while no RR instance existed. Must only be called when
 * glb_policy->rr_policy == NULL; on RR creation failure it logs and returns,
 * leaving grpclb to retry on a future serverlist update. */
static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
                             grpc_lb_policy_args *args) {
  GPR_ASSERT(glb_policy->rr_policy == NULL);

  grpc_lb_policy *new_rr_policy =
      grpc_lb_policy_create(exec_ctx, "round_robin", args);
  if (new_rr_policy == NULL) {
    gpr_log(GPR_ERROR,
            "Failure creating a RoundRobin policy for serverlist update with "
            "%lu entries. The previous RR instance (%p), if any, will continue "
            "to be used. Future updates from the LB will attempt to create new "
            "instances.",
            (unsigned long)glb_policy->serverlist->num_servers,
            (void *)glb_policy->rr_policy);
    return;
  }
  glb_policy->rr_policy = new_rr_policy;
  grpc_error *rr_state_error = NULL;
  const grpc_connectivity_state rr_state =
      grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
                                               &rr_state_error);
  /* Connectivity state is a function of the RR policy updated/created */
  update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
                                       rr_state_error);
  /* Add the gRPC LB's interested_parties pollset_set to that of the newly
   * created RR policy. This will make the RR policy progress upon activity on
   * gRPC LB, which in turn is tied to the application's call */
  grpc_pollset_set_add_pollset_set(exec_ctx,
                                   glb_policy->rr_policy->interested_parties,
                                   glb_policy->base.interested_parties);

  /* Allocate the data for the tracking of the new RR policy's connectivity.
   * It'll be deallocated in glb_rr_connectivity_changed() */
  rr_connectivity_data *rr_connectivity =
      (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data));
  GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
                    glb_rr_connectivity_changed_locked, rr_connectivity,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  rr_connectivity->glb_policy = glb_policy;
  rr_connectivity->state = rr_state;

  /* Subscribe to changes to the connectivity of the new RR.
   * The weak ref keeps glb_policy alive until the callback fires. */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                               &rr_connectivity->state,
                                               &rr_connectivity->on_change);
  grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);

  /* Update picks and pings in wait: hand every queued pick/ping over to the
   * new RR instance (each takes its own ref on it). */
  pending_pick *pp;
  while ((pp = glb_policy->pending_picks)) {
    glb_policy->pending_picks = pp->next;
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
    pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
    pp->wrapped_on_complete_arg.client_stats =
        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p",
              (void *)glb_policy->rr_policy);
    }
    pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
                                 true /* force_async */, pp->target,
                                 &pp->wrapped_on_complete_arg);
  }

  pending_ping *pping;
  while ((pping = glb_policy->pending_pings)) {
    glb_policy->pending_pings = pping->next;
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
    pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
              (intptr_t)glb_policy->rr_policy);
    }
    grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
                                   &pping->wrapped_notify_arg.wrapper_closure);
  }
}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700771
/* Hands the current serverlist over to the round_robin child policy: updates
 * the existing RR instance in place if there is one, otherwise creates it.
 * glb_policy->rr_policy may be NULL (initial handover). No-op when grpclb is
 * shutting down. Requires a non-empty glb_policy->serverlist. */
static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
                               glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->serverlist != NULL &&
             glb_policy->serverlist->num_servers > 0);
  if (glb_policy->shutting_down) return;
  grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
  GPR_ASSERT(args != NULL);
  if (glb_policy->rr_policy != NULL) {
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
              (void *)glb_policy->rr_policy);
    }
    grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
  } else {
    create_rr_locked(exec_ctx, glb_policy, args);
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
              (void *)glb_policy->rr_policy);
    }
  }
  lb_policy_args_destroy(exec_ctx, args);
}
795
/* Callback invoked (under the combiner) whenever the RR child policy's
 * connectivity state changes. Owns \a arg (an rr_connectivity_data) and the
 * "glb_rr_connectivity_cb" weak ref on glb_policy; both are released on the
 * two early-return paths, and otherwise reused when re-subscribing. */
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error) {
  rr_connectivity_data *rr_connectivity = (rr_connectivity_data *)arg;
  glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
  if (glb_policy->shutting_down) {
    /* Shutdown in progress: just drop our weak ref and tracking data. */
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "glb_rr_connectivity_cb");
    gpr_free(rr_connectivity);
    return;
  }
  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
    /* An RR policy that has transitioned into the SHUTDOWN connectivity state
     * should not be considered for picks or updates: the SHUTDOWN state is a
     * sink, policies can't transition back from it. .*/
    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
                         "rr_connectivity_shutdown");
    glb_policy->rr_policy = NULL;
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "glb_rr_connectivity_cb");
    gpr_free(rr_connectivity);
    return;
  }
  /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
  update_lb_connectivity_status_locked(
      exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                               &rr_connectivity->state,
                                               &rr_connectivity->on_change);
}
826
/* grpc_slice_hash_table value destructor: balancer names are plain
 * gpr_strdup'd strings, so a simple gpr_free suffices. \a exec_ctx is unused
 * but required by the destructor signature. */
static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
                                  void *balancer_name) {
  gpr_free(balancer_name);
}
831
David Garcia Quintas01291502017-02-07 13:26:41 -0800832static grpc_slice_hash_table_entry targets_info_entry_create(
833 const char *address, const char *balancer_name) {
David Garcia Quintas01291502017-02-07 13:26:41 -0800834 grpc_slice_hash_table_entry entry;
835 entry.key = grpc_slice_from_copied_string(address);
Mark D. Rothe3006702017-04-19 07:43:56 -0700836 entry.value = gpr_strdup(balancer_name);
David Garcia Quintas01291502017-02-07 13:26:41 -0800837 return entry;
838}
839
/* Comparator for balancer-name values stored as void* in the targets-info
 * hash table; yields standard strcmp ordering (<0, 0, >0). */
static int balancer_name_cmp_fn(void *a, void *b) {
  return strcmp((const char *)a, (const char *)b);
}
845
/* Returns the channel args for the LB channel, used to create a bidirectional
 * stream for the reception of load balancing updates.
 *
 * Inputs:
 *   - \a addresses: corresponding to the balancers.
 *   - \a response_generator: in order to propagate updates from the resolver
 *   above the grpclb policy.
 *   - \a args: other args inherited from the grpclb policy.
 *
 * Only the balancer entries of \a addresses are used; each one contributes
 * both an entry in the targets-info table (address string -> balancer name,
 * used for secure naming) and an address in the LB channel's own
 * GRPC_ARG_LB_ADDRESSES arg. Caller owns the returned channel args. */
static grpc_channel_args *build_lb_channel_args(
    grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
    grpc_fake_resolver_response_generator *response_generator,
    const grpc_channel_args *args) {
  size_t num_grpclb_addrs = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
  }
  /* All input addresses come from a resolver that claims they are LB services.
   * It's the resolver's responsibility to make sure this policy is only
   * instantiated and used in that case. Otherwise, something has gone wrong. */
  GPR_ASSERT(num_grpclb_addrs > 0);
  grpc_lb_addresses *lb_addresses =
      grpc_lb_addresses_create(num_grpclb_addrs, NULL);
  grpc_slice_hash_table_entry *targets_info_entries =
      (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) *
                                                num_grpclb_addrs);

  size_t lb_addresses_idx = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) continue;
    /* user_data carries backend LB tokens, which make no sense for balancer
     * addresses; warn and drop it. */
    if (addresses->addresses[i].user_data != NULL) {
      gpr_log(GPR_ERROR,
              "This LB policy doesn't support user data. It will be ignored");
    }
    char *addr_str;
    GPR_ASSERT(grpc_sockaddr_to_string(
                   &addr_str, &addresses->addresses[i].address, true) > 0);
    targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
        addr_str, addresses->addresses[i].balancer_name);
    gpr_free(addr_str);

    grpc_lb_addresses_set_address(
        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
        addresses->addresses[i].address.len, false /* is balancer */,
        addresses->addresses[i].balancer_name, NULL /* user data */);
  }
  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
  grpc_slice_hash_table *targets_info =
      grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
                                   destroy_balancer_name, balancer_name_cmp_fn);
  /* the table took ownership of the entries' keys/values */
  gpr_free(targets_info_entries);

  grpc_channel_args *lb_channel_args =
      grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
                                                  response_generator, args);

  grpc_arg lb_channel_addresses_arg =
      grpc_lb_addresses_create_channel_arg(lb_addresses);

  grpc_channel_args *result = grpc_channel_args_copy_and_add(
      lb_channel_args, &lb_channel_addresses_arg, 1);
  /* intermediates are copied into \a result; release local references */
  grpc_slice_hash_table_unref(exec_ctx, targets_info);
  grpc_channel_args_destroy(exec_ctx, lb_channel_args);
  grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
  return result;
}
911
/* grpc_lb_policy vtable destroy: releases everything glb_policy owns.
 * Precondition (asserted): glb_shutdown_locked() already drained all pending
 * picks/pings. The LB channel itself is destroyed in shutdown, not here. */
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  GPR_ASSERT(glb_policy->pending_picks == NULL);
  GPR_ASSERT(glb_policy->pending_pings == NULL);
  gpr_free((void *)glb_policy->server_name);
  grpc_channel_args_destroy(exec_ctx, glb_policy->args);
  if (glb_policy->client_stats != NULL) {
    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
  }
  grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
  if (glb_policy->serverlist != NULL) {
    grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
  }
  grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
  grpc_subchannel_index_unref();
  if (glb_policy->pending_update_args != NULL) {
    grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
    gpr_free(glb_policy->pending_update_args);
  }
  gpr_free(glb_policy);
}
933
/* grpc_lb_policy vtable shutdown: cancels the balancer call and retry timer,
 * drops the RR child policy and the LB channel, moves the connectivity state
 * to SHUTDOWN and fails out every pending pick (with *target = NULL) and
 * pending ping. */
static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  glb_policy->shutting_down = true;

  /* We need a copy of the lb_call pointer because we can't cancel the call
   * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
   * the cancel, needs to acquire that same lock */
  grpc_call *lb_call = glb_policy->lb_call;

  /* glb_policy->lb_call and this local lb_call must be consistent at this point
   * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
   * of query_for_backends_locked, which can only be invoked while
   * glb_policy->shutting_down is false. */
  if (lb_call != NULL) {
    grpc_call_cancel(lb_call, NULL);
    /* lb_on_server_status_received will pick up the cancel and clean up */
  }
  if (glb_policy->retry_timer_active) {
    grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
    glb_policy->retry_timer_active = false;
  }

  /* Detach the pending lists first; their closures are scheduled below. */
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  pending_ping *pping = glb_policy->pending_pings;
  glb_policy->pending_pings = NULL;
  if (glb_policy->rr_policy != NULL) {
    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
  }
  // We destroy the LB channel here because
  // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
  // instance. Destroying the lb channel in glb_destroy would likely result in
  // a callback invocation without a valid glb_policy arg.
  if (glb_policy->lb_channel != NULL) {
    grpc_channel_destroy(glb_policy->lb_channel);
    glb_policy->lb_channel = NULL;
  }
  grpc_connectivity_state_set(
      exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");

  while (pp != NULL) {
    pending_pick *next = pp->next;
    *pp->target = NULL;
    GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
                       GRPC_ERROR_NONE);
    pp = next;
  }

  while (pping != NULL) {
    pending_ping *next = pping->next;
    GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
                       GRPC_ERROR_NONE);
    pping = next;
  }
}
990
// Cancel a specific pending pick.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
//   handed over to the RR policy (in create_rr_locked()). From that point
//   onwards, it'll be RR's responsibility. For cancellations, that implies the
//   pick needs also be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
//   level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
//   we invoke the completion closure and set *target to NULL right here.
//
// Takes ownership of \a error (unref'd before returning).
static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                                   grpc_connected_subchannel **target,
                                   grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  // Walk the detached pending list: matching picks are completed as cancelled,
  // non-matching ones are pushed back onto glb_policy->pending_picks.
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if (pp->target == target) {
      *target = NULL;
      GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = glb_policy->pending_picks;
      glb_policy->pending_picks = pp;
    }
    pp = next;
  }
  // The pick may also have been handed over to the RR child; forward the
  // cancellation there too.
  if (glb_policy->rr_policy != NULL) {
    grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target,
                                      GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
1026
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001027// Cancel all pending picks.
1028//
1029// A grpclb pick progresses as follows:
1030// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
1031// handed over to the RR policy (in create_rr_locked()). From that point
1032// onwards, it'll be RR's responsibility. For cancellations, that implies the
1033// pick needs also be cancelled by the RR instance.
1034// - Otherwise, without an RR instance, picks stay pending at this policy's
1035// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
1036// we invoke the completion closure and set *target to NULL right here.
Craig Tiller2400bf52017-02-09 16:25:19 -08001037static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
1038 grpc_lb_policy *pol,
1039 uint32_t initial_metadata_flags_mask,
1040 uint32_t initial_metadata_flags_eq,
1041 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001042 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001043 pending_pick *pp = glb_policy->pending_picks;
1044 glb_policy->pending_picks = NULL;
1045 while (pp != NULL) {
1046 pending_pick *next = pp->next;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001047 if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
David Garcia Quintas65318262016-07-29 13:43:38 -07001048 initial_metadata_flags_eq) {
ncteisen969b46e2017-06-08 14:57:11 -07001049 GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
ncteisen4b36a3d2017-03-13 19:08:06 -07001050 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1051 "Pick Cancelled", &error, 1));
David Garcia Quintas65318262016-07-29 13:43:38 -07001052 } else {
1053 pp->next = glb_policy->pending_picks;
1054 glb_policy->pending_picks = pp;
1055 }
1056 pp = next;
1057 }
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001058 if (glb_policy->rr_policy != NULL) {
1059 grpc_lb_policy_cancel_picks_locked(
1060 exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask,
1061 initial_metadata_flags_eq, GRPC_ERROR_REF(error));
1062 }
Mark D. Rothe65ff112016-09-09 13:48:38 -07001063 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001064}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001065
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
                                      glb_lb_policy *glb_policy);
/* Transition the policy into the "picking" state: mark it started, reset
 * the LB-call retry backoff (so the first attempt is immediate), and kick
 * off the call to the LB server. Must run under the policy's combiner. */
static void start_picking_locked(grpc_exec_ctx *exec_ctx,
                                 glb_lb_policy *glb_policy) {
  glb_policy->started_picking = true;
  gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
  query_for_backends_locked(exec_ctx, glb_policy);
}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001074
Craig Tiller2400bf52017-02-09 16:25:19 -08001075static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001076 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001077 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001078 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001079 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001080}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001081
/* Pick a connected subchannel for a call.
 *
 * Returns non-zero if the pick completed synchronously (*target set),
 * zero if it will complete asynchronously via \a on_complete.
 *
 * Requires \a pick_args->lb_token_mdelem_storage: without it the LB token
 * can't be attached and load reporting would silently break, so the pick
 * fails immediately. When an RR child exists the pick is delegated to it,
 * wrapped in a closure that adds the LB token and client-stats bookkeeping;
 * otherwise the pick is queued until an RR policy is created. */
static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                           const grpc_lb_policy_pick_args *pick_args,
                           grpc_connected_subchannel **target,
                           grpc_call_context_element *context, void **user_data,
                           grpc_closure *on_complete) {
  if (pick_args->lb_token_mdelem_storage == NULL) {
    /* Load reporting requires mdelem storage for the LB token: fail fast. */
    *target = NULL;
    GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                           "No mdelem storage for the LB token. Load reporting "
                           "won't work without it. Failing"));
    return 0;
  }

  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  bool pick_done;

  if (glb_policy->rr_policy != NULL) {
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
              (void *)glb_policy, (void *)glb_policy->rr_policy);
    }
    /* Ref the RR child for the duration of the (possibly async) pick. */
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");

    /* wc_arg owns itself (free_when_done below): it is released by
     * wrapped_rr_closure once the pick completes. */
    wrapped_rr_closure_arg *wc_arg =
        (wrapped_rr_closure_arg *)gpr_zalloc(sizeof(wrapped_rr_closure_arg));

    GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
                      grpc_schedule_on_exec_ctx);
    wc_arg->rr_policy = glb_policy->rr_policy;
    wc_arg->target = target;
    wc_arg->context = context;
    GPR_ASSERT(glb_policy->client_stats != NULL);
    /* Stats ref is released by the wrapped closure when the pick finishes. */
    wc_arg->client_stats =
        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
    wc_arg->wrapped_closure = on_complete;
    wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
    wc_arg->initial_metadata = pick_args->initial_metadata;
    wc_arg->free_when_done = wc_arg;
    pick_done =
        pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
                                     false /* force_async */, target, wc_arg);
  } else {
    /* No RR child yet: queue the pick; it will be replayed once the RR
     * policy is created from the first serverlist. */
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_DEBUG,
              "No RR policy in grpclb instance %p. Adding to grpclb's pending "
              "picks",
              (void *)(glb_policy));
    }
    add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
                     on_complete);

    if (!glb_policy->started_picking) {
      start_picking_locked(exec_ctx, glb_policy);
    }
    pick_done = false;
  }
  return pick_done;
}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001141
Craig Tiller2400bf52017-02-09 16:25:19 -08001142static grpc_connectivity_state glb_check_connectivity_locked(
David Garcia Quintas65318262016-07-29 13:43:38 -07001143 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1144 grpc_error **connectivity_error) {
1145 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
Craig Tiller2400bf52017-02-09 16:25:19 -08001146 return grpc_connectivity_state_get(&glb_policy->state_tracker,
1147 connectivity_error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001148}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001149
Craig Tiller2400bf52017-02-09 16:25:19 -08001150static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1151 grpc_closure *closure) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001152 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001153 if (glb_policy->rr_policy) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001154 grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
David Garcia Quintas65318262016-07-29 13:43:38 -07001155 } else {
1156 add_pending_ping(&glb_policy->pending_pings, closure);
1157 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001158 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001159 }
1160 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001161}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001162
Craig Tiller2400bf52017-02-09 16:25:19 -08001163static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
1164 grpc_lb_policy *pol,
1165 grpc_connectivity_state *current,
1166 grpc_closure *notify) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001167 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001168 grpc_connectivity_state_notify_on_state_change(
1169 exec_ctx, &glb_policy->state_tracker, current, notify);
David Garcia Quintas65318262016-07-29 13:43:38 -07001170}
1171
Mark D. Rotha4792f52017-09-26 09:06:35 -07001172static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
1173 grpc_error *error) {
1174 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
1175 glb_policy->retry_timer_active = false;
1176 if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
1177 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
1178 gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
1179 (void *)glb_policy);
1180 }
1181 GPR_ASSERT(glb_policy->lb_call == NULL);
1182 query_for_backends_locked(exec_ctx, glb_policy);
1183 }
1184 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
1185}
1186
/* Decide how to re-establish the LB call after the previous one ended.
 *
 * Two paths:
 * - If we're mid-update (updating_lb_call) and picking already started,
 *   cancel any pending retry timer and restart immediately via
 *   start_picking_locked() (unless shutting down).
 * - Otherwise, if not shutting down, schedule a retry after a backoff
 *   interval, taking a weak ref for the timer callback.
 *
 * In all cases drops the weak ref held by the just-finished
 * lb_on_server_status_received_locked callback. */
static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx,
                                  glb_lb_policy *glb_policy) {
  if (glb_policy->started_picking && glb_policy->updating_lb_call) {
    if (glb_policy->retry_timer_active) {
      grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
    }
    if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
    glb_policy->updating_lb_call = false;
  } else if (!glb_policy->shutting_down) {
    /* if we aren't shutting down, restart the LB client call after some time */
    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
    gpr_timespec next_try =
        gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
              (void *)glb_policy);
      gpr_timespec timeout = gpr_time_sub(next_try, now);
      if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
        gpr_log(GPR_DEBUG,
                "... retry_timer_active in %" PRId64 ".%09d seconds.",
                timeout.tv_sec, timeout.tv_nsec);
      } else {
        gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
      }
    }
    /* Weak ref released by lb_call_on_retry_timer_locked when it runs. */
    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
    GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
                      lb_call_on_retry_timer_locked, glb_policy,
                      grpc_combiner_scheduler(glb_policy->base.combiner));
    glb_policy->retry_timer_active = true;
    grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
                    &glb_policy->lb_on_call_retry, now);
  }
  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                            "lb_on_server_status_received_locked");
}
1223
Mark D. Roth09e458c2017-05-02 08:13:26 -07001224static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
1225 grpc_error *error);
1226
1227static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
1228 glb_lb_policy *glb_policy) {
1229 const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1230 const gpr_timespec next_client_load_report_time =
1231 gpr_time_add(now, glb_policy->client_stats_report_interval);
ncteisen969b46e2017-06-08 14:57:11 -07001232 GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001233 send_client_load_report_locked, glb_policy,
Craig Tilleree4b1452017-05-12 10:56:03 -07001234 grpc_combiner_scheduler(glb_policy->base.combiner));
Mark D. Roth09e458c2017-05-02 08:13:26 -07001235 grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
1236 next_client_load_report_time,
1237 &glb_policy->client_load_report_closure, now);
1238}
1239
1240static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1241 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001242 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001243 grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
1244 glb_policy->client_load_report_payload = NULL;
1245 if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
1246 glb_policy->client_load_report_timer_pending = false;
1247 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1248 "client_load_report");
1249 return;
1250 }
1251 schedule_next_client_load_report(exec_ctx, glb_policy);
1252}
1253
Mark D. Roth09e458c2017-05-02 08:13:26 -07001254static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
Mark D. Rothe7751802017-07-27 12:31:45 -07001255 grpc_grpclb_dropped_call_counts *drop_entries =
Yash Tibrewalbc130da2017-09-12 22:44:08 -07001256 (grpc_grpclb_dropped_call_counts *)
1257 request->client_stats.calls_finished_with_drop.arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001258 return request->client_stats.num_calls_started == 0 &&
1259 request->client_stats.num_calls_finished == 0 &&
Mark D. Roth09e458c2017-05-02 08:13:26 -07001260 request->client_stats.num_calls_finished_with_client_failed_to_send ==
1261 0 &&
Mark D. Rothe7751802017-07-27 12:31:45 -07001262 request->client_stats.num_calls_finished_known_received == 0 &&
1263 (drop_entries == NULL || drop_entries->num_entries == 0);
Mark D. Roth09e458c2017-05-02 08:13:26 -07001264}
1265
/* Timer callback: build and send a client load report on the LB call.
 *
 * On cancellation or a missing LB call, stops the reporting loop (dropping
 * its weak ref); additionally, if the LB call itself is gone, this is the
 * last pending callback on it, so the LB call restart logic is triggered
 * from here. Skips consecutive all-zero reports to avoid useless traffic. */
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
  if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
    glb_policy->client_load_report_timer_pending = false;
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "client_load_report");
    /* Re-checked separately: we may be here due to cancellation while the
     * call still exists, in which case no restart should happen. */
    if (glb_policy->lb_call == NULL) {
      maybe_restart_lb_call(exec_ctx, glb_policy);
    }
    return;
  }
  // Construct message payload.
  GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
  grpc_grpclb_request *request =
      grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (load_report_counters_are_zero(request)) {
    if (glb_policy->last_client_load_report_counters_were_zero) {
      grpc_grpclb_request_destroy(request);
      schedule_next_client_load_report(exec_ctx, glb_policy);
      return;
    }
    glb_policy->last_client_load_report_counters_were_zero = true;
  } else {
    glb_policy->last_client_load_report_counters_were_zero = false;
  }
  /* Serialize the request; the byte buffer keeps its own copy, freed by
   * client_load_report_done_locked. */
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  glb_policy->client_load_report_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // Send load report message.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = glb_policy->client_load_report_payload;
  GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
                    client_load_report_done_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_call, &op, 1,
      &glb_policy->client_load_report_closure);
  if (call_error != GRPC_CALL_OK) {
    /* Log before asserting so the failed error code is visible. */
    gpr_log(GPR_ERROR, "call_error=%d", call_error);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  }
}
1315
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                void *arg, grpc_error *error);
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error);
/* Create and configure (but do not start) the streaming call to the LB
 * server: the call object itself, fresh client stats, recv metadata arrays,
 * the serialized initial request payload, the status/response closures, and
 * the retry backoff state. Must not run while shutting down or while a
 * previous LB call still exists. */
static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->server_name != NULL);
  GPR_ASSERT(glb_policy->server_name[0] != '\0');
  GPR_ASSERT(glb_policy->lb_call == NULL);
  GPR_ASSERT(!glb_policy->shutting_down);

  /* Note the following LB call progresses every time there's activity in \a
   * glb_policy->base.interested_parties, which is comprised of the polling
   * entities from \a client_channel. */
  grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
  /* lb_call_timeout_ms == 0 means "no deadline". */
  gpr_timespec deadline =
      glb_policy->lb_call_timeout_ms == 0
          ? gpr_inf_future(GPR_CLOCK_MONOTONIC)
          : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_millis(glb_policy->lb_call_timeout_ms,
                                              GPR_TIMESPAN));
  glb_policy->lb_call = grpc_channel_create_pollset_set_call(
      exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
      glb_policy->base.interested_parties,
      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
      &host, deadline, NULL);
  grpc_slice_unref_internal(exec_ctx, host);

  /* Stats are per-LB-call: drop any stats from a previous call. */
  if (glb_policy->client_stats != NULL) {
    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
  }
  glb_policy->client_stats = grpc_grpclb_client_stats_create();

  grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
  grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);

  /* Build and serialize the initial request; the byte buffer owns a copy,
   * so the slice and request can be released immediately. */
  grpc_grpclb_request *request =
      grpc_grpclb_request_create(glb_policy->server_name);
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  glb_policy->lb_request_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
  grpc_grpclb_request_destroy(request);

  /* Both callbacks run under the policy's combiner. */
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
                    lb_on_server_status_received_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
                    lb_on_response_received_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));

  gpr_backoff_init(&glb_policy->lb_call_backoff_state,
                   GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
                   GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
                   GRPC_GRPCLB_RECONNECT_JITTER,
                   GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
                   GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);

  glb_policy->seen_initial_response = false;
  glb_policy->last_client_load_report_counters_were_zero = false;
}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001377
/* Tear down the LB call and everything allocated for it by
 * lb_call_init_locked: the call itself, the recv metadata arrays, the
 * request payload, and the status-details slice. Also cancels the client
 * load report timer if one is pending; its callback observes
 * lb_call == NULL and stops the reporting loop. */
static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
                                   glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->lb_call != NULL);
  grpc_call_unref(glb_policy->lb_call);
  glb_policy->lb_call = NULL;

  grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
  grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);

  grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
  grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);

  if (glb_policy->client_load_report_timer_pending) {
    grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
  }
}
1394
/*
 * Auxiliary functions and LB client callbacks.
 */
/* Start the streaming call to the LB server and kick off its I/O.
 *
 * Three batches are started on the freshly-initialized call:
 *  1. send initial metadata + recv initial metadata + send the initial
 *     request message (no completion closure: NULL tag);
 *  2. recv status on client, completing in
 *     lb_on_server_status_received_locked (holds a weak ref);
 *  3. recv message, completing in lb_on_response_received_locked
 *     (holds another weak ref, reused across messages).
 * No-op while shutting down. */
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
                                      glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->lb_channel != NULL);
  if (glb_policy->shutting_down) return;

  lb_call_init_locked(exec_ctx, glb_policy);

  if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO,
            "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
            (void *)glb_policy, (void *)glb_policy->lb_channel,
            (void *)glb_policy->lb_call);
  }
  GPR_ASSERT(glb_policy->lb_call != NULL);

  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));

  /* Batch 1: handshake metadata plus the initial LB request. */
  grpc_op *op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &glb_policy->lb_initial_metadata_recv;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  GPR_ASSERT(glb_policy->lb_request_payload != NULL);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = glb_policy->lb_request_payload;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  call_error = grpc_call_start_batch_and_execute(exec_ctx, glb_policy->lb_call,
                                                 ops, (size_t)(op - ops), NULL);
  GPR_ASSERT(GRPC_CALL_OK == call_error);

  /* Batch 2: watch for the server closing the stream. */
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &glb_policy->lb_trailing_metadata_recv;
  op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
  op->data.recv_status_on_client.status_details =
      &glb_policy->lb_call_status_details;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
   * count goes to zero) to be unref'd in lb_on_server_status_received_locked */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
                          "lb_on_server_status_received_locked");
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_on_server_status_received);
  GPR_ASSERT(GRPC_CALL_OK == call_error);

  /* Batch 3: receive the first serverlist response. */
  op = ops;
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  /* take another weak ref to be unref'd/reused in
   * lb_on_response_received_locked */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_on_response_received);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1472
Craig Tiller2400bf52017-02-09 16:25:19 -08001473static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
1474 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001475 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001476 grpc_op ops[2];
1477 memset(ops, 0, sizeof(ops));
1478 grpc_op *op = ops;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001479 if (glb_policy->lb_response_payload != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001480 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001481 /* Received data from the LB server. Look inside
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001482 * glb_policy->lb_response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001483 grpc_byte_buffer_reader bbr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001484 grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
Craig Tillerd41a4a72016-10-26 16:16:06 -07001485 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas97e17852017-08-14 14:55:02 -07001486 grpc_byte_buffer_reader_destroy(&bbr);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001487 grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001488
Mark D. Roth09e458c2017-05-02 08:13:26 -07001489 grpc_grpclb_initial_response *response = NULL;
1490 if (!glb_policy->seen_initial_response &&
1491 (response = grpc_grpclb_initial_response_parse(response_slice)) !=
1492 NULL) {
1493 if (response->has_client_stats_report_interval) {
1494 glb_policy->client_stats_report_interval =
1495 gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN),
1496 grpc_grpclb_duration_to_timespec(
1497 &response->client_stats_report_interval));
Craig Tiller84f75d42017-05-03 13:06:35 -07001498 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001499 gpr_log(GPR_INFO,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001500 "received initial LB response message; "
1501 "client load reporting interval = %" PRId64 ".%09d sec",
1502 glb_policy->client_stats_report_interval.tv_sec,
1503 glb_policy->client_stats_report_interval.tv_nsec);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001504 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001505 /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
1506 * strong ref count goes to zero) to be unref'd in
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001507 * send_client_load_report_locked() */
Mark D. Roth09e458c2017-05-02 08:13:26 -07001508 glb_policy->client_load_report_timer_pending = true;
1509 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
1510 schedule_next_client_load_report(exec_ctx, glb_policy);
Craig Tiller84f75d42017-05-03 13:06:35 -07001511 } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001512 gpr_log(GPR_INFO,
1513 "received initial LB response message; "
1514 "client load reporting NOT enabled");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001515 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001516 grpc_grpclb_initial_response_destroy(response);
1517 glb_policy->seen_initial_response = true;
1518 } else {
1519 grpc_grpclb_serverlist *serverlist =
1520 grpc_grpclb_response_parse_serverlist(response_slice);
1521 if (serverlist != NULL) {
1522 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tiller84f75d42017-05-03 13:06:35 -07001523 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001524 gpr_log(GPR_INFO, "Serverlist with %lu servers received",
1525 (unsigned long)serverlist->num_servers);
1526 for (size_t i = 0; i < serverlist->num_servers; ++i) {
1527 grpc_resolved_address addr;
1528 parse_server(serverlist->servers[i], &addr);
1529 char *ipport;
1530 grpc_sockaddr_to_string(&ipport, &addr, false);
1531 gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
1532 gpr_free(ipport);
1533 }
1534 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001535 /* update serverlist */
1536 if (serverlist->num_servers > 0) {
1537 if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
1538 serverlist)) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001539 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001540 gpr_log(GPR_INFO,
1541 "Incoming server list identical to current, ignoring.");
1542 }
1543 grpc_grpclb_destroy_serverlist(serverlist);
1544 } else { /* new serverlist */
1545 if (glb_policy->serverlist != NULL) {
1546 /* dispose of the old serverlist */
1547 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
1548 }
1549 /* and update the copy in the glb_lb_policy instance. This
1550 * serverlist instance will be destroyed either upon the next
1551 * update or in glb_destroy() */
1552 glb_policy->serverlist = serverlist;
Mark D. Rothd7389b42017-05-17 12:22:17 -07001553 glb_policy->serverlist_index = 0;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001554 rr_handover_locked(exec_ctx, glb_policy);
1555 }
1556 } else {
Craig Tiller84f75d42017-05-03 13:06:35 -07001557 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Ken Payson01e83b52017-09-13 19:45:07 -07001558 gpr_log(GPR_INFO,
1559 "Received empty server list. Picks will stay pending until "
1560 "a response with > 0 servers is received");
Mark D. Roth09e458c2017-05-02 08:13:26 -07001561 }
1562 grpc_grpclb_destroy_serverlist(serverlist);
1563 }
1564 } else { /* serverlist == NULL */
1565 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
1566 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
1567 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001568 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001569 grpc_slice_unref_internal(exec_ctx, response_slice);
David Garcia Quintas246c5642016-11-01 11:16:52 -07001570 if (!glb_policy->shutting_down) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001571 /* keep listening for serverlist updates */
1572 op->op = GRPC_OP_RECV_MESSAGE;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001573 op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001574 op->flags = 0;
1575 op->reserved = NULL;
1576 op++;
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001577 /* reuse the "lb_on_response_received_locked" weak ref taken in
David Garcia Quintase224a762016-11-01 13:00:58 -07001578 * query_for_backends_locked() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001579 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas246c5642016-11-01 11:16:52 -07001580 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1581 &glb_policy->lb_on_response_received); /* loop */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001582 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas998bd2c2017-09-18 12:41:07 -07001583 } else {
1584 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1585 "lb_on_response_received_locked_shutdown");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001586 }
David Garcia Quintase224a762016-11-01 13:00:58 -07001587 } else { /* empty payload: call cancelled. */
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001588 /* dispose of the "lb_on_response_received_locked" weak ref taken in
David Garcia Quintase224a762016-11-01 13:00:58 -07001589 * query_for_backends_locked() and reused in every reception loop */
1590 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001591 "lb_on_response_received_locked_empty_payload");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001592 }
1593}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001594
Craig Tiller2400bf52017-02-09 16:25:19 -08001595static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
1596 void *arg, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001597 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001598 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tiller84f75d42017-05-03 13:06:35 -07001599 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001600 char *status_details =
1601 grpc_slice_to_c_string(glb_policy->lb_call_status_details);
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001602 gpr_log(GPR_INFO,
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001603 "Status from LB server received. Status = %d, Details = '%s', "
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001604 "(call: %p), error %p",
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001605 glb_policy->lb_call_status, status_details,
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001606 (void *)glb_policy->lb_call, (void *)error);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001607 gpr_free(status_details);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001608 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001609 /* We need to perform cleanups no matter what. */
1610 lb_call_destroy_locked(exec_ctx, glb_policy);
Mark D. Rotha4792f52017-09-26 09:06:35 -07001611 // If the load report timer is still pending, we wait for it to be
1612 // called before restarting the call. Otherwise, we restart the call
1613 // here.
1614 if (!glb_policy->client_load_report_timer_pending) {
1615 maybe_restart_lb_call(exec_ctx, glb_policy);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001616 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001617}
1618
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001619static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
1620 const grpc_lb_policy_args *args) {
1621 glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001622 if (glb_policy->updating_lb_channel) {
1623 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
1624 gpr_log(GPR_INFO,
1625 "Update already in progress for grpclb %p. Deferring update.",
1626 (void *)glb_policy);
1627 }
1628 if (glb_policy->pending_update_args != NULL) {
1629 grpc_channel_args_destroy(exec_ctx,
1630 glb_policy->pending_update_args->args);
1631 gpr_free(glb_policy->pending_update_args);
1632 }
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001633 glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc(
1634 sizeof(*glb_policy->pending_update_args));
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001635 glb_policy->pending_update_args->client_channel_factory =
1636 args->client_channel_factory;
1637 glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
1638 glb_policy->pending_update_args->combiner = args->combiner;
1639 return;
1640 }
1641
1642 glb_policy->updating_lb_channel = true;
1643 // Propagate update to lb_channel (pick first).
1644 const grpc_arg *arg =
1645 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
1646 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
1647 if (glb_policy->lb_channel == NULL) {
1648 // If we don't have a current channel to the LB, go into TRANSIENT
1649 // FAILURE.
1650 grpc_connectivity_state_set(
1651 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
1652 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
1653 "glb_update_missing");
1654 } else {
1655 // otherwise, keep using the current LB channel (ignore this update).
1656 gpr_log(GPR_ERROR,
1657 "No valid LB addresses channel arg for grpclb %p update, "
1658 "ignoring.",
1659 (void *)glb_policy);
1660 }
1661 }
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001662 const grpc_lb_addresses *addresses =
1663 (const grpc_lb_addresses *)arg->value.pointer.p;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001664 GPR_ASSERT(glb_policy->lb_channel != NULL);
1665 grpc_channel_args *lb_channel_args = build_lb_channel_args(
1666 exec_ctx, addresses, glb_policy->response_generator, args->args);
1667 /* Propagate updates to the LB channel through the fake resolver */
1668 grpc_fake_resolver_response_generator_set_response(
1669 exec_ctx, glb_policy->response_generator, lb_channel_args);
1670 grpc_channel_args_destroy(exec_ctx, lb_channel_args);
1671
1672 if (!glb_policy->watching_lb_channel) {
1673 // Watch the LB channel connectivity for connection.
David Garcia Quintas6a7935e2017-07-27 19:24:52 -07001674 glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
1675 glb_policy->lb_channel, true /* try to connect */);
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001676 grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
1677 grpc_channel_get_channel_stack(glb_policy->lb_channel));
1678 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1679 glb_policy->watching_lb_channel = true;
1680 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
1681 grpc_client_channel_watch_connectivity_state(
1682 exec_ctx, client_channel_elem,
1683 grpc_polling_entity_create_from_pollset_set(
1684 glb_policy->base.interested_parties),
1685 &glb_policy->lb_channel_connectivity,
1686 &glb_policy->lb_channel_on_connectivity_changed, NULL);
1687 }
1688}
1689
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
                                                      void *arg,
                                                      grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
  // On shutdown, jump straight to the terminal cleanup path (inside the
  // SHUTDOWN case of the switch below).
  if (glb_policy->shutting_down) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
  // embedded RR policy. Note that the current RR policy, if any, will stay in
  // effect until an update from the new lb_call is received.
  switch (glb_policy->lb_channel_connectivity) {
    case GRPC_CHANNEL_INIT:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      /* resub. */
      // Not READY yet: re-arm the connectivity watch on the LB channel and
      // wait for the next state transition.
      grpc_channel_element *client_channel_elem =
          grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(glb_policy->lb_channel));
      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
      grpc_client_channel_watch_connectivity_state(
          exec_ctx, client_channel_elem,
          grpc_polling_entity_create_from_pollset_set(
              glb_policy->base.interested_parties),
          &glb_policy->lb_channel_connectivity,
          &glb_policy->lb_channel_on_connectivity_changed, NULL);
      break;
    }
    case GRPC_CHANNEL_IDLE:
      // lb channel inactive (probably shutdown prior to update). Restart lb
      // call to kick the lb channel into gear.
      GPR_ASSERT(glb_policy->lb_call == NULL);
    /* fallthrough */
    case GRPC_CHANNEL_READY:
      if (glb_policy->lb_call != NULL) {
        // An LB call is in flight against the previous LB server: cancel it
        // so it can be re-created against the updated channel.
        glb_policy->updating_lb_channel = false;
        glb_policy->updating_lb_call = true;
        grpc_call_cancel(glb_policy->lb_call, NULL);
        // lb_on_server_status_received will pick up the cancel and reinit
        // lb_call.
        if (glb_policy->pending_update_args != NULL) {
          // Apply the update that was deferred while this one was in flight;
          // the deferred args (deep-copied in glb_update_locked) are consumed
          // and freed here.
          grpc_lb_policy_args *args = glb_policy->pending_update_args;
          glb_policy->pending_update_args = NULL;
          glb_update_locked(exec_ctx, &glb_policy->base, args);
          grpc_channel_args_destroy(exec_ctx, args->args);
          gpr_free(args);
        }
      } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
        // No LB call in flight but the policy is active: cancel any pending
        // retry timer and (re)start the LB call right away.
        if (glb_policy->retry_timer_active) {
          grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
          glb_policy->retry_timer_active = false;
        }
        start_picking_locked(exec_ctx, glb_policy);
      }
    /* fallthrough */
    case GRPC_CHANNEL_SHUTDOWN:
    done:
      // Terminal state (or shutting down): stop watching the LB channel and
      // drop the weak ref taken when the watch was installed.
      glb_policy->watching_lb_channel = false;
      GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                                "watch_lb_channel_connectivity_cb_shutdown");
      break;
  }
}
1753
/* Code wiring the policy with the rest of the core */
// Vtable binding this policy's *_locked implementations to the generic
// grpc_lb_policy interface.
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
    glb_destroy,
    glb_shutdown_locked,
    glb_pick_locked,
    glb_cancel_pick_locked,
    glb_cancel_picks_locked,
    glb_ping_one_locked,
    glb_exit_idle_locked,
    glb_check_connectivity_locked,
    glb_notify_on_state_change_locked,
    glb_update_locked};
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001766
/* Factory entry point: builds a glb (grpclb) policy instance from the
 * resolved addresses in \a args. Returns NULL if the args carry no
 * LB addresses or no balancer addresses at all. On success, the returned
 * policy owns an internal LB channel (fed through a fake resolver) used to
 * talk to the balancer(s). */
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                                  grpc_lb_policy_factory *factory,
                                  grpc_lb_policy_args *args) {
  /* Count the number of gRPC-LB addresses. There must be at least one.
   * TODO(roth): For now, we ignore non-balancer addresses, but in the
   * future, we may change the behavior such that we fall back to using
   * the non-balancer addresses if we cannot reach any balancers. In the
   * fallback case, we should use the LB policy indicated by
   * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
   * unset, we should default to pick_first). */
  const grpc_arg *arg =
      grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
  if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
    return NULL;
  }
  grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
  size_t num_grpclb_addrs = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
  }
  if (num_grpclb_addrs == 0) return NULL;

  glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));

  /* Get server name. */
  arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
  GPR_ASSERT(arg != NULL);
  GPR_ASSERT(arg->type == GRPC_ARG_STRING);
  // NOTE(review): grpc_uri_parse's result is dereferenced without a NULL
  // check — presumably the server URI was validated upstream; confirm.
  grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
  GPR_ASSERT(uri->path[0] != '\0');
  // Strip the leading '/' from the URI path to obtain the server name.
  glb_policy->server_name =
      gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
  if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
            glb_policy->server_name);
  }
  grpc_uri_destroy(uri);

  glb_policy->cc_factory = args->client_channel_factory;
  GPR_ASSERT(glb_policy->cc_factory != NULL);

  // LB call timeout; 0 (the default) means no deadline.
  arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
  glb_policy->lb_call_timeout_ms =
      grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});

  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
  // since we use this to trigger the client_load_reporting filter.
  grpc_arg new_arg = grpc_channel_arg_string_create(
      (char *)GRPC_ARG_LB_POLICY_NAME, (char *)"grpclb");
  static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
  glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
      args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);

  /* Create a client channel over them to communicate with a LB service */
  glb_policy->response_generator =
      grpc_fake_resolver_response_generator_create();
  grpc_channel_args *lb_channel_args = build_lb_channel_args(
      exec_ctx, addresses, glb_policy->response_generator, args->args);
  char *uri_str;
  gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
  glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
      exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);

  /* Propagate initial resolution */
  grpc_fake_resolver_response_generator_set_response(
      exec_ctx, glb_policy->response_generator, lb_channel_args);
  grpc_channel_args_destroy(exec_ctx, lb_channel_args);
  gpr_free(uri_str);
  if (glb_policy->lb_channel == NULL) {
    // Channel creation failed: release everything allocated so far.
    gpr_free((void *)glb_policy->server_name);
    grpc_channel_args_destroy(exec_ctx, glb_policy->args);
    gpr_free(glb_policy);
    return NULL;
  }
  grpc_subchannel_index_ref();
  GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
                    glb_lb_channel_on_connectivity_changed_cb, glb_policy,
                    grpc_combiner_scheduler(args->combiner));
  grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
  grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
                               "grpclb");
  return &glb_policy->base;
}
1850
// The factory is a static singleton; ref counting is a no-op.
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1852
// The factory is a static singleton; unref is a no-op.
static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1854
// Factory vtable: no-op ref/unref (static singleton), glb_create as the
// constructor, registered under the policy name "grpclb".
static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
    glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};

// The singleton factory instance handed out by grpc_glb_lb_factory_create().
static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1859
// Returns the static grpclb policy factory singleton (never freed).
grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
  return &glb_lb_policy_factory;
}
1863
1864/* Plugin registration */
Mark D. Roth09e458c2017-05-02 08:13:26 -07001865
1866// Only add client_load_reporting filter if the grpclb LB policy is used.
1867static bool maybe_add_client_load_reporting_filter(
1868 grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
1869 const grpc_channel_args *args =
1870 grpc_channel_stack_builder_get_channel_arguments(builder);
1871 const grpc_arg *channel_arg =
1872 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1873 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING &&
1874 strcmp(channel_arg->value.string, "grpclb") == 0) {
1875 return grpc_channel_stack_builder_append_filter(
1876 builder, (const grpc_channel_filter *)arg, NULL, NULL);
1877 }
1878 return true;
1879}
1880
// Plugin init: registers the grpclb policy factory, its tracers, and a
// channel-init stage that conditionally appends the client load reporting
// filter to subchannels (only when the grpclb policy is selected).
void grpc_lb_policy_grpclb_init() {
  grpc_register_lb_policy(grpc_glb_lb_factory_create());
  grpc_register_tracer(&grpc_lb_glb_trace);
#ifndef NDEBUG
  // Ref-count tracing is only compiled into debug builds.
  grpc_register_tracer(&grpc_trace_lb_policy_refcount);
#endif
  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   maybe_add_client_load_reporting_filter,
                                   (void *)&grpc_client_load_reporting_filter);
}
1892
// Plugin shutdown: nothing to tear down (the factory is a static singleton).
void grpc_lb_policy_grpclb_shutdown() {}