blob: a776a07d99ad2d294491787e5d3e97b5c2fcaa2c [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2016 gRPC authors.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070016 *
17 */
18
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070019/** Implementation of the gRPC LB policy.
20 *
David Garcia Quintas43339842016-07-18 12:56:09 -070021 * This policy takes as input a set of resolved addresses {a1..an} for which the
22 * LB set was set (it's the resolver's responsibility to ensure this). That is
23 * to say, {a1..an} represent a collection of LB servers.
24 *
25 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
26 * This channel behaves just like a regular channel. In particular, the
27 * constructed URI over the addresses a1..an will use the default pick first
28 * policy to select from this list of LB server backends.
29 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070030 * The first time the policy gets a request for a pick, a ping, or to exit the
David Garcia Quintas98da61b2016-10-29 08:46:31 +020031 * idle state, \a query_for_backends_locked() is called. This function sets up
32 * and initiates the internal communication with the LB server. In particular,
33 * it's responsible for instantiating the internal *streaming* call to the LB
34 * server (whichever address from {a1..an} pick-first chose). This call is
David Garcia Quintas7ec29132016-11-01 04:09:05 +010035 * serviced by two callbacks, \a lb_on_server_status_received and \a
36 * lb_on_response_received. The former will be called when the call to the LB
37 * server completes. This can happen if the LB server closes the connection or
38 * if this policy itself cancels the call (for example because it's shutting
David Garcia Quintas246c5642016-11-01 11:16:52 -070039 * down). If the internal call times out, the usual behavior of pick-first
David Garcia Quintas7ec29132016-11-01 04:09:05 +010040 * applies, continuing to pick from the list {a1..an}.
David Garcia Quintas43339842016-07-18 12:56:09 -070041 *
David Garcia Quintas98da61b2016-10-29 08:46:31 +020042 * Upon success, the incoming \a LoadBalancingResponse is processed by \a
43 * res_recv. An invalid one results in the termination of the streaming call. A
44 * new streaming call should be created if possible, failing the original call
45 * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
46 * backends is extracted. A Round Robin policy will be created from this list.
47 * There are two possible scenarios:
David Garcia Quintas43339842016-07-18 12:56:09 -070048 *
49 * 1. This is the first server list received. There was no previous instance of
David Garcia Quintas90712d52016-10-13 19:33:04 -070050 * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
51 * policy and perform all the pending operations over it.
David Garcia Quintas43339842016-07-18 12:56:09 -070052 * 2. There's already a RR policy instance active. We need to introduce the new
 53 * one built from the new serverlist, but taking care not to disrupt the
54 * operations in progress over the old RR instance. This is done by
55 * decreasing the reference count on the old policy. The moment no more
56 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070057 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
58 * state. At this point we can transition to a new RR instance safely, which
David Garcia Quintas90712d52016-10-13 19:33:04 -070059 * is done once again via \a rr_handover_locked().
David Garcia Quintas43339842016-07-18 12:56:09 -070060 *
61 *
62 * Once a RR policy instance is in place (and getting updated as described),
 63 * calls for a pick, a ping or a cancellation will be serviced right away by
64 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintas7ec29132016-11-01 04:09:05 +010065 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is
66 * received, etc), pick/ping requests are added to a list of pending picks/pings
67 * to be flushed and serviced as part of \a rr_handover_locked() the moment the
68 * RR policy instance becomes available.
David Garcia Quintas43339842016-07-18 12:56:09 -070069 *
70 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
71 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070072
73/* TODO(dgq):
74 * - Implement LB service forwarding (point 2c. in the doc's diagram).
75 */
76
murgatroid99085f9af2016-10-24 09:55:44 -070077/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
78 using that endpoint. Because of various transitive includes in uv.h,
79 including windows.h on Windows, uv.h must be included before other system
80 headers. Therefore, sockaddr.h must always be included first */
murgatroid997871f732016-09-23 13:49:05 -070081#include "src/core/lib/iomgr/sockaddr.h"
82
Mark D. Roth64d922a2017-05-03 12:52:04 -070083#include <limits.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070084#include <string.h>
85
86#include <grpc/byte_buffer_reader.h>
87#include <grpc/grpc.h>
88#include <grpc/support/alloc.h>
89#include <grpc/support/host_port.h>
90#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -070091#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070092
Craig Tiller9eb0fde2017-03-31 16:59:30 -070093#include "src/core/ext/filters/client_channel/client_channel.h"
94#include "src/core/ext/filters/client_channel/client_channel_factory.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070095#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070096#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
97#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070098#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070099#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
Craig Tillerd52e22f2017-04-02 16:22:52 -0700100#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
101#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
102#include "src/core/ext/filters/client_channel/parse_address.h"
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700103#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
Juanli Shen6502ecc2017-09-13 13:10:54 -0700104#include "src/core/ext/filters/client_channel/subchannel_index.h"
Mark D. Roth046cf762016-09-26 11:13:51 -0700105#include "src/core/lib/channel/channel_args.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700106#include "src/core/lib/channel/channel_stack.h"
Craig Tiller2400bf52017-02-09 16:25:19 -0800107#include "src/core/lib/iomgr/combiner.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200108#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700109#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200110#include "src/core/lib/iomgr/timer.h"
David Garcia Quintas01291502017-02-07 13:26:41 -0800111#include "src/core/lib/slice/slice_hash_table.h"
Craig Tiller18b4ba32016-11-09 15:23:42 -0800112#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -0700113#include "src/core/lib/slice/slice_string_helpers.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200114#include "src/core/lib/support/backoff.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700115#include "src/core/lib/support/string.h"
116#include "src/core/lib/surface/call.h"
117#include "src/core/lib/surface/channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700118#include "src/core/lib/surface/channel_init.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700119#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700120
/* Parameters controlling the backoff used when retrying the LB call
 * (initial delay, multiplier, cap and jitter — see gpr_backoff usage below). */
#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2

/* Tracer for gRPCLB logging; off by default, registered under the name
 * "glb". */
grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700128
/* Adds the lb_token of the selected subchannel (address) to the call's
 * initial metadata, using caller-provided storage for the linked mdelem.
 * Returns the error (if any) from appending to the metadata batch. */
static grpc_error *initial_metadata_add_lb_token(
    grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata,
    grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) {
  GPR_ASSERT(lb_token_mdelem_storage != NULL);
  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
  return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
                                      lb_token_mdelem_storage, lb_token);
}
139
/* Destructor for the client-stats entry stored in the call context: releases
 * the reference held by the context element. */
static void destroy_client_stats(void *arg) {
  grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg);
}
143
/* Argument bundle for \a wrapped_rr_closure: carries everything needed to
 * complete a pick/ping against the internal Round Robin policy and to clean
 * up afterwards. */
typedef struct wrapped_rr_closure_arg {
  /* the closure instance using this struct as argument */
  grpc_closure wrapper_closure;

  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
   * calls against the internal RR instance, respectively. */
  grpc_closure *wrapped_closure;

  /* the pick's initial metadata, kept in order to append the LB token for the
   * pick */
  grpc_metadata_batch *initial_metadata;

  /* the picked target, used to determine which LB token to add to the pick's
   * initial metadata */
  grpc_connected_subchannel **target;

  /* the context to be populated for the subchannel call */
  grpc_call_context_element *context;

  /* Stats for client-side load reporting. Note that this holds a
   * reference, which must be either passed on via context or unreffed. */
  grpc_grpclb_client_stats *client_stats;

  /* the LB token associated with the pick */
  grpc_mdelem lb_token;

  /* storage for the lb token initial metadata mdelem */
  grpc_linked_mdelem *lb_token_mdelem_storage;

  /* The RR instance related to the closure */
  grpc_lb_policy *rr_policy;

  /* heap memory to be freed upon closure execution (typically the enclosing
   * pending_pick/pending_ping node that owns this struct). */
  void *free_when_done;
} wrapped_rr_closure_arg;
179
/* The \a on_complete closure passed as part of the pick requires keeping a
 * reference to its associated round robin instance. We wrap this closure in
 * order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
                               grpc_error *error) {
  wrapped_rr_closure_arg *wc_arg = (wrapped_rr_closure_arg *)arg;

  GPR_ASSERT(wc_arg->wrapped_closure != NULL);
  /* Always run the user's original closure, propagating \a error. */
  GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));

  if (wc_arg->rr_policy != NULL) {
    /* if *target is NULL, no pick has been made by the RR policy (eg, all
     * addresses failed to connect). There won't be any user_data/token
     * available */
    if (*wc_arg->target != NULL) {
      if (!GRPC_MDISNULL(wc_arg->lb_token)) {
        /* Append a new ref on the token; the batch takes ownership of it. */
        initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata,
                                      wc_arg->lb_token_mdelem_storage,
                                      GRPC_MDELEM_REF(wc_arg->lb_token));
      } else {
        gpr_log(GPR_ERROR,
                "No LB token for connected subchannel pick %p (from RR "
                "instance %p).",
                (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
        abort();
      }
      // Pass on client stats via context. Passes ownership of the reference.
      GPR_ASSERT(wc_arg->client_stats != NULL);
      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
    } else {
      /* No pick happened: release the stats reference we were holding. */
      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
    }
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
  }
  /* Free the pending_pick/pending_ping node that owns this wrapper arg. */
  GPR_ASSERT(wc_arg->free_when_done != NULL);
  gpr_free(wc_arg->free_when_done);
}
221
/* Linked list of pending pick requests. It stores all information needed to
 * eventually call (Round Robin's) pick() on them. They mainly stay pending
 * waiting for the RR policy to be created/updated.
 *
 * One particularity is the wrapping of the user-provided \a on_complete closure
 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
 * order to correctly unref the RR policy instance upon completion of the pick.
 * See \a wrapped_rr_closure for details. */
typedef struct pending_pick {
  struct pending_pick *next;

  /* original pick()'s arguments */
  grpc_lb_policy_pick_args pick_args;

  /* output argument where to store the pick()ed connected subchannel, or NULL
   * upon error. */
  grpc_connected_subchannel **target;

  /* args for wrapped_on_complete */
  wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
243
/* Prepends a pending pick to \a root, wrapping the user's \a on_complete
 * closure (see \a wrapped_rr_closure) so the RR instance can be unreffed and
 * the LB token appended to the pick's initial metadata upon completion. */
static void add_pending_pick(pending_pick **root,
                             const grpc_lb_policy_pick_args *pick_args,
                             grpc_connected_subchannel **target,
                             grpc_call_context_element *context,
                             grpc_closure *on_complete) {
  pending_pick *pp = (pending_pick *)gpr_zalloc(sizeof(*pp));
  pp->next = *root;
  pp->pick_args = *pick_args;
  pp->target = target;
  pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
  pp->wrapped_on_complete_arg.target = target;
  pp->wrapped_on_complete_arg.context = context;
  pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
  pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
      pick_args->lb_token_mdelem_storage;
  /* the node owns itself; freed by wrapped_rr_closure via free_when_done. */
  pp->wrapped_on_complete_arg.free_when_done = pp;
  GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
                    wrapped_rr_closure, &pp->wrapped_on_complete_arg,
                    grpc_schedule_on_exec_ctx);
  *root = pp;
}
265
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
  struct pending_ping *next;

  /* args for wrapped_notify */
  wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
273
David Garcia Quintas65318262016-07-29 13:43:38 -0700274static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700275 pending_ping *pping = (pending_ping *)gpr_zalloc(sizeof(*pping));
David Garcia Quintas65318262016-07-29 13:43:38 -0700276 pping->wrapped_notify_arg.wrapped_closure = notify;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700277 pping->wrapped_notify_arg.free_when_done = pping;
David Garcia Quintas65318262016-07-29 13:43:38 -0700278 pping->next = *root;
ncteisen969b46e2017-06-08 14:57:11 -0700279 GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
Craig Tiller91031da2016-12-28 15:44:25 -0800280 wrapped_rr_closure, &pping->wrapped_notify_arg,
281 grpc_schedule_on_exec_ctx);
David Garcia Quintas65318262016-07-29 13:43:38 -0700282 *root = pping;
283}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700284
/*
 * glb_lb_policy
 */
typedef struct rr_connectivity_data rr_connectivity_data;

/* State for one instance of the gRPCLB policy: the LB channel/call machinery,
 * the wrapped Round Robin policy, and the lists of pending picks/pings. */
typedef struct glb_lb_policy {
  /** base policy: must be first */
  grpc_lb_policy base;

  /** who the client is trying to communicate with */
  const char *server_name;
  grpc_client_channel_factory *cc_factory;
  grpc_channel_args *args;

  /** timeout in milliseconds for the LB call. 0 means no deadline. */
  int lb_call_timeout_ms;

  /** for communicating with the LB server */
  grpc_channel *lb_channel;

  /** response generator to inject address updates into \a lb_channel */
  grpc_fake_resolver_response_generator *response_generator;

  /** the RR policy to use for the backend servers returned by the LB server */
  grpc_lb_policy *rr_policy;

  bool started_picking;

  /** our connectivity state tracker */
  grpc_connectivity_state_tracker state_tracker;

  /** connectivity state of the LB channel */
  grpc_connectivity_state lb_channel_connectivity;

  /** stores the deserialized response from the LB. May be NULL until one such
   * response has arrived. */
  grpc_grpclb_serverlist *serverlist;

  /** Index into serverlist for next pick.
   * If the server at this index is a drop, we return a drop.
   * Otherwise, we delegate to the RR policy. */
  size_t serverlist_index;

  /** list of picks that are waiting on RR's policy connectivity */
  pending_pick *pending_picks;

  /** list of pings that are waiting on RR's policy connectivity */
  pending_ping *pending_pings;

  bool shutting_down;

  /** are we currently updating lb_call? */
  bool updating_lb_call;

  /** are we currently updating lb_channel? */
  bool updating_lb_channel;

  /** are we already watching the LB channel's connectivity? */
  bool watching_lb_channel;

  /** is \a lb_call_retry_timer active? */
  bool retry_timer_active;

  /** called upon changes to the LB channel's connectivity. */
  grpc_closure lb_channel_on_connectivity_changed;

  /** args from the latest update received while already updating, or NULL */
  grpc_lb_policy_args *pending_update_args;

  /************************************************************/
  /* client data associated with the LB server communication  */
  /************************************************************/
  /* Finished sending initial request. */
  grpc_closure lb_on_sent_initial_request;

  /* Status from the LB server has been received. This signals the end of the
   * LB call. */
  grpc_closure lb_on_server_status_received;

  /* A response from the LB server has been received. Process it */
  grpc_closure lb_on_response_received;

  /* LB call retry timer callback. */
  grpc_closure lb_on_call_retry;

  grpc_call *lb_call; /* streaming call to the LB server, */

  grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
  grpc_metadata_array
      lb_trailing_metadata_recv; /* trailing MD from LB server */

  /* what's being sent to the LB server. Note that its value may vary if the LB
   * server indicates a redirect. */
  grpc_byte_buffer *lb_request_payload;

  /* response from the LB server, if any. Processed in
   * lb_on_response_received() */
  grpc_byte_buffer *lb_response_payload;

  /* call status code and details, set in lb_on_server_status_received() */
  grpc_status_code lb_call_status;
  grpc_slice lb_call_status_details;

  /** LB call retry backoff state */
  gpr_backoff lb_call_backoff_state;

  /** LB call retry timer */
  grpc_timer lb_call_retry_timer;

  bool initial_request_sent;
  bool seen_initial_response;

  /* Stats for client-side load reporting. Should be unreffed and
   * recreated whenever lb_call is replaced. */
  grpc_grpclb_client_stats *client_stats;
  /* Interval and timer for next client load report. */
  gpr_timespec client_stats_report_interval;
  grpc_timer client_load_report_timer;
  bool client_load_report_timer_pending;
  bool last_client_load_report_counters_were_zero;
  /* Closure used for either the load report timer or the callback for
   * completion of sending the load report. */
  grpc_closure client_load_report_closure;
  /* Client load report message payload. */
  grpc_byte_buffer *client_load_report_payload;
} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700410
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
  grpc_closure on_change;
  grpc_connectivity_state state;
  glb_lb_policy *glb_policy;
};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700417
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700418static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
419 bool log) {
Mark D. Rothe7751802017-07-27 12:31:45 -0700420 if (server->drop) return false;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700421 const grpc_grpclb_ip_address *ip = &server->ip_address;
422 if (server->port >> 16 != 0) {
423 if (log) {
424 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200425 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
426 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700427 }
428 return false;
429 }
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700430 if (ip->size != 4 && ip->size != 16) {
431 if (log) {
432 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200433 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700434 "serverlist. Ignoring",
Jan Tattermusch2b398082016-10-07 14:40:30 +0200435 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700436 }
437 return false;
438 }
439 return true;
440}
441
/* vtable for LB tokens in grpc_lb_addresses. */
/* Copies an LB token by taking a new ref on the mdelem whose payload is
 * stashed behind the opaque \a token pointer; NULL tokens stay NULL. */
static void *lb_token_copy(void *token) {
  return token == NULL
             ? NULL
             : (void *)GRPC_MDELEM_REF((grpc_mdelem){(uintptr_t)token}).payload;
}
/* Drops the ref held on the mdelem behind the opaque \a token pointer. */
static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
  if (token != NULL) {
    GRPC_MDELEM_UNREF(exec_ctx, (grpc_mdelem){(uintptr_t)token});
  }
}
/* Three-way comparison of two opaque LB tokens for grpc_lb_user_data_vtable.
 * The pointers are compared as integers (uintptr_t): applying the relational
 * operators directly to pointers into distinct objects is undefined behavior
 * in C (C11 6.5.8), whereas the integer comparison is well defined and yields
 * the same total order on common platforms. */
static int lb_token_cmp(void *token1, void *token2) {
  const uintptr_t a = (uintptr_t)token1;
  const uintptr_t b = (uintptr_t)token2;
  if (a > b) return 1;
  if (a < b) return -1;
  return 0;
}
/* vtable wiring the LB-token copy/destroy/compare hooks into
 * grpc_lb_addresses user data. */
static const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};
460
/* Fills \a addr with the address carried by \a server. Drop entries leave
 * \a addr zeroed out; otherwise the 4- or 16-byte binary IP payload is copied
 * into a sockaddr_in/sockaddr_in6 respectively. */
static void parse_server(const grpc_grpclb_server *server,
                         grpc_resolved_address *addr) {
  memset(addr, 0, sizeof(*addr));
  if (server->drop) return;
  /* server->port is in host order in the proto; convert for the sockaddr. */
  const uint16_t netorder_port = htons((uint16_t)server->port);
  /* the addresses are given in binary format (a in(6)_addr struct) in
   * server->ip_address.bytes. */
  const grpc_grpclb_ip_address *ip = &server->ip_address;
  if (ip->size == 4) {
    addr->len = sizeof(struct sockaddr_in);
    struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
    addr4->sin_family = AF_INET;
    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
    addr4->sin_port = netorder_port;
  } else if (ip->size == 16) {
    addr->len = sizeof(struct sockaddr_in6);
    struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
    addr6->sin6_family = AF_INET6;
    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
    addr6->sin6_port = netorder_port;
  }
  /* NOTE(review): sizes other than 4/16 are silently ignored here; upstream
   * is_server_valid() filters them — confirm all callers validate first. */
}
483
/* Returns addresses extracted from \a serverlist.
 *
 * The result is a freshly allocated grpc_lb_addresses (ownership transferred
 * to the caller) containing one entry per *valid* server in \a serverlist.
 * Each entry's user_data is the server's LB token as a grpc_mdelem payload
 * (or the shared empty token if the server didn't provide one), to be
 * consumed by the RR policy during its creation. */
static grpc_lb_addresses *process_serverlist_locked(
    grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
  size_t num_valid = 0;
  /* first pass: count how many are valid in order to allocate the necessary
   * memory in a single block */
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
  }
  grpc_lb_addresses *lb_addresses =
      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
  /* second pass: actually populate the addresses and LB tokens (aka user data
   * to the outside world) to be read by the RR policy during its creation.
   * Given that the validity tests are very cheap, they are performed again
   * instead of marking the valid ones during the first pass, as this would
   * incur in an allocation due to the arbitrary number of servers */
  size_t addr_idx = 0;
  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    const grpc_grpclb_server *server = serverlist->servers[sl_idx];
    if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    GPR_ASSERT(addr_idx < num_valid);
    /* address processing */
    grpc_resolved_address addr;
    parse_server(server, &addr);
    /* lb token processing */
    void *user_data;
    if (server->has_load_balance_token) {
      /* The token field may not be NUL-terminated, so bound the length with
       * strnlen against the field's capacity. */
      const size_t lb_token_max_length =
          GPR_ARRAY_SIZE(server->load_balance_token);
      const size_t lb_token_length =
          strnlen(server->load_balance_token, lb_token_max_length);
      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
          server->load_balance_token, lb_token_length);
      /* grpc_mdelem_from_slices takes ownership of lb_token_mdstr. */
      user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
                                                  lb_token_mdstr)
                      .payload;
    } else {
      char *uri = grpc_sockaddr_to_uri(&addr);
      gpr_log(GPR_INFO,
              "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
              uri);
      gpr_free(uri);
      user_data = (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    }

    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                  false /* is_balancer */,
                                  NULL /* balancer_name */, user_data);
    ++addr_idx;
  }
  GPR_ASSERT(addr_idx == num_valid);
  return lb_addresses;
}
538
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700539static void update_lb_connectivity_status_locked(
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800540 grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700541 grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
Craig Tiller613dafa2017-02-09 12:00:43 -0800542 const grpc_connectivity_state curr_glb_state =
543 grpc_connectivity_state_check(&glb_policy->state_tracker);
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800544
545 /* The new connectivity status is a function of the previous one and the new
546 * input coming from the status of the RR policy.
547 *
David Garcia Quintas4283a262016-11-18 10:43:56 -0800548 * current state (grpclb's)
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800549 * |
550 * v || I | C | R | TF | SD | <- new state (RR's)
551 * ===++====+=====+=====+======+======+
David Garcia Quintas4283a262016-11-18 10:43:56 -0800552 * I || I | C | R | [I] | [I] |
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800553 * ---++----+-----+-----+------+------+
David Garcia Quintas4283a262016-11-18 10:43:56 -0800554 * C || I | C | R | [C] | [C] |
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800555 * ---++----+-----+-----+------+------+
David Garcia Quintas4283a262016-11-18 10:43:56 -0800556 * R || I | C | R | [R] | [R] |
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800557 * ---++----+-----+-----+------+------+
David Garcia Quintas4283a262016-11-18 10:43:56 -0800558 * TF || I | C | R | [TF] | [TF] |
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800559 * ---++----+-----+-----+------+------+
560 * SD || NA | NA | NA | NA | NA | (*)
561 * ---++----+-----+-----+------+------+
562 *
David Garcia Quintas4283a262016-11-18 10:43:56 -0800563 * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
564 * is the current state of grpclb, which is left untouched.
565 *
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800566 * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
567 * the previous RR instance.
568 *
569 * Note that the status is never updated to SHUTDOWN as a result of calling
570 * this function. Only glb_shutdown() has the power to set that state.
571 *
572 * (*) This function mustn't be called during shutting down. */
573 GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
574
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700575 switch (rr_state) {
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800576 case GRPC_CHANNEL_TRANSIENT_FAILURE:
577 case GRPC_CHANNEL_SHUTDOWN:
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700578 GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
579 break;
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800580 case GRPC_CHANNEL_INIT:
581 case GRPC_CHANNEL_IDLE:
582 case GRPC_CHANNEL_CONNECTING:
583 case GRPC_CHANNEL_READY:
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700584 GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800585 }
586
Craig Tiller84f75d42017-05-03 13:06:35 -0700587 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700588 gpr_log(
589 GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
590 grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800591 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700592 grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -0700593 rr_state_error,
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800594 "update_lb_connectivity_status_locked");
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800595}
596
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
 * immediately (ignoring its completion callback), we need to perform the
 * cleanups this callback would otherwise be responsible for.
 * If \a force_async is true, then we will manually schedule the
 * completion callback even if the pick is available immediately.
 *
 * Returns true iff the pick completed synchronously (in which case all the
 * cleanup — RR unref, LB token metadata, client stats context — has already
 * been done here). When force_async is true, always returns false so the
 * caller treats the pick as asynchronous. */
static bool pick_from_internal_rr_locked(
    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
    const grpc_lb_policy_pick_args *pick_args, bool force_async,
    grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
  // Look at the index into the serverlist to see if we should drop this call.
  // The index advances round-robin over the serverlist across picks.
  grpc_grpclb_server *server =
      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
    glb_policy->serverlist_index = 0;  // Wrap-around.
  }
  if (server->drop) {
    // Not using the RR policy, so unref it.
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
              (intptr_t)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
    // Update client load reporting stats to indicate the number of
    // dropped calls. Note that we have to do this here instead of in
    // the client_load_reporting filter, because we do not create a
    // subchannel call (and therefore no client_load_reporting filter)
    // for dropped calls.
    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
                                                     wc_arg->client_stats);
    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
    if (force_async) {
      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
      gpr_free(wc_arg->free_when_done);
      return false;
    }
    gpr_free(wc_arg->free_when_done);
    return true;
  }
  // Pick via the RR policy.
  const bool pick_done = grpc_lb_policy_pick_locked(
      exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
      (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
  if (pick_done) {
    /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
              (intptr_t)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
    /* add the load reporting initial metadata */
    initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
                                  pick_args->lb_token_mdelem_storage,
                                  GRPC_MDELEM_REF(wc_arg->lb_token));
    // Pass on client stats via context. Passes ownership of the reference.
    GPR_ASSERT(wc_arg->client_stats != NULL);
    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
    if (force_async) {
      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
      gpr_free(wc_arg->free_when_done);
      return false;
    }
    gpr_free(wc_arg->free_when_done);
  }
  /* else, the pending pick will be registered and taken care of by the
   * pending pick list inside the RR policy (glb_policy->rr_policy).
   * Eventually, wrapped_on_complete will be called, which will -among other
   * things- add the LB token to the call's initial metadata */
  return pick_done;
}
669
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700670static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
671 glb_lb_policy *glb_policy) {
Ken Payson01e83b52017-09-13 19:45:07 -0700672 grpc_lb_addresses *addresses =
673 process_serverlist_locked(exec_ctx, glb_policy->serverlist);
David Garcia Quintasc22c65b2017-07-25 14:22:20 -0700674 GPR_ASSERT(addresses != NULL);
Yash Tibrewalca3c1c02017-09-07 22:47:16 -0700675 grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700676 args->client_channel_factory = glb_policy->cc_factory;
677 args->combiner = glb_policy->base.combiner;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700678 // Replace the LB addresses in the channel args that we pass down to
679 // the subchannel.
Mark D. Roth557c9902016-10-24 11:12:05 -0700680 static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200681 const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700682 args->args = grpc_channel_args_copy_and_add_and_remove(
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700683 glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
684 1);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800685 grpc_lb_addresses_destroy(exec_ctx, addresses);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700686 return args;
687}
688
/* Releases args previously allocated by lb_policy_args_create(). */
static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
                                   grpc_lb_policy_args *args) {
  grpc_channel_args_destroy(exec_ctx, args->args);
  gpr_free(args);
}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700694
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error);
/* Creates the RR policy from \a args and wires it into the grpclb policy:
 * connectivity tracking, polling, and the draining of pending picks/pings.
 * Must only be called when no RR policy currently exists. */
static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
                             grpc_lb_policy_args *args) {
  GPR_ASSERT(glb_policy->rr_policy == NULL);

  grpc_lb_policy *new_rr_policy =
      grpc_lb_policy_create(exec_ctx, "round_robin", args);
  if (new_rr_policy == NULL) {
    /* Creation failed: keep whatever RR instance existed before (possibly
     * none); a future serverlist update will retry. */
    gpr_log(GPR_ERROR,
            "Failure creating a RoundRobin policy for serverlist update with "
            "%lu entries. The previous RR instance (%p), if any, will continue "
            "to be used. Future updates from the LB will attempt to create new "
            "instances.",
            (unsigned long)glb_policy->serverlist->num_servers,
            (void *)glb_policy->rr_policy);
    return;
  }
  glb_policy->rr_policy = new_rr_policy;
  grpc_error *rr_state_error = NULL;
  const grpc_connectivity_state rr_state =
      grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
                                               &rr_state_error);
  /* Connectivity state is a function of the RR policy updated/created */
  update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
                                       rr_state_error);
  /* Add the gRPC LB's interested_parties pollset_set to that of the newly
   * created RR policy. This will make the RR policy progress upon activity on
   * gRPC LB, which in turn is tied to the application's call */
  grpc_pollset_set_add_pollset_set(exec_ctx,
                                   glb_policy->rr_policy->interested_parties,
                                   glb_policy->base.interested_parties);

  /* Allocate the data for the tracking of the new RR policy's connectivity.
   * It'll be deallocated in glb_rr_connectivity_changed() */
  rr_connectivity_data *rr_connectivity =
      (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data));
  GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
                    glb_rr_connectivity_changed_locked, rr_connectivity,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  rr_connectivity->glb_policy = glb_policy;
  rr_connectivity->state = rr_state;

  /* Subscribe to changes to the connectivity of the new RR. The weak ref is
   * owned by the subscription and released in glb_rr_connectivity_changed. */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "glb_rr_connectivity_cb");
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                               &rr_connectivity->state,
                                               &rr_connectivity->on_change);
  grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);

  /* Update picks and pings in wait: hand every pending pick/ping over to the
   * new RR instance (each takes its own ref on the RR policy). */
  pending_pick *pp;
  while ((pp = glb_policy->pending_picks)) {
    glb_policy->pending_picks = pp->next;
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
    pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
    pp->wrapped_on_complete_arg.client_stats =
        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p",
              (void *)glb_policy->rr_policy);
    }
    pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
                                 true /* force_async */, pp->target,
                                 &pp->wrapped_on_complete_arg);
  }

  pending_ping *pping;
  while ((pping = glb_policy->pending_pings)) {
    glb_policy->pending_pings = pping->next;
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
    pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
              (intptr_t)glb_policy->rr_policy);
    }
    grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
                                   &pping->wrapped_notify_arg.wrapper_closure);
  }
}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700775
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700776/* glb_policy->rr_policy may be NULL (initial handover) */
777static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
778 glb_lb_policy *glb_policy) {
Ken Payson01e83b52017-09-13 19:45:07 -0700779 GPR_ASSERT(glb_policy->serverlist != NULL &&
780 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700781 if (glb_policy->shutting_down) return;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700782 grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
David Garcia Quintasc22c65b2017-07-25 14:22:20 -0700783 GPR_ASSERT(args != NULL);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700784 if (glb_policy->rr_policy != NULL) {
785 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
786 gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
787 (void *)glb_policy->rr_policy);
788 }
789 grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
790 } else {
791 create_rr_locked(exec_ctx, glb_policy, args);
792 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
793 gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
794 (void *)glb_policy->rr_policy);
795 }
796 }
797 lb_policy_args_destroy(exec_ctx, args);
798}
799
/* Combiner callback invoked whenever the connectivity state of the wrapped RR
 * policy changes. Owns the "glb_rr_connectivity_cb" weak ref on the glb
 * policy, which is released (and \a arg freed) once the subscription ends. */
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error) {
  rr_connectivity_data *rr_connectivity = (rr_connectivity_data *)arg;
  glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
  if (glb_policy->shutting_down) {
    /* Stop the subscription chain: drop the weak ref and free the tracker. */
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "glb_rr_connectivity_cb");
    gpr_free(rr_connectivity);
    return;
  }
  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN) {
    /* An RR policy that has transitioned into the SHUTDOWN connectivity state
     * should not be considered for picks or updates: the SHUTDOWN state is a
     * sink, policies can't transition back from it. */
    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy,
                         "rr_connectivity_shutdown");
    glb_policy->rr_policy = NULL;
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "glb_rr_connectivity_cb");
    gpr_free(rr_connectivity);
    return;
  }
  /* rr state != SHUTDOWN && !glb_policy->shutting down: biz as usual */
  update_lb_connectivity_status_locked(
      exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
  /* Resubscribe. Reuse the "glb_rr_connectivity_cb" weak ref. */
  grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
                                               &rr_connectivity->state,
                                               &rr_connectivity->on_change);
}
830
/* Value destructor for the targets_info hash table: balancer names are
 * heap-allocated C strings (see targets_info_entry_create). */
static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
                                  void *balancer_name) {
  gpr_free(balancer_name);
}
835
David Garcia Quintas01291502017-02-07 13:26:41 -0800836static grpc_slice_hash_table_entry targets_info_entry_create(
837 const char *address, const char *balancer_name) {
David Garcia Quintas01291502017-02-07 13:26:41 -0800838 grpc_slice_hash_table_entry entry;
839 entry.key = grpc_slice_from_copied_string(address);
Mark D. Rothe3006702017-04-19 07:43:56 -0700840 entry.value = gpr_strdup(balancer_name);
David Garcia Quintas01291502017-02-07 13:26:41 -0800841 return entry;
842}
843
/* Orders two balancer names (plain NUL-terminated C strings) with strcmp
 * semantics; used as the comparison callback for the targets_info table. */
static int balancer_name_cmp_fn(void *a, void *b) {
  return strcmp((const char *)a, (const char *)b);
}
849
/* Returns the channel args for the LB channel, used to create a bidirectional
 * stream for the reception of load balancing updates.
 *
 * Inputs:
 *   - \a addresses: corresponding to the balancers.
 *   - \a response_generator: in order to propagate updates from the resolver
 *   above the grpclb policy.
 *   - \a args: other args inherited from the grpclb policy.
 *
 * Returns a freshly allocated grpc_channel_args (ownership transferred to the
 * caller) containing only the balancer addresses plus an address->balancer_name
 * mapping table. */
static grpc_channel_args *build_lb_channel_args(
    grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
    grpc_fake_resolver_response_generator *response_generator,
    const grpc_channel_args *args) {
  size_t num_grpclb_addrs = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
  }
  /* All input addresses come from a resolver that claims they are LB services.
   * It's the resolver's responsibility to make sure this policy is only
   * instantiated and used in that case. Otherwise, something has gone wrong. */
  GPR_ASSERT(num_grpclb_addrs > 0);
  grpc_lb_addresses *lb_addresses =
      grpc_lb_addresses_create(num_grpclb_addrs, NULL);
  grpc_slice_hash_table_entry *targets_info_entries =
      (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) *
                                                num_grpclb_addrs);

  /* For every balancer address, record both the address itself (in
   * lb_addresses) and its address->balancer_name mapping (in
   * targets_info_entries). Non-balancer addresses are skipped. */
  size_t lb_addresses_idx = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) continue;
    if (addresses->addresses[i].user_data != NULL) {
      gpr_log(GPR_ERROR,
              "This LB policy doesn't support user data. It will be ignored");
    }
    char *addr_str;
    GPR_ASSERT(grpc_sockaddr_to_string(
                   &addr_str, &addresses->addresses[i].address, true) > 0);
    targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
        addr_str, addresses->addresses[i].balancer_name);
    gpr_free(addr_str);

    grpc_lb_addresses_set_address(
        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
        addresses->addresses[i].address.len, false /* is balancer */,
        addresses->addresses[i].balancer_name, NULL /* user data */);
  }
  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
  /* The hash table takes over the entries; the entries array itself can go. */
  grpc_slice_hash_table *targets_info =
      grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
                                   destroy_balancer_name, balancer_name_cmp_fn);
  gpr_free(targets_info_entries);

  grpc_channel_args *lb_channel_args =
      grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
                                                  response_generator, args);

  grpc_arg lb_channel_addresses_arg =
      grpc_lb_addresses_create_channel_arg(lb_addresses);

  grpc_channel_args *result = grpc_channel_args_copy_and_add(
      lb_channel_args, &lb_channel_addresses_arg, 1);
  /* Release intermediates; `result` holds its own copies/refs. */
  grpc_slice_hash_table_unref(exec_ctx, targets_info);
  grpc_channel_args_destroy(exec_ctx, lb_channel_args);
  grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
  return result;
}
915
/* Final destruction of the policy. By this point all pending picks/pings must
 * already have been drained (see glb_shutdown_locked). */
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  GPR_ASSERT(glb_policy->pending_picks == NULL);
  GPR_ASSERT(glb_policy->pending_pings == NULL);
  gpr_free((void *)glb_policy->server_name);
  grpc_channel_args_destroy(exec_ctx, glb_policy->args);
  if (glb_policy->client_stats != NULL) {
    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
  }
  grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
  if (glb_policy->serverlist != NULL) {
    grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
  }
  grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
  grpc_subchannel_index_unref();
  /* A pending (not yet applied) update also owns a copy of channel args. */
  if (glb_policy->pending_update_args != NULL) {
    grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
    gpr_free(glb_policy->pending_update_args);
  }
  gpr_free(glb_policy);
}
937
/* Initiates shutdown: cancels the balancer call and retry timer, unrefs the RR
 * policy, destroys the LB channel and fails over all pending picks/pings. */
static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  glb_policy->shutting_down = true;

  /* We need a copy of the lb_call pointer because we can't cancel the call
   * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
   * the cancel, needs to acquire that same lock */
  grpc_call *lb_call = glb_policy->lb_call;

  /* glb_policy->lb_call and this local lb_call must be consistent at this point
   * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
   * of query_for_backends_locked, which can only be invoked while
   * glb_policy->shutting_down is false. */
  if (lb_call != NULL) {
    grpc_call_cancel(lb_call, NULL);
    /* lb_on_server_status_received will pick up the cancel and clean up */
  }
  if (glb_policy->retry_timer_active) {
    grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
    glb_policy->retry_timer_active = false;
  }

  /* Detach the pending lists before scheduling their closures below. */
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  pending_ping *pping = glb_policy->pending_pings;
  glb_policy->pending_pings = NULL;
  if (glb_policy->rr_policy != NULL) {
    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
  }
  // We destroy the LB channel here because
  // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
  // instance. Destroying the lb channel in glb_destroy would likely result in
  // a callback invocation without a valid glb_policy arg.
  if (glb_policy->lb_channel != NULL) {
    grpc_channel_destroy(glb_policy->lb_channel);
    glb_policy->lb_channel = NULL;
  }
  grpc_connectivity_state_set(
      exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");

  /* Complete each pending pick with a NULL target (no error: shutdown is not
   * a pick failure per se) and each pending ping with its wrapper closure. */
  while (pp != NULL) {
    pending_pick *next = pp->next;
    *pp->target = NULL;
    GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
                       GRPC_ERROR_NONE);
    pp = next;
  }

  while (pping != NULL) {
    pending_ping *next = pping->next;
    GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
                       GRPC_ERROR_NONE);
    pping = next;
  }
}
994
// Cancel a specific pending pick.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
//   handed over to the RR policy (in create_rr_locked()). From that point
//   onwards, it'll be RR's responsibility. For cancellations, that implies the
//   pick needs also be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
//   level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
//   we invoke the completion closure and set *target to NULL right here.
//
// Takes ownership of \a error.
static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                                   grpc_connected_subchannel **target,
                                   grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if (pp->target == target) {
      /* This is the pick being cancelled: fail its completion closure. */
      *target = NULL;
      GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      /* Not a match: push back onto the (rebuilt) pending picks list. */
      pp->next = glb_policy->pending_picks;
      glb_policy->pending_picks = pp;
    }
    pp = next;
  }
  if (glb_policy->rr_policy != NULL) {
    grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target,
                                      GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
1030
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001031// Cancel all pending picks.
1032//
1033// A grpclb pick progresses as follows:
1034// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
1035// handed over to the RR policy (in create_rr_locked()). From that point
1036// onwards, it'll be RR's responsibility. For cancellations, that implies the
1037// pick needs also be cancelled by the RR instance.
1038// - Otherwise, without an RR instance, picks stay pending at this policy's
1039// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
1040// we invoke the completion closure and set *target to NULL right here.
Craig Tiller2400bf52017-02-09 16:25:19 -08001041static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
1042 grpc_lb_policy *pol,
1043 uint32_t initial_metadata_flags_mask,
1044 uint32_t initial_metadata_flags_eq,
1045 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001046 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001047 pending_pick *pp = glb_policy->pending_picks;
1048 glb_policy->pending_picks = NULL;
1049 while (pp != NULL) {
1050 pending_pick *next = pp->next;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001051 if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
David Garcia Quintas65318262016-07-29 13:43:38 -07001052 initial_metadata_flags_eq) {
ncteisen969b46e2017-06-08 14:57:11 -07001053 GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
ncteisen4b36a3d2017-03-13 19:08:06 -07001054 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1055 "Pick Cancelled", &error, 1));
David Garcia Quintas65318262016-07-29 13:43:38 -07001056 } else {
1057 pp->next = glb_policy->pending_picks;
1058 glb_policy->pending_picks = pp;
1059 }
1060 pp = next;
1061 }
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001062 if (glb_policy->rr_policy != NULL) {
1063 grpc_lb_policy_cancel_picks_locked(
1064 exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask,
1065 initial_metadata_flags_eq, GRPC_ERROR_REF(error));
1066 }
Mark D. Rothe65ff112016-09-09 13:48:38 -07001067 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001068}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001069
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001070static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
1071 glb_lb_policy *glb_policy);
1072static void start_picking_locked(grpc_exec_ctx *exec_ctx,
1073 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001074 glb_policy->started_picking = true;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001075 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
1076 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001077}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001078
Craig Tiller2400bf52017-02-09 16:25:19 -08001079static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001080 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001081 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001082 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001083 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001084}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001085
Craig Tiller2400bf52017-02-09 16:25:19 -08001086static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1087 const grpc_lb_policy_pick_args *pick_args,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001088 grpc_connected_subchannel **target,
1089 grpc_call_context_element *context, void **user_data,
Craig Tiller2400bf52017-02-09 16:25:19 -08001090 grpc_closure *on_complete) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -07001091 if (pick_args->lb_token_mdelem_storage == NULL) {
David Garcia Quintas5b0e9462016-08-15 19:38:39 -07001092 *target = NULL;
ncteisen969b46e2017-06-08 14:57:11 -07001093 GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
ncteisen4b36a3d2017-03-13 19:08:06 -07001094 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1095 "No mdelem storage for the LB token. Load reporting "
1096 "won't work without it. Failing"));
Mark D. Roth1e5f6af2016-10-07 08:32:58 -07001097 return 0;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -07001098 }
1099
David Garcia Quintas65318262016-07-29 13:43:38 -07001100 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001101 bool pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -07001102
1103 if (glb_policy->rr_policy != NULL) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001104 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001105 gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
1106 (void *)glb_policy, (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001107 }
1108 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");
David Garcia Quintas8aace512016-08-15 14:55:12 -07001109
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001110 wrapped_rr_closure_arg *wc_arg =
1111 (wrapped_rr_closure_arg *)gpr_zalloc(sizeof(wrapped_rr_closure_arg));
David Garcia Quintas331b9c02016-09-12 18:37:05 -07001112
ncteisen969b46e2017-06-08 14:57:11 -07001113 GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
Craig Tiller91031da2016-12-28 15:44:25 -08001114 grpc_schedule_on_exec_ctx);
David Garcia Quintas90712d52016-10-13 19:33:04 -07001115 wc_arg->rr_policy = glb_policy->rr_policy;
1116 wc_arg->target = target;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001117 wc_arg->context = context;
1118 GPR_ASSERT(glb_policy->client_stats != NULL);
1119 wc_arg->client_stats =
1120 grpc_grpclb_client_stats_ref(glb_policy->client_stats);
David Garcia Quintas90712d52016-10-13 19:33:04 -07001121 wc_arg->wrapped_closure = on_complete;
1122 wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
1123 wc_arg->initial_metadata = pick_args->initial_metadata;
David Garcia Quintas97ba6422016-10-14 13:06:45 -07001124 wc_arg->free_when_done = wc_arg;
Mark D. Rothd7389b42017-05-17 12:22:17 -07001125 pick_done =
1126 pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
1127 false /* force_async */, target, wc_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -07001128 } else {
Craig Tiller84f75d42017-05-03 13:06:35 -07001129 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001130 gpr_log(GPR_DEBUG,
1131 "No RR policy in grpclb instance %p. Adding to grpclb's pending "
1132 "picks",
1133 (void *)(glb_policy));
1134 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001135 add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
David Garcia Quintas8aace512016-08-15 14:55:12 -07001136 on_complete);
David Garcia Quintas65318262016-07-29 13:43:38 -07001137
1138 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001139 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001140 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001141 pick_done = false;
David Garcia Quintas65318262016-07-29 13:43:38 -07001142 }
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001143 return pick_done;
David Garcia Quintas65318262016-07-29 13:43:38 -07001144}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001145
Craig Tiller2400bf52017-02-09 16:25:19 -08001146static grpc_connectivity_state glb_check_connectivity_locked(
David Garcia Quintas65318262016-07-29 13:43:38 -07001147 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1148 grpc_error **connectivity_error) {
1149 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
Craig Tiller2400bf52017-02-09 16:25:19 -08001150 return grpc_connectivity_state_get(&glb_policy->state_tracker,
1151 connectivity_error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001152}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001153
Craig Tiller2400bf52017-02-09 16:25:19 -08001154static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1155 grpc_closure *closure) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001156 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001157 if (glb_policy->rr_policy) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001158 grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
David Garcia Quintas65318262016-07-29 13:43:38 -07001159 } else {
1160 add_pending_ping(&glb_policy->pending_pings, closure);
1161 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001162 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001163 }
1164 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001165}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001166
Craig Tiller2400bf52017-02-09 16:25:19 -08001167static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
1168 grpc_lb_policy *pol,
1169 grpc_connectivity_state *current,
1170 grpc_closure *notify) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001171 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001172 grpc_connectivity_state_notify_on_state_change(
1173 exec_ctx, &glb_policy->state_tracker, current, notify);
David Garcia Quintas65318262016-07-29 13:43:38 -07001174}
1175
Mark D. Roth09e458c2017-05-02 08:13:26 -07001176static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
1177 grpc_error *error);
1178
1179static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
1180 glb_lb_policy *glb_policy) {
1181 const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1182 const gpr_timespec next_client_load_report_time =
1183 gpr_time_add(now, glb_policy->client_stats_report_interval);
ncteisen969b46e2017-06-08 14:57:11 -07001184 GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001185 send_client_load_report_locked, glb_policy,
Craig Tilleree4b1452017-05-12 10:56:03 -07001186 grpc_combiner_scheduler(glb_policy->base.combiner));
Mark D. Roth09e458c2017-05-02 08:13:26 -07001187 grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
1188 next_client_load_report_time,
1189 &glb_policy->client_load_report_closure, now);
1190}
1191
/* Called once a client load report has been sent (or has failed). Frees the
 * report payload. On error or if the LB call is gone, stops the reporting
 * cycle and drops the weak ref taken when reporting was enabled; otherwise
 * arms the timer for the next report. */
static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
  grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
  glb_policy->client_load_report_payload = NULL;
  if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
    glb_policy->client_load_report_timer_pending = false;
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "client_load_report");
    return;
  }
  schedule_next_client_load_report(exec_ctx, glb_policy);
}
1205
1206static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
1207 glb_lb_policy *glb_policy) {
1208 grpc_op op;
1209 memset(&op, 0, sizeof(op));
1210 op.op = GRPC_OP_SEND_MESSAGE;
1211 op.data.send_message.send_message = glb_policy->client_load_report_payload;
ncteisen969b46e2017-06-08 14:57:11 -07001212 GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001213 client_load_report_done_locked, glb_policy,
Craig Tilleree4b1452017-05-12 10:56:03 -07001214 grpc_combiner_scheduler(glb_policy->base.combiner));
Mark D. Roth09e458c2017-05-02 08:13:26 -07001215 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1216 exec_ctx, glb_policy->lb_call, &op, 1,
1217 &glb_policy->client_load_report_closure);
1218 GPR_ASSERT(GRPC_CALL_OK == call_error);
1219}
1220
1221static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
Mark D. Rothe7751802017-07-27 12:31:45 -07001222 grpc_grpclb_dropped_call_counts *drop_entries =
Yash Tibrewalbc130da2017-09-12 22:44:08 -07001223 (grpc_grpclb_dropped_call_counts *)
1224 request->client_stats.calls_finished_with_drop.arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001225 return request->client_stats.num_calls_started == 0 &&
1226 request->client_stats.num_calls_finished == 0 &&
Mark D. Roth09e458c2017-05-02 08:13:26 -07001227 request->client_stats.num_calls_finished_with_client_failed_to_send ==
1228 0 &&
Mark D. Rothe7751802017-07-27 12:31:45 -07001229 request->client_stats.num_calls_finished_known_received == 0 &&
1230 (drop_entries == NULL || drop_entries->num_entries == 0);
Mark D. Roth09e458c2017-05-02 08:13:26 -07001231}
1232
/* Timer callback for client load reporting. Builds a load report from the
 * accumulated client stats and sends it on the LB call (or defers sending
 * until the initial request has gone out). On cancellation or when the LB
 * call is gone, stops the reporting cycle and drops the weak ref taken when
 * reporting was enabled. */
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
  if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
    glb_policy->client_load_report_timer_pending = false;
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "client_load_report");
    return;
  }
  // Construct message payload.
  GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
  grpc_grpclb_request *request =
      grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
  // Skip the client load report if the counters were all zero in the last
  // report and they are still zero in this one; just reschedule the timer.
  if (load_report_counters_are_zero(request)) {
    if (glb_policy->last_client_load_report_counters_were_zero) {
      grpc_grpclb_request_destroy(request);
      schedule_next_client_load_report(exec_ctx, glb_policy);
      return;
    }
    glb_policy->last_client_load_report_counters_were_zero = true;
  } else {
    glb_policy->last_client_load_report_counters_were_zero = false;
  }
  // Serialize the request into the payload buffer owned by glb_policy.
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  glb_policy->client_load_report_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // If we've already sent the initial request, then we can go ahead and
  // send the load report. Otherwise, we need to wait until the initial
  // request has been sent to send this
  // (see lb_on_sent_initial_request_locked() below).
  if (glb_policy->initial_request_sent) {
    do_send_client_load_report_locked(exec_ctx, glb_policy);
  }
}
1271
static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
                                              void *arg, grpc_error *error);
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                void *arg, grpc_error *error);
static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error);
/* Creates and initializes the call to the LB server (BalanceLoad method):
 * fresh client stats, recv metadata arrays, the serialized initial request
 * payload, the three per-call closures and the call backoff state.
 * Preconditions: a non-empty server name, no existing LB call, and the policy
 * not shutting down. */
static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->server_name != NULL);
  GPR_ASSERT(glb_policy->server_name[0] != '\0');
  GPR_ASSERT(glb_policy->lb_call == NULL);
  GPR_ASSERT(!glb_policy->shutting_down);

  /* Note the following LB call progresses every time there's activity in \a
   * glb_policy->base.interested_parties, which is comprised of the polling
   * entities from \a client_channel. */
  grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
  /* lb_call_timeout_ms == 0 means the LB call never times out. */
  gpr_timespec deadline =
      glb_policy->lb_call_timeout_ms == 0
          ? gpr_inf_future(GPR_CLOCK_MONOTONIC)
          : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_millis(glb_policy->lb_call_timeout_ms,
                                              GPR_TIMESPAN));
  glb_policy->lb_call = grpc_channel_create_pollset_set_call(
      exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
      glb_policy->base.interested_parties,
      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
      &host, deadline, NULL);
  grpc_slice_unref_internal(exec_ctx, host);

  /* Start each LB call with a fresh client-stats object. */
  if (glb_policy->client_stats != NULL) {
    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
  }
  glb_policy->client_stats = grpc_grpclb_client_stats_create();

  grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
  grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);

  /* Serialize the initial grpclb request for this server name into the
   * payload that query_for_backends_locked() will send. */
  grpc_grpclb_request *request =
      grpc_grpclb_request_create(glb_policy->server_name);
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  glb_policy->lb_request_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
  grpc_grpclb_request_destroy(request);

  /* All three callbacks run under the policy's combiner. */
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
                    lb_on_sent_initial_request_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
                    lb_on_server_status_received_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
                    lb_on_response_received_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));

  gpr_backoff_init(&glb_policy->lb_call_backoff_state,
                   GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
                   GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
                   GRPC_GRPCLB_RECONNECT_JITTER,
                   GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
                   GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);

  glb_policy->initial_request_sent = false;
  glb_policy->seen_initial_response = false;
  glb_policy->last_client_load_report_counters_were_zero = false;
}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001339
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001340static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
1341 glb_lb_policy *glb_policy) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001342 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tillerdd36b152017-03-31 08:27:28 -07001343 grpc_call_unref(glb_policy->lb_call);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001344 glb_policy->lb_call = NULL;
David Garcia Quintas65318262016-07-29 13:43:38 -07001345
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001346 grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
1347 grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
David Garcia Quintas65318262016-07-29 13:43:38 -07001348
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001349 grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001350 grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);
Mark D. Roth09e458c2017-05-02 08:13:26 -07001351
1352 if (!glb_policy->client_load_report_timer_pending) {
1353 grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
1354 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001355}
1356
David Garcia Quintas8d489112016-07-29 15:20:42 -07001357/*
1358 * Auxiliary functions and LB client callbacks.
1359 */
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001360static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
1361 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001362 GPR_ASSERT(glb_policy->lb_channel != NULL);
David Garcia Quintasa74b2462016-11-11 14:07:27 -08001363 if (glb_policy->shutting_down) return;
1364
Craig Tillerc5866662016-11-16 15:25:00 -08001365 lb_call_init_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001366
Craig Tiller84f75d42017-05-03 13:06:35 -07001367 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001368 gpr_log(GPR_INFO,
1369 "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
1370 (void *)glb_policy, (void *)glb_policy->lb_channel,
1371 (void *)glb_policy->lb_call);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001372 }
1373 GPR_ASSERT(glb_policy->lb_call != NULL);
1374
David Garcia Quintas65318262016-07-29 13:43:38 -07001375 grpc_call_error call_error;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001376 grpc_op ops[4];
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001377 memset(ops, 0, sizeof(ops));
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001378
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001379 grpc_op *op = ops;
David Garcia Quintas65318262016-07-29 13:43:38 -07001380 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1381 op->data.send_initial_metadata.count = 0;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001382 op->flags = 0;
1383 op->reserved = NULL;
1384 op++;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001385 op->op = GRPC_OP_RECV_INITIAL_METADATA;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001386 op->data.recv_initial_metadata.recv_initial_metadata =
1387 &glb_policy->lb_initial_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001388 op->flags = 0;
1389 op->reserved = NULL;
1390 op++;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001391 GPR_ASSERT(glb_policy->lb_request_payload != NULL);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001392 op->op = GRPC_OP_SEND_MESSAGE;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001393 op->data.send_message.send_message = glb_policy->lb_request_payload;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001394 op->flags = 0;
1395 op->reserved = NULL;
1396 op++;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001397 /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
1398 * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001399 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
1400 "lb_on_sent_initial_request_locked");
Mark D. Roth09e458c2017-05-02 08:13:26 -07001401 call_error = grpc_call_start_batch_and_execute(
1402 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1403 &glb_policy->lb_on_sent_initial_request);
1404 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001405
Mark D. Roth09e458c2017-05-02 08:13:26 -07001406 op = ops;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001407 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1408 op->data.recv_status_on_client.trailing_metadata =
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001409 &glb_policy->lb_trailing_metadata_recv;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001410 op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
1411 op->data.recv_status_on_client.status_details =
1412 &glb_policy->lb_call_status_details;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001413 op->flags = 0;
1414 op->reserved = NULL;
1415 op++;
David Garcia Quintase224a762016-11-01 13:00:58 -07001416 /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001417 * count goes to zero) to be unref'd in lb_on_server_status_received_locked */
1418 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
1419 "lb_on_server_status_received_locked");
David Garcia Quintas65318262016-07-29 13:43:38 -07001420 call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001421 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1422 &glb_policy->lb_on_server_status_received);
David Garcia Quintas65318262016-07-29 13:43:38 -07001423 GPR_ASSERT(GRPC_CALL_OK == call_error);
1424
1425 op = ops;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001426 op->op = GRPC_OP_RECV_MESSAGE;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001427 op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001428 op->flags = 0;
1429 op->reserved = NULL;
1430 op++;
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001431 /* take another weak ref to be unref'd/reused in
1432 * lb_on_response_received_locked */
1433 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001434 call_error = grpc_call_start_batch_and_execute(
1435 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1436 &glb_policy->lb_on_response_received);
David Garcia Quintas280fd2a2016-06-20 22:04:48 -07001437 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001438}
1439
Mark D. Roth09e458c2017-05-02 08:13:26 -07001440static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
1441 void *arg, grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001442 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001443 glb_policy->initial_request_sent = true;
1444 // If we attempted to send a client load report before the initial
1445 // request was sent, send the load report now.
1446 if (glb_policy->client_load_report_payload != NULL) {
1447 do_send_client_load_report_locked(exec_ctx, glb_policy);
1448 }
1449 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001450 "lb_on_sent_initial_request_locked");
Mark D. Roth09e458c2017-05-02 08:13:26 -07001451}
1452
Craig Tiller2400bf52017-02-09 16:25:19 -08001453static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
1454 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001455 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001456 grpc_op ops[2];
1457 memset(ops, 0, sizeof(ops));
1458 grpc_op *op = ops;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001459 if (glb_policy->lb_response_payload != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001460 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001461 /* Received data from the LB server. Look inside
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001462 * glb_policy->lb_response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001463 grpc_byte_buffer_reader bbr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001464 grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
Craig Tillerd41a4a72016-10-26 16:16:06 -07001465 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas97e17852017-08-14 14:55:02 -07001466 grpc_byte_buffer_reader_destroy(&bbr);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001467 grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001468
Mark D. Roth09e458c2017-05-02 08:13:26 -07001469 grpc_grpclb_initial_response *response = NULL;
1470 if (!glb_policy->seen_initial_response &&
1471 (response = grpc_grpclb_initial_response_parse(response_slice)) !=
1472 NULL) {
1473 if (response->has_client_stats_report_interval) {
1474 glb_policy->client_stats_report_interval =
1475 gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN),
1476 grpc_grpclb_duration_to_timespec(
1477 &response->client_stats_report_interval));
Craig Tiller84f75d42017-05-03 13:06:35 -07001478 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001479 gpr_log(GPR_INFO,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001480 "received initial LB response message; "
1481 "client load reporting interval = %" PRId64 ".%09d sec",
1482 glb_policy->client_stats_report_interval.tv_sec,
1483 glb_policy->client_stats_report_interval.tv_nsec);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001484 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001485 /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
1486 * strong ref count goes to zero) to be unref'd in
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001487 * send_client_load_report_locked() */
Mark D. Roth09e458c2017-05-02 08:13:26 -07001488 glb_policy->client_load_report_timer_pending = true;
1489 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
1490 schedule_next_client_load_report(exec_ctx, glb_policy);
Craig Tiller84f75d42017-05-03 13:06:35 -07001491 } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001492 gpr_log(GPR_INFO,
1493 "received initial LB response message; "
1494 "client load reporting NOT enabled");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001495 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001496 grpc_grpclb_initial_response_destroy(response);
1497 glb_policy->seen_initial_response = true;
1498 } else {
1499 grpc_grpclb_serverlist *serverlist =
1500 grpc_grpclb_response_parse_serverlist(response_slice);
1501 if (serverlist != NULL) {
1502 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tiller84f75d42017-05-03 13:06:35 -07001503 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001504 gpr_log(GPR_INFO, "Serverlist with %lu servers received",
1505 (unsigned long)serverlist->num_servers);
1506 for (size_t i = 0; i < serverlist->num_servers; ++i) {
1507 grpc_resolved_address addr;
1508 parse_server(serverlist->servers[i], &addr);
1509 char *ipport;
1510 grpc_sockaddr_to_string(&ipport, &addr, false);
1511 gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
1512 gpr_free(ipport);
1513 }
1514 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001515 /* update serverlist */
1516 if (serverlist->num_servers > 0) {
1517 if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
1518 serverlist)) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001519 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001520 gpr_log(GPR_INFO,
1521 "Incoming server list identical to current, ignoring.");
1522 }
1523 grpc_grpclb_destroy_serverlist(serverlist);
1524 } else { /* new serverlist */
1525 if (glb_policy->serverlist != NULL) {
1526 /* dispose of the old serverlist */
1527 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
1528 }
1529 /* and update the copy in the glb_lb_policy instance. This
1530 * serverlist instance will be destroyed either upon the next
1531 * update or in glb_destroy() */
1532 glb_policy->serverlist = serverlist;
Mark D. Rothd7389b42017-05-17 12:22:17 -07001533 glb_policy->serverlist_index = 0;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001534 rr_handover_locked(exec_ctx, glb_policy);
1535 }
1536 } else {
Craig Tiller84f75d42017-05-03 13:06:35 -07001537 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Ken Payson01e83b52017-09-13 19:45:07 -07001538 gpr_log(GPR_INFO,
1539 "Received empty server list. Picks will stay pending until "
1540 "a response with > 0 servers is received");
Mark D. Roth09e458c2017-05-02 08:13:26 -07001541 }
1542 grpc_grpclb_destroy_serverlist(serverlist);
1543 }
1544 } else { /* serverlist == NULL */
1545 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
1546 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
1547 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001548 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001549 grpc_slice_unref_internal(exec_ctx, response_slice);
David Garcia Quintas246c5642016-11-01 11:16:52 -07001550 if (!glb_policy->shutting_down) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001551 /* keep listening for serverlist updates */
1552 op->op = GRPC_OP_RECV_MESSAGE;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001553 op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001554 op->flags = 0;
1555 op->reserved = NULL;
1556 op++;
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001557 /* reuse the "lb_on_response_received_locked" weak ref taken in
David Garcia Quintase224a762016-11-01 13:00:58 -07001558 * query_for_backends_locked() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001559 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas246c5642016-11-01 11:16:52 -07001560 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1561 &glb_policy->lb_on_response_received); /* loop */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001562 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001563 }
David Garcia Quintase224a762016-11-01 13:00:58 -07001564 } else { /* empty payload: call cancelled. */
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001565 /* dispose of the "lb_on_response_received_locked" weak ref taken in
David Garcia Quintase224a762016-11-01 13:00:58 -07001566 * query_for_backends_locked() and reused in every reception loop */
1567 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001568 "lb_on_response_received_locked_empty_payload");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001569 }
1570}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001571
Craig Tiller2400bf52017-02-09 16:25:19 -08001572static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
1573 grpc_error *error) {
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001574 glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001575 glb_policy->retry_timer_active = false;
1576 if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001577 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001578 gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
1579 (void *)glb_policy);
1580 }
1581 GPR_ASSERT(glb_policy->lb_call == NULL);
1582 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001583 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001584 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001585}
1586
/* Called when the LB call finishes (server termination, cancellation or
 * error). Destroys the call's resources and then either restarts the call
 * right away (when the call was cancelled as part of an update, i.e.
 * updating_lb_call is set) or schedules a backed-off retry. Releases the
 * "lb_on_server_status_received_locked" weak ref taken when the status-recv
 * op was started. */
static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                void *arg, grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
  GPR_ASSERT(glb_policy->lb_call != NULL);
  if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
    char *status_details =
        grpc_slice_to_c_string(glb_policy->lb_call_status_details);
    gpr_log(GPR_INFO,
            "Status from LB server received. Status = %d, Details = '%s', "
            "(call: %p), error %p",
            glb_policy->lb_call_status, status_details,
            (void *)glb_policy->lb_call, (void *)error);
    gpr_free(status_details);
  }
  /* We need to perform cleanups no matter what. */
  lb_call_destroy_locked(exec_ctx, glb_policy);
  if (glb_policy->started_picking && glb_policy->updating_lb_call) {
    /* The call ended because of an update in progress: cancel any pending
     * retry and restart the LB call immediately. */
    if (glb_policy->retry_timer_active) {
      grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
    }
    if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
    glb_policy->updating_lb_call = false;
  } else if (!glb_policy->shutting_down) {
    /* if we aren't shutting down, restart the LB client call after some time */
    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
    gpr_timespec next_try =
        gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
              (void *)glb_policy);
      gpr_timespec timeout = gpr_time_sub(next_try, now);
      if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
        gpr_log(GPR_DEBUG,
                "... retry_timer_active in %" PRId64 ".%09d seconds.",
                timeout.tv_sec, timeout.tv_nsec);
      } else {
        gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
      }
    }
    /* Weak ref released by lb_call_on_retry_timer_locked() when it runs. */
    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
    GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
                      lb_call_on_retry_timer_locked, glb_policy,
                      grpc_combiner_scheduler(glb_policy->base.combiner));
    glb_policy->retry_timer_active = true;
    grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
                    &glb_policy->lb_on_call_retry, now);
  }
  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                            "lb_on_server_status_received_locked");
}
1637
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001638static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
1639 const grpc_lb_policy_args *args) {
1640 glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001641 if (glb_policy->updating_lb_channel) {
1642 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
1643 gpr_log(GPR_INFO,
1644 "Update already in progress for grpclb %p. Deferring update.",
1645 (void *)glb_policy);
1646 }
1647 if (glb_policy->pending_update_args != NULL) {
1648 grpc_channel_args_destroy(exec_ctx,
1649 glb_policy->pending_update_args->args);
1650 gpr_free(glb_policy->pending_update_args);
1651 }
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001652 glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc(
1653 sizeof(*glb_policy->pending_update_args));
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001654 glb_policy->pending_update_args->client_channel_factory =
1655 args->client_channel_factory;
1656 glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
1657 glb_policy->pending_update_args->combiner = args->combiner;
1658 return;
1659 }
1660
1661 glb_policy->updating_lb_channel = true;
1662 // Propagate update to lb_channel (pick first).
1663 const grpc_arg *arg =
1664 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
1665 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
1666 if (glb_policy->lb_channel == NULL) {
1667 // If we don't have a current channel to the LB, go into TRANSIENT
1668 // FAILURE.
1669 grpc_connectivity_state_set(
1670 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
1671 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
1672 "glb_update_missing");
1673 } else {
1674 // otherwise, keep using the current LB channel (ignore this update).
1675 gpr_log(GPR_ERROR,
1676 "No valid LB addresses channel arg for grpclb %p update, "
1677 "ignoring.",
1678 (void *)glb_policy);
1679 }
1680 }
Yash Tibrewalca3c1c02017-09-07 22:47:16 -07001681 const grpc_lb_addresses *addresses =
1682 (const grpc_lb_addresses *)arg->value.pointer.p;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001683 GPR_ASSERT(glb_policy->lb_channel != NULL);
1684 grpc_channel_args *lb_channel_args = build_lb_channel_args(
1685 exec_ctx, addresses, glb_policy->response_generator, args->args);
1686 /* Propagate updates to the LB channel through the fake resolver */
1687 grpc_fake_resolver_response_generator_set_response(
1688 exec_ctx, glb_policy->response_generator, lb_channel_args);
1689 grpc_channel_args_destroy(exec_ctx, lb_channel_args);
1690
1691 if (!glb_policy->watching_lb_channel) {
1692 // Watch the LB channel connectivity for connection.
David Garcia Quintas6a7935e2017-07-27 19:24:52 -07001693 glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state(
1694 glb_policy->lb_channel, true /* try to connect */);
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001695 grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
1696 grpc_channel_get_channel_stack(glb_policy->lb_channel));
1697 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1698 glb_policy->watching_lb_channel = true;
1699 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
1700 grpc_client_channel_watch_connectivity_state(
1701 exec_ctx, client_channel_elem,
1702 grpc_polling_entity_create_from_pollset_set(
1703 glb_policy->base.interested_parties),
1704 &glb_policy->lb_channel_connectivity,
1705 &glb_policy->lb_channel_on_connectivity_changed, NULL);
1706 }
1707}
1708
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
// Releases the "watch_lb_channel_connectivity" weak ref on every terminal
// path (SHUTDOWN or policy shutting down).
static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
                                                      void *arg,
                                                      grpc_error *error) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
  if (glb_policy->shutting_down) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
  // embedded RR policy. Note that the current RR policy, if any, will stay in
  // effect until an update from the new lb_call is received.
  switch (glb_policy->lb_channel_connectivity) {
    case GRPC_CHANNEL_INIT:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      /* resub. */
      /* Not READY nor shut down yet: re-subscribe for the next connectivity
       * change and keep waiting. */
      grpc_channel_element *client_channel_elem =
          grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(glb_policy->lb_channel));
      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
      grpc_client_channel_watch_connectivity_state(
          exec_ctx, client_channel_elem,
          grpc_polling_entity_create_from_pollset_set(
              glb_policy->base.interested_parties),
          &glb_policy->lb_channel_connectivity,
          &glb_policy->lb_channel_on_connectivity_changed, NULL);
      break;
    }
    case GRPC_CHANNEL_IDLE:
      // lb channel inactive (probably shutdown prior to update). Restart lb
      // call to kick the lb channel into gear.
      GPR_ASSERT(glb_policy->lb_call == NULL);
      /* fallthrough */
    case GRPC_CHANNEL_READY:
      if (glb_policy->lb_call != NULL) {
        /* Cancel the in-flight call; its status callback will re-create it
         * (updating_lb_call signals that path). */
        glb_policy->updating_lb_channel = false;
        glb_policy->updating_lb_call = true;
        grpc_call_cancel(glb_policy->lb_call, NULL);
        // lb_on_server_status_received will pick up the cancel and reinit
        // lb_call.
        if (glb_policy->pending_update_args != NULL) {
          /* Apply the update that was deferred while this one was in flight. */
          grpc_lb_policy_args *args = glb_policy->pending_update_args;
          glb_policy->pending_update_args = NULL;
          glb_update_locked(exec_ctx, &glb_policy->base, args);
          grpc_channel_args_destroy(exec_ctx, args->args);
          gpr_free(args);
        }
      } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
        /* No call in flight: cancel any pending retry and start one now. */
        if (glb_policy->retry_timer_active) {
          grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
          glb_policy->retry_timer_active = false;
        }
        start_picking_locked(exec_ctx, glb_policy);
      }
      /* fallthrough */
    case GRPC_CHANNEL_SHUTDOWN:
    done:
      /* Terminal (or early-exit) path: stop watching and drop the weak ref
       * taken when the watch was established. */
      glb_policy->watching_lb_channel = false;
      GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                                "watch_lb_channel_connectivity_cb_shutdown");
      break;
  }
}
1772
/* Code wiring the policy with the rest of the core */
/* vtable mapping the generic grpc_lb_policy operations to the grpclb
 * implementations. Entries suffixed "_locked" run under the policy's combiner
 * (see the grpc_combiner_scheduler() usage elsewhere in this file). */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
    glb_destroy,
    glb_shutdown_locked,
    glb_pick_locked,
    glb_cancel_pick_locked,
    glb_cancel_picks_locked,
    glb_ping_one_locked,
    glb_exit_idle_locked,
    glb_check_connectivity_locked,
    glb_notify_on_state_change_locked,
    glb_update_locked};
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001785
Yash Tibrewala4952202017-09-13 10:53:28 -07001786static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
1787 grpc_lb_policy_factory *factory,
1788 grpc_lb_policy_args *args) {
Yash Tibrewalc086c7e2017-09-14 10:05:46 -07001789 /* Count the number of gRPC-LB addresses. There must be at least one.
1790 * TODO(roth): For now, we ignore non-balancer addresses, but in the
1791 * future, we may change the behavior such that we fall back to using
1792 * the non-balancer addresses if we cannot reach any balancers. In the
1793 * fallback case, we should use the LB policy indicated by
1794 * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
1795 * unset, we should default to pick_first). */
Yash Tibrewala4952202017-09-13 10:53:28 -07001796 const grpc_arg *arg =
1797 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
1798 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
1799 return NULL;
1800 }
1801 grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
1802 size_t num_grpclb_addrs = 0;
1803 for (size_t i = 0; i < addresses->num_addresses; ++i) {
1804 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
1805 }
1806 if (num_grpclb_addrs == 0) return NULL;
1807
1808 glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
1809
1810 /* Get server name. */
1811 arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
1812 GPR_ASSERT(arg != NULL);
1813 GPR_ASSERT(arg->type == GRPC_ARG_STRING);
1814 grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
1815 GPR_ASSERT(uri->path[0] != '\0');
1816 glb_policy->server_name =
1817 gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1818 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
1819 gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
1820 glb_policy->server_name);
1821 }
1822 grpc_uri_destroy(uri);
1823
1824 glb_policy->cc_factory = args->client_channel_factory;
1825 GPR_ASSERT(glb_policy->cc_factory != NULL);
1826
1827 arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1828 glb_policy->lb_call_timeout_ms =
1829 grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
1830
1831 // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1832 // since we use this to trigger the client_load_reporting filter.
1833 grpc_arg new_arg =
1834 grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
1835 static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1836 glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
1837 args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1838
1839 /* Create a client channel over them to communicate with a LB service */
1840 glb_policy->response_generator =
1841 grpc_fake_resolver_response_generator_create();
1842 grpc_channel_args *lb_channel_args = build_lb_channel_args(
1843 exec_ctx, addresses, glb_policy->response_generator, args->args);
1844 char *uri_str;
1845 gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
1846 glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
1847 exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
1848
1849 /* Propagate initial resolution */
1850 grpc_fake_resolver_response_generator_set_response(
1851 exec_ctx, glb_policy->response_generator, lb_channel_args);
1852 grpc_channel_args_destroy(exec_ctx, lb_channel_args);
1853 gpr_free(uri_str);
1854 if (glb_policy->lb_channel == NULL) {
1855 gpr_free((void *)glb_policy->server_name);
1856 grpc_channel_args_destroy(exec_ctx, glb_policy->args);
1857 gpr_free(glb_policy);
1858 return NULL;
1859 }
Ken Payson9fa10cc2017-09-14 11:49:52 -07001860 grpc_subchannel_index_ref();
Yash Tibrewala4952202017-09-13 10:53:28 -07001861 GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
1862 glb_lb_channel_on_connectivity_changed_cb, glb_policy,
1863 grpc_combiner_scheduler(args->combiner));
1864 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
1865 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
1866 "grpclb");
1867 return &glb_policy->base;
1868}
1869
/* The grpclb factory is a statically allocated, process-lifetime singleton,
 * so ref/unref are intentionally no-ops. */
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}

static void glb_factory_unref(grpc_lb_policy_factory *factory) {}

static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
    glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};

static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};

/* Returns the singleton grpclb policy factory (never NULL; not owned by the
 * caller). */
grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
  return &glb_lb_policy_factory;
}
1882
1883/* Plugin registration */
Mark D. Roth09e458c2017-05-02 08:13:26 -07001884
1885// Only add client_load_reporting filter if the grpclb LB policy is used.
1886static bool maybe_add_client_load_reporting_filter(
1887 grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
1888 const grpc_channel_args *args =
1889 grpc_channel_stack_builder_get_channel_arguments(builder);
1890 const grpc_arg *channel_arg =
1891 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1892 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING &&
1893 strcmp(channel_arg->value.string, "grpclb") == 0) {
1894 return grpc_channel_stack_builder_append_filter(
1895 builder, (const grpc_channel_filter *)arg, NULL, NULL);
1896 }
1897 return true;
1898}
1899
/* Plugin entry point: registers the grpclb policy factory, its tracers, and
 * the client_load_reporting filter (appended only when grpclb is in use; see
 * maybe_add_client_load_reporting_filter()). */
void grpc_lb_policy_grpclb_init() {
  grpc_register_lb_policy(grpc_glb_lb_factory_create());
  grpc_register_tracer(&grpc_lb_glb_trace);
#ifndef NDEBUG
  /* refcount tracer is only compiled into debug builds */
  grpc_register_tracer(&grpc_trace_lb_policy_refcount);
#endif
  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   maybe_add_client_load_reporting_filter,
                                   (void *)&grpc_client_load_reporting_filter);
}
1911
/* Plugin teardown: nothing to release (the factory is statically allocated). */
void grpc_lb_policy_grpclb_shutdown() {}