blob: f248a873ba9dfb401f51c0135e5ddd84840e7f8c [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2016 gRPC authors.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070016 *
17 */
18
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070019/** Implementation of the gRPC LB policy.
20 *
David Garcia Quintas43339842016-07-18 12:56:09 -070021 * This policy takes as input a set of resolved addresses {a1..an} for which the
22 * LB set was set (it's the resolver's responsibility to ensure this). That is
23 * to say, {a1..an} represent a collection of LB servers.
24 *
25 * An internal channel (\a glb_lb_policy.lb_channel) is created over {a1..an}.
26 * This channel behaves just like a regular channel. In particular, the
27 * constructed URI over the addresses a1..an will use the default pick first
28 * policy to select from this list of LB server backends.
29 *
David Garcia Quintas41bef452016-07-28 19:19:58 -070030 * The first time the policy gets a request for a pick, a ping, or to exit the
David Garcia Quintas98da61b2016-10-29 08:46:31 +020031 * idle state, \a query_for_backends_locked() is called. This function sets up
32 * and initiates the internal communication with the LB server. In particular,
33 * it's responsible for instantiating the internal *streaming* call to the LB
34 * server (whichever address from {a1..an} pick-first chose). This call is
David Garcia Quintas7ec29132016-11-01 04:09:05 +010035 * serviced by two callbacks, \a lb_on_server_status_received and \a
36 * lb_on_response_received. The former will be called when the call to the LB
37 * server completes. This can happen if the LB server closes the connection or
38 * if this policy itself cancels the call (for example because it's shutting
David Garcia Quintas246c5642016-11-01 11:16:52 -070039 * down). If the internal call times out, the usual behavior of pick-first
David Garcia Quintas7ec29132016-11-01 04:09:05 +010040 * applies, continuing to pick from the list {a1..an}.
David Garcia Quintas43339842016-07-18 12:56:09 -070041 *
 * Upon success, the incoming \a LoadBalancingResponse is processed by \a
43 * res_recv. An invalid one results in the termination of the streaming call. A
44 * new streaming call should be created if possible, failing the original call
45 * otherwise. For a valid \a LoadBalancingResponse, the server list of actual
46 * backends is extracted. A Round Robin policy will be created from this list.
47 * There are two possible scenarios:
David Garcia Quintas43339842016-07-18 12:56:09 -070048 *
49 * 1. This is the first server list received. There was no previous instance of
David Garcia Quintas90712d52016-10-13 19:33:04 -070050 * the Round Robin policy. \a rr_handover_locked() will instantiate the RR
51 * policy and perform all the pending operations over it.
David Garcia Quintas43339842016-07-18 12:56:09 -070052 * 2. There's already a RR policy instance active. We need to introduce the new
 * one built from the new serverlist, but taking care not to disrupt the
54 * operations in progress over the old RR instance. This is done by
55 * decreasing the reference count on the old policy. The moment no more
56 * references are held on the old RR policy, it'll be destroyed and \a
David Garcia Quintas348cfdb2016-08-19 12:19:43 -070057 * glb_rr_connectivity_changed notified with a \a GRPC_CHANNEL_SHUTDOWN
58 * state. At this point we can transition to a new RR instance safely, which
David Garcia Quintas90712d52016-10-13 19:33:04 -070059 * is done once again via \a rr_handover_locked().
David Garcia Quintas43339842016-07-18 12:56:09 -070060 *
61 *
62 * Once a RR policy instance is in place (and getting updated as described),
 * calls for a pick, a ping or a cancellation will be serviced right away by
64 * forwarding them to the RR instance. Any time there's no RR policy available
David Garcia Quintas7ec29132016-11-01 04:09:05 +010065 * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is
66 * received, etc), pick/ping requests are added to a list of pending picks/pings
67 * to be flushed and serviced as part of \a rr_handover_locked() the moment the
68 * RR policy instance becomes available.
David Garcia Quintas43339842016-07-18 12:56:09 -070069 *
70 * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
71 * high level design and details. */
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070072
73/* TODO(dgq):
74 * - Implement LB service forwarding (point 2c. in the doc's diagram).
75 */
76
murgatroid99085f9af2016-10-24 09:55:44 -070077/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
78 using that endpoint. Because of various transitive includes in uv.h,
79 including windows.h on Windows, uv.h must be included before other system
80 headers. Therefore, sockaddr.h must always be included first */
murgatroid997871f732016-09-23 13:49:05 -070081#include "src/core/lib/iomgr/sockaddr.h"
82
Mark D. Roth64d922a2017-05-03 12:52:04 -070083#include <limits.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070084#include <string.h>
85
86#include <grpc/byte_buffer_reader.h>
87#include <grpc/grpc.h>
88#include <grpc/support/alloc.h>
89#include <grpc/support/host_port.h>
90#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -070091#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070092
Craig Tiller9eb0fde2017-03-31 16:59:30 -070093#include "src/core/ext/filters/client_channel/client_channel.h"
94#include "src/core/ext/filters/client_channel/client_channel_factory.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070095#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070096#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
97#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070098#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070099#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
Craig Tillerd52e22f2017-04-02 16:22:52 -0700100#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
101#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
102#include "src/core/ext/filters/client_channel/parse_address.h"
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700103#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
Mark D. Roth046cf762016-09-26 11:13:51 -0700104#include "src/core/lib/channel/channel_args.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700105#include "src/core/lib/channel/channel_stack.h"
Craig Tiller2400bf52017-02-09 16:25:19 -0800106#include "src/core/lib/iomgr/combiner.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200107#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700108#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200109#include "src/core/lib/iomgr/timer.h"
David Garcia Quintas01291502017-02-07 13:26:41 -0800110#include "src/core/lib/slice/slice_hash_table.h"
Craig Tiller18b4ba32016-11-09 15:23:42 -0800111#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -0700112#include "src/core/lib/slice/slice_string_helpers.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200113#include "src/core/lib/support/backoff.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700114#include "src/core/lib/support/string.h"
115#include "src/core/lib/surface/call.h"
116#include "src/core/lib/surface/channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700117#include "src/core/lib/surface/channel_init.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700118#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700119
David Garcia Quintas1edfb952016-11-22 17:15:34 -0800120#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
121#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
122#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
123#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
124#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200125
ncteisen06bce6e2017-07-10 07:58:49 -0700126grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700127
/* Appends \a lb_token (the LB token of the selected subchannel/address) to the
 * pick's initial metadata batch.
 * \a lb_token_mdelem_storage is caller-owned storage for the new metadata
 * element and must be non-NULL; \a lb_token must be a valid (non-null) mdelem.
 * Returns the error, if any, from grpc_metadata_batch_add_tail. */
static grpc_error *initial_metadata_add_lb_token(
    grpc_exec_ctx *exec_ctx, grpc_metadata_batch *initial_metadata,
    grpc_linked_mdelem *lb_token_mdelem_storage, grpc_mdelem lb_token) {
  GPR_ASSERT(lb_token_mdelem_storage != NULL);
  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
  return grpc_metadata_batch_add_tail(exec_ctx, initial_metadata,
                                      lb_token_mdelem_storage, lb_token);
}
138
/* Destruction callback for the client stats placed in a subchannel call's
 * context: releases the reference held by the context entry. */
static void destroy_client_stats(void *arg) {
  grpc_grpclb_client_stats_unref(arg);
}
142
/* Argument for \a wrapped_rr_closure. Carries everything needed to forward the
 * result of a pick/ping made against the internal Round Robin policy back to
 * the original caller, and to unref the RR instance afterwards. */
typedef struct wrapped_rr_closure_arg {
  /* the closure instance using this struct as argument */
  grpc_closure wrapper_closure;

  /* the original closure. Usually a on_complete/notify cb for pick() and ping()
   * calls against the internal RR instance, respectively. */
  grpc_closure *wrapped_closure;

  /* the pick's initial metadata, kept in order to append the LB token for the
   * pick */
  grpc_metadata_batch *initial_metadata;

  /* the picked target, used to determine which LB token to add to the pick's
   * initial metadata */
  grpc_connected_subchannel **target;

  /* the context to be populated for the subchannel call */
  grpc_call_context_element *context;

  /* Stats for client-side load reporting. Note that this holds a
   * reference, which must be either passed on via context or unreffed. */
  grpc_grpclb_client_stats *client_stats;

  /* the LB token associated with the pick */
  grpc_mdelem lb_token;

  /* storage for the lb token initial metadata mdelem */
  grpc_linked_mdelem *lb_token_mdelem_storage;

  /* The RR instance related to the closure */
  grpc_lb_policy *rr_policy;

  /* heap memory to be freed upon closure execution (usually the enclosing
   * pending_pick/pending_ping node). */
  void *free_when_done;
} wrapped_rr_closure_arg;
178
/* The \a on_complete closure passed as part of the pick requires keeping a
 * reference to its associated round robin instance. We wrap this closure in
 * order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
                               grpc_error *error) {
  wrapped_rr_closure_arg *wc_arg = arg;

  GPR_ASSERT(wc_arg->wrapped_closure != NULL);
  /* run the original closure first, propagating the same error */
  GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));

  if (wc_arg->rr_policy != NULL) {
    /* if *target is NULL, no pick has been made by the RR policy (eg, all
     * addresses failed to connect). There won't be any user_data/token
     * available */
    if (*wc_arg->target != NULL) {
      if (!GRPC_MDISNULL(wc_arg->lb_token)) {
        initial_metadata_add_lb_token(exec_ctx, wc_arg->initial_metadata,
                                      wc_arg->lb_token_mdelem_storage,
                                      GRPC_MDELEM_REF(wc_arg->lb_token));
      } else {
        /* a successful pick without an LB token breaks this policy's
         * invariants: fail loudly */
        gpr_log(GPR_ERROR,
                "No LB token for connected subchannel pick %p (from RR "
                "instance %p).",
                (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
        abort();
      }
      // Pass on client stats via context. Passes ownership of the reference.
      GPR_ASSERT(wc_arg->client_stats != NULL);
      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
      wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
    } else {
      /* no pick was made: release the stats reference we were holding */
      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
    }
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
  }
  GPR_ASSERT(wc_arg->free_when_done != NULL);
  gpr_free(wc_arg->free_when_done);
}
220
/* Linked list of pending pick requests. It stores all information needed to
 * eventually call (Round Robin's) pick() on them. They mainly stay pending
 * waiting for the RR policy to be created/updated.
 *
 * One particularity is the wrapping of the user-provided \a on_complete closure
 * (in \a wrapped_on_complete and \a wrapped_on_complete_arg). This is needed in
 * order to correctly unref the RR policy instance upon completion of the pick.
 * See \a wrapped_rr_closure for details. */
typedef struct pending_pick {
  /* next node in the list; NULL terminates */
  struct pending_pick *next;

  /* original pick()'s arguments */
  grpc_lb_policy_pick_args pick_args;

  /* output argument where to store the pick()ed connected subchannel, or NULL
   * upon error. */
  grpc_connected_subchannel **target;

  /* args for wrapped_on_complete */
  wrapped_rr_closure_arg wrapped_on_complete_arg;
} pending_pick;
242
David Garcia Quintas8aace512016-08-15 14:55:12 -0700243static void add_pending_pick(pending_pick **root,
244 const grpc_lb_policy_pick_args *pick_args,
David Garcia Quintas65318262016-07-29 13:43:38 -0700245 grpc_connected_subchannel **target,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700246 grpc_call_context_element *context,
David Garcia Quintas65318262016-07-29 13:43:38 -0700247 grpc_closure *on_complete) {
Craig Tiller6f417882017-02-16 14:09:39 -0800248 pending_pick *pp = gpr_zalloc(sizeof(*pp));
David Garcia Quintas65318262016-07-29 13:43:38 -0700249 pp->next = *root;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -0700250 pp->pick_args = *pick_args;
David Garcia Quintas65318262016-07-29 13:43:38 -0700251 pp->target = target;
David Garcia Quintas65318262016-07-29 13:43:38 -0700252 pp->wrapped_on_complete_arg.wrapped_closure = on_complete;
David Garcia Quintas5bb7b9c2016-09-15 23:46:32 -0700253 pp->wrapped_on_complete_arg.target = target;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700254 pp->wrapped_on_complete_arg.context = context;
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700255 pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata;
256 pp->wrapped_on_complete_arg.lb_token_mdelem_storage =
257 pick_args->lb_token_mdelem_storage;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700258 pp->wrapped_on_complete_arg.free_when_done = pp;
ncteisen969b46e2017-06-08 14:57:11 -0700259 GRPC_CLOSURE_INIT(&pp->wrapped_on_complete_arg.wrapper_closure,
Craig Tiller91031da2016-12-28 15:44:25 -0800260 wrapped_rr_closure, &pp->wrapped_on_complete_arg,
261 grpc_schedule_on_exec_ctx);
David Garcia Quintas65318262016-07-29 13:43:38 -0700262 *root = pp;
263}
264
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
  /* next node in the list; NULL terminates */
  struct pending_ping *next;

  /* args for wrapped_notify */
  wrapped_rr_closure_arg wrapped_notify_arg;
} pending_ping;
272
David Garcia Quintas65318262016-07-29 13:43:38 -0700273static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
Craig Tiller6f417882017-02-16 14:09:39 -0800274 pending_ping *pping = gpr_zalloc(sizeof(*pping));
David Garcia Quintas65318262016-07-29 13:43:38 -0700275 pping->wrapped_notify_arg.wrapped_closure = notify;
David Garcia Quintasb39330d2016-10-14 13:35:56 -0700276 pping->wrapped_notify_arg.free_when_done = pping;
David Garcia Quintas65318262016-07-29 13:43:38 -0700277 pping->next = *root;
ncteisen969b46e2017-06-08 14:57:11 -0700278 GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
Craig Tiller91031da2016-12-28 15:44:25 -0800279 wrapped_rr_closure, &pping->wrapped_notify_arg,
280 grpc_schedule_on_exec_ctx);
David Garcia Quintas65318262016-07-29 13:43:38 -0700281 *root = pping;
282}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700283
David Garcia Quintas8d489112016-07-29 15:20:42 -0700284/*
285 * glb_lb_policy
286 */
David Garcia Quintas65318262016-07-29 13:43:38 -0700287typedef struct rr_connectivity_data rr_connectivity_data;
David Garcia Quintas65318262016-07-29 13:43:38 -0700288static const grpc_lb_policy_vtable glb_lb_policy_vtable;
/* State for one instance of the grpclb LB policy. */
typedef struct glb_lb_policy {
  /** base policy: must be first */
  grpc_lb_policy base;

  /** who the client is trying to communicate with */
  const char *server_name;
  grpc_client_channel_factory *cc_factory;
  grpc_channel_args *args;

  /** timeout in milliseconds for the LB call. 0 means no deadline. */
  int lb_call_timeout_ms;

  /** for communicating with the LB server */
  grpc_channel *lb_channel;

  /** response generator to inject address updates into \a lb_channel */
  grpc_fake_resolver_response_generator *response_generator;

  /** the RR policy to use for the backend servers returned by the LB server */
  grpc_lb_policy *rr_policy;

  /** true once the first pick/ping/exit-idle request has been received (see
   * file-level comment) */
  bool started_picking;

  /** our connectivity state tracker */
  grpc_connectivity_state_tracker state_tracker;

  /** connectivity state of the LB channel */
  grpc_connectivity_state lb_channel_connectivity;

  /** stores the deserialized response from the LB. May be NULL until one such
   * response has arrived. */
  grpc_grpclb_serverlist *serverlist;

  /** Index into serverlist for next pick.
   * If the server at this index is a drop, we return a drop.
   * Otherwise, we delegate to the RR policy. */
  size_t serverlist_index;

  /** list of picks that are waiting on RR's policy connectivity */
  pending_pick *pending_picks;

  /** list of pings that are waiting on RR's policy connectivity */
  pending_ping *pending_pings;

  /** are we shutting down? */
  bool shutting_down;

  /** are we currently updating lb_call? */
  bool updating_lb_call;

  /** are we currently updating lb_channel? */
  bool updating_lb_channel;

  /** are we already watching the LB channel's connectivity? */
  bool watching_lb_channel;

  /** is \a lb_call_retry_timer active? */
  bool retry_timer_active;

  /** called upon changes to the LB channel's connectivity. */
  grpc_closure lb_channel_on_connectivity_changed;

  /** args from the latest update received while already updating, or NULL */
  grpc_lb_policy_args *pending_update_args;

  /************************************************************/
  /* client data associated with the LB server communication */
  /************************************************************/
  /* Finished sending initial request. */
  grpc_closure lb_on_sent_initial_request;

  /* Status from the LB server has been received. This signals the end of the LB
   * call. */
  grpc_closure lb_on_server_status_received;

  /* A response from the LB server has been received. Process it */
  grpc_closure lb_on_response_received;

  /* LB call retry timer callback. */
  grpc_closure lb_on_call_retry;

  grpc_call *lb_call; /* streaming call to the LB server */

  grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
  grpc_metadata_array
      lb_trailing_metadata_recv; /* trailing MD from LB server */

  /* what's being sent to the LB server. Note that its value may vary if the LB
   * server indicates a redirect. */
  grpc_byte_buffer *lb_request_payload;

  /* response from the LB server, if any. Processed in
   * lb_on_response_received() */
  grpc_byte_buffer *lb_response_payload;

  /* call status code and details, set in lb_on_server_status_received() */
  grpc_status_code lb_call_status;
  grpc_slice lb_call_status_details;

  /** LB call retry backoff state */
  gpr_backoff lb_call_backoff_state;

  /** LB call retry timer */
  grpc_timer lb_call_retry_timer;

  /* true once the initial LB request has been sent */
  bool initial_request_sent;
  /* true once the first response from the LB server has been seen */
  bool seen_initial_response;

  /* Stats for client-side load reporting. Should be unreffed and
   * recreated whenever lb_call is replaced. */
  grpc_grpclb_client_stats *client_stats;
  /* Interval and timer for next client load report. */
  gpr_timespec client_stats_report_interval;
  grpc_timer client_load_report_timer;
  bool client_load_report_timer_pending;
  bool last_client_load_report_counters_were_zero;
  /* Closure used for either the load report timer or the callback for
   * completion of sending the load report. */
  grpc_closure client_load_report_closure;
  /* Client load report message payload. */
  grpc_byte_buffer *client_load_report_payload;
} glb_lb_policy;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700409
/* Keeps track and reacts to changes in connectivity of the RR instance */
struct rr_connectivity_data {
  /* closure invoked upon changes in the RR policy's connectivity */
  grpc_closure on_change;
  /* last connectivity state observed for the RR policy */
  grpc_connectivity_state state;
  /* owning glb policy instance */
  glb_lb_policy *glb_policy;
};
David Garcia Quintas8d489112016-07-29 15:20:42 -0700416
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700417static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
418 bool log) {
Mark D. Rothe7751802017-07-27 12:31:45 -0700419 if (server->drop) return false;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700420 const grpc_grpclb_ip_address *ip = &server->ip_address;
421 if (server->port >> 16 != 0) {
422 if (log) {
423 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200424 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
425 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700426 }
427 return false;
428 }
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700429 if (ip->size != 4 && ip->size != 16) {
430 if (log) {
431 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200432 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700433 "serverlist. Ignoring",
Jan Tattermusch2b398082016-10-07 14:40:30 +0200434 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700435 }
436 return false;
437 }
438 return true;
439}
440
Mark D. Roth16883a32016-10-21 10:30:58 -0700441/* vtable for LB tokens in grpc_lb_addresses. */
Mark D. Roth557c9902016-10-24 11:12:05 -0700442static void *lb_token_copy(void *token) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800443 return token == NULL
444 ? NULL
445 : (void *)GRPC_MDELEM_REF((grpc_mdelem){(uintptr_t)token}).payload;
Mark D. Roth16883a32016-10-21 10:30:58 -0700446}
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800447static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800448 if (token != NULL) {
449 GRPC_MDELEM_UNREF(exec_ctx, (grpc_mdelem){(uintptr_t)token});
450 }
Mark D. Roth16883a32016-10-21 10:30:58 -0700451}
/* user_data vtable entry: orders tokens by pointer identity, returning
 * 1, -1 or 0 (strcmp-style). */
static int lb_token_cmp(void *token1, void *token2) {
  return (token1 > token2) - (token1 < token2);
}
/* vtable handed to grpc_lb_addresses so it can copy/destroy/compare the
 * opaque LB-token user data attached to each address. */
static const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};
459
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100460static void parse_server(const grpc_grpclb_server *server,
461 grpc_resolved_address *addr) {
Mark D. Rothd7389b42017-05-17 12:22:17 -0700462 memset(addr, 0, sizeof(*addr));
Mark D. Rothe7751802017-07-27 12:31:45 -0700463 if (server->drop) return;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100464 const uint16_t netorder_port = htons((uint16_t)server->port);
465 /* the addresses are given in binary format (a in(6)_addr struct) in
466 * server->ip_address.bytes. */
467 const grpc_grpclb_ip_address *ip = &server->ip_address;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100468 if (ip->size == 4) {
469 addr->len = sizeof(struct sockaddr_in);
470 struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr;
471 addr4->sin_family = AF_INET;
472 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
473 addr4->sin_port = netorder_port;
474 } else if (ip->size == 16) {
475 addr->len = sizeof(struct sockaddr_in6);
476 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr;
David Garcia Quintas107ca162016-11-02 18:17:03 -0700477 addr6->sin6_family = AF_INET6;
David Garcia Quintas7ec29132016-11-01 04:09:05 +0100478 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
479 addr6->sin6_port = netorder_port;
480 }
481}
482
/* Returns a grpc_lb_addresses instance with one entry per *valid* server in
 * \a serverlist. Ownership of the returned instance is transferred to the
 * caller (released via grpc_lb_addresses_destroy, using \a lb_token_vtable
 * for the per-address user data). */
static grpc_lb_addresses *process_serverlist_locked(
    grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) {
  size_t num_valid = 0;
  /* first pass: count how many are valid in order to allocate the necessary
   * memory in a single block */
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
  }
  grpc_lb_addresses *lb_addresses =
      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
  /* second pass: actually populate the addresses and LB tokens (aka user data
   * to the outside world) to be read by the RR policy during its creation.
   * Given that the validity tests are very cheap, they are performed again
   * instead of marking the valid ones during the first pass, as this would
   * incur an allocation due to the arbitrary number of servers */
  size_t addr_idx = 0;
  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    const grpc_grpclb_server *server = serverlist->servers[sl_idx];
    /* skip entries that failed validation; addr_idx only advances on valid
     * ones, so it stays in sync with the num_valid-sized array */
    if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    GPR_ASSERT(addr_idx < num_valid);
    /* address processing */
    grpc_resolved_address addr;
    parse_server(server, &addr);
    /* lb token processing: the token travels as mdelem payload in the
     * address's user_data slot */
    void *user_data;
    if (server->has_load_balance_token) {
      /* the token field is a fixed-size char array; it may not be
       * NUL-terminated, hence the strnlen bound */
      const size_t lb_token_max_length =
          GPR_ARRAY_SIZE(server->load_balance_token);
      const size_t lb_token_length =
          strnlen(server->load_balance_token, lb_token_max_length);
      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
          server->load_balance_token, lb_token_length);
      user_data = (void *)grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_LB_TOKEN,
                                                  lb_token_mdstr)
                      .payload;
    } else {
      /* balancer didn't provide a token: log it (uri only built for the
       * message) and fall back to the shared empty-token mdelem */
      char *uri = grpc_sockaddr_to_uri(&addr);
      gpr_log(GPR_INFO,
              "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
              uri);
      gpr_free(uri);
      user_data = (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    }

    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                  false /* is_balancer */,
                                  NULL /* balancer_name */, user_data);
    ++addr_idx;
  }
  /* every slot allocated in the first pass must have been filled */
  GPR_ASSERT(addr_idx == num_valid);
  return lb_addresses;
}
537
/* Updates grpclb's own connectivity state tracker (\a
 * glb_policy->state_tracker) based on \a rr_state, the state reported by the
 * embedded RR policy. Ownership of \a rr_state_error is passed on to
 * grpc_connectivity_state_set. Must be called from within the combiner and
 * never while shutting down (see table note (*) below). */
static void update_lb_connectivity_status_locked(
    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
    grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
  const grpc_connectivity_state curr_glb_state =
      grpc_connectivity_state_check(&glb_policy->state_tracker);

  /* The new connectivity status is a function of the previous one and the new
   * input coming from the status of the RR policy.
   *
   *  current state (grpclb's)
   *  |
   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
   *  ===++====+=====+=====+======+======+
   *   I || I  |  C  |  R  | [I]  | [I]  |
   *  ---++----+-----+-----+------+------+
   *   C || I  |  C  |  R  | [C]  | [C]  |
   *  ---++----+-----+-----+------+------+
   *   R || I  |  C  |  R  | [R]  | [R]  |
   *  ---++----+-----+-----+------+------+
   *  TF || I  |  C  |  R  | [TF] | [TF] |
   *  ---++----+-----+-----+------+------+
   *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
   *  ---++----+-----+-----+------+------+
   *
   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
   * is the current state of grpclb, which is left untouched.
   *
   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
   * the previous RR instance.
   *
   * Note that the status is never updated to SHUTDOWN as a result of calling
   * this function. Only glb_shutdown() has the power to set that state.
   *
   * (*) This function mustn't be called during shutting down. */
  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);

  /* Sanity-check the error/state pairing: error states must carry an error,
   * healthy states must not. */
  switch (rr_state) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
      break;
    case GRPC_CHANNEL_INIT:
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
  }

  if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
    gpr_log(
        GPR_INFO, "Setting grpclb's state to %s from new RR policy %p state.",
        grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy);
  }
  grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state,
                              rr_state_error,
                              "update_lb_connectivity_status_locked");
}
595
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
 * immediately (ignoring its completion callback), we need to perform the
 * cleanups this callback would otherwise be responsible for.
 * If \a force_async is true, then we will manually schedule the
 * completion callback even if the pick is available immediately.
 *
 * Returns true iff the pick completed synchronously from the caller's point
 * of view (i.e. the caller must not wait for \a wc_arg->wrapper_closure).
 * Takes ownership of the RR policy ref held in \a wc_arg->rr_policy and of
 * \a wc_arg->free_when_done on every completed path. */
static bool pick_from_internal_rr_locked(
    grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
    const grpc_lb_policy_pick_args *pick_args, bool force_async,
    grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
  // Look at the index into the serverlist to see if we should drop this call.
  grpc_grpclb_server *server =
      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
    glb_policy->serverlist_index = 0;  // Wrap-around.
  }
  if (server->drop) {
    // Not using the RR policy, so unref it.
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
              (intptr_t)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
    // Update client load reporting stats to indicate the number of
    // dropped calls.  Note that we have to do this here instead of in
    // the client_load_reporting filter, because we do not create a
    // subchannel call (and therefore no client_load_reporting filter)
    // for dropped calls.
    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
                                                     wc_arg->client_stats);
    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
    if (force_async) {
      // Caller wants async semantics: schedule the wrapped closure
      // ourselves and report the pick as "not done yet".
      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
      gpr_free(wc_arg->free_when_done);
      return false;
    }
    gpr_free(wc_arg->free_when_done);
    return true;
  }
  // Pick via the RR policy.
  const bool pick_done = grpc_lb_policy_pick_locked(
      exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context,
      (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure);
  if (pick_done) {
    /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
              (intptr_t)wc_arg->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
    /* add the load reporting initial metadata */
    initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata,
                                  pick_args->lb_token_mdelem_storage,
                                  GRPC_MDELEM_REF(wc_arg->lb_token));
    // Pass on client stats via context. Passes ownership of the reference.
    GPR_ASSERT(wc_arg->client_stats != NULL);
    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats;
    wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats;
    if (force_async) {
      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
      gpr_free(wc_arg->free_when_done);
      return false;
    }
    gpr_free(wc_arg->free_when_done);
  }
  /* else, the pending pick will be registered and taken care of by the
   * pending pick list inside the RR policy (glb_policy->rr_policy).
   * Eventually, wrapped_on_complete will be called, which will -among other
   * things- add the LB token to the call's initial metadata */
  return pick_done;
}
668
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700669static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
670 glb_lb_policy *glb_policy) {
David Garcia Quintasc22c65b2017-07-25 14:22:20 -0700671 grpc_lb_addresses *addresses =
672 process_serverlist_locked(exec_ctx, glb_policy->serverlist);
673 GPR_ASSERT(addresses != NULL);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700674 grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args));
675 args->client_channel_factory = glb_policy->cc_factory;
676 args->combiner = glb_policy->base.combiner;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700677 // Replace the LB addresses in the channel args that we pass down to
678 // the subchannel.
Mark D. Roth557c9902016-10-24 11:12:05 -0700679 static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200680 const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700681 args->args = grpc_channel_args_copy_and_add_and_remove(
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700682 glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
683 1);
Craig Tiller87a7e1f2016-11-09 09:42:19 -0800684 grpc_lb_addresses_destroy(exec_ctx, addresses);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700685 return args;
686}
687
688static void lb_policy_args_destroy(grpc_exec_ctx *exec_ctx,
689 grpc_lb_policy_args *args) {
690 grpc_channel_args_destroy(exec_ctx, args->args);
691 gpr_free(args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700692}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700693
Craig Tiller2400bf52017-02-09 16:25:19 -0800694static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
695 void *arg, grpc_error *error);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700696static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
697 grpc_lb_policy_args *args) {
698 GPR_ASSERT(glb_policy->rr_policy == NULL);
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800699
David Garcia Quintas4283a262016-11-18 10:43:56 -0800700 grpc_lb_policy *new_rr_policy =
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700701 grpc_lb_policy_create(exec_ctx, "round_robin", args);
David Garcia Quintas4283a262016-11-18 10:43:56 -0800702 if (new_rr_policy == NULL) {
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800703 gpr_log(GPR_ERROR,
704 "Failure creating a RoundRobin policy for serverlist update with "
705 "%lu entries. The previous RR instance (%p), if any, will continue "
706 "to be used. Future updates from the LB will attempt to create new "
707 "instances.",
708 (unsigned long)glb_policy->serverlist->num_servers,
David Garcia Quintas4283a262016-11-18 10:43:56 -0800709 (void *)glb_policy->rr_policy);
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800710 return;
David Garcia Quintas65318262016-07-29 13:43:38 -0700711 }
David Garcia Quintas4283a262016-11-18 10:43:56 -0800712 glb_policy->rr_policy = new_rr_policy;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200713
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700714 grpc_error *rr_state_error = NULL;
715 const grpc_connectivity_state rr_state =
716 grpc_lb_policy_check_connectivity_locked(exec_ctx, glb_policy->rr_policy,
717 &rr_state_error);
718 /* Connectivity state is a function of the RR policy updated/created */
719 update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state,
720 rr_state_error);
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800721 /* Add the gRPC LB's interested_parties pollset_set to that of the newly
722 * created RR policy. This will make the RR policy progress upon activity on
723 * gRPC LB, which in turn is tied to the application's call */
Yuchen Zengb4291642016-09-01 19:17:14 -0700724 grpc_pollset_set_add_pollset_set(exec_ctx,
725 glb_policy->rr_policy->interested_parties,
726 glb_policy->base.interested_parties);
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200727
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800728 /* Allocate the data for the tracking of the new RR policy's connectivity.
729 * It'll be deallocated in glb_rr_connectivity_changed() */
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200730 rr_connectivity_data *rr_connectivity =
Craig Tiller6f417882017-02-16 14:09:39 -0800731 gpr_zalloc(sizeof(rr_connectivity_data));
ncteisen969b46e2017-06-08 14:57:11 -0700732 GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
Craig Tiller2400bf52017-02-09 16:25:19 -0800733 glb_rr_connectivity_changed_locked, rr_connectivity,
Craig Tilleree4b1452017-05-12 10:56:03 -0700734 grpc_combiner_scheduler(glb_policy->base.combiner));
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200735 rr_connectivity->glb_policy = glb_policy;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700736 rr_connectivity->state = rr_state;
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200737
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800738 /* Subscribe to changes to the connectivity of the new RR */
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700739 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_sched");
Craig Tiller2400bf52017-02-09 16:25:19 -0800740 grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy,
741 &rr_connectivity->state,
742 &rr_connectivity->on_change);
743 grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700744
David Garcia Quintas149f09d2016-11-17 20:43:10 -0800745 /* Update picks and pings in wait */
David Garcia Quintas65318262016-07-29 13:43:38 -0700746 pending_pick *pp;
747 while ((pp = glb_policy->pending_picks)) {
748 glb_policy->pending_picks = pp->next;
749 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick");
750 pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy;
Mark D. Roth09e458c2017-05-02 08:13:26 -0700751 pp->wrapped_on_complete_arg.client_stats =
752 grpc_grpclb_client_stats_ref(glb_policy->client_stats);
Craig Tiller84f75d42017-05-03 13:06:35 -0700753 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintasc22c65b2017-07-25 14:22:20 -0700754 gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p",
755 (void *)glb_policy->rr_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -0700756 }
Mark D. Rothd7389b42017-05-17 12:22:17 -0700757 pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args,
758 true /* force_async */, pp->target,
David Garcia Quintas58c18e72016-10-14 15:23:45 -0700759 &pp->wrapped_on_complete_arg);
David Garcia Quintas65318262016-07-29 13:43:38 -0700760 }
761
762 pending_ping *pping;
763 while ((pping = glb_policy->pending_pings)) {
764 glb_policy->pending_pings = pping->next;
765 GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
766 pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
Craig Tiller84f75d42017-05-03 13:06:35 -0700767 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas65318262016-07-29 13:43:38 -0700768 gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "",
769 (intptr_t)glb_policy->rr_policy);
770 }
Craig Tiller2400bf52017-02-09 16:25:19 -0800771 grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
772 &pping->wrapped_notify_arg.wrapper_closure);
David Garcia Quintas65318262016-07-29 13:43:38 -0700773 }
David Garcia Quintas65318262016-07-29 13:43:38 -0700774}
David Garcia Quintas8d489112016-07-29 15:20:42 -0700775
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700776/* glb_policy->rr_policy may be NULL (initial handover) */
777static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
778 glb_lb_policy *glb_policy) {
779 GPR_ASSERT(glb_policy->serverlist != NULL &&
780 glb_policy->serverlist->num_servers > 0);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700781 if (glb_policy->shutting_down) return;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700782 grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
David Garcia Quintasc22c65b2017-07-25 14:22:20 -0700783 GPR_ASSERT(args != NULL);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700784 if (glb_policy->rr_policy != NULL) {
785 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
786 gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)",
787 (void *)glb_policy->rr_policy);
788 }
789 grpc_lb_policy_update_locked(exec_ctx, glb_policy->rr_policy, args);
790 } else {
791 create_rr_locked(exec_ctx, glb_policy, args);
792 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
793 gpr_log(GPR_DEBUG, "Created new Round Robin policy (%p)",
794 (void *)glb_policy->rr_policy);
795 }
796 }
797 lb_policy_args_destroy(exec_ctx, args);
798}
799
/* Callback (run under the combiner) invoked whenever the embedded RR
 * policy's connectivity state changes. \a arg is the rr_connectivity_data
 * allocated in create_rr_locked; ownership is kept across re-subscriptions
 * and released here on shutdown. */
static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx,
                                               void *arg, grpc_error *error) {
  rr_connectivity_data *rr_connectivity = arg;
  glb_lb_policy *glb_policy = rr_connectivity->glb_policy;

  const bool shutting_down = glb_policy->shutting_down;
  bool unref_needed = false;
  /* hold an extra ref on error for the duration of this callback */
  GRPC_ERROR_REF(error);

  if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
    /* RR policy shutting down. Don't renew subscription and free the arg of
     * this callback. In addition we need to stash away the current policy to
     * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
     * one, the policy would be destroyed, alongside the lock, which would
     * result in a use-after-free */
    unref_needed = true;
    gpr_free(rr_connectivity);
  } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
    /* update_lb_connectivity_status_locked takes ownership of the error ref
     * passed in */
    update_lb_connectivity_status_locked(
        exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error));
    /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
    grpc_lb_policy_notify_on_state_change_locked(
        exec_ctx, glb_policy->rr_policy, &rr_connectivity->state,
        &rr_connectivity->on_change);
  }
  /* the weak unref is deferred until after the branch above so the policy
   * (and its lock) cannot be destroyed while still in use */
  if (unref_needed) {
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "rr_connectivity_cb");
  }
  GRPC_ERROR_UNREF(error);
}
831
/* Value destructor for the targets_info slice hash table entries (signature
 * dictated by grpc_slice_hash_table_create). \a balancer_name is the
 * gpr_strdup'd string stored by targets_info_entry_create; \a exec_ctx is
 * unused. */
static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
                                  void *balancer_name) {
  gpr_free(balancer_name);
}
836
David Garcia Quintas01291502017-02-07 13:26:41 -0800837static grpc_slice_hash_table_entry targets_info_entry_create(
838 const char *address, const char *balancer_name) {
David Garcia Quintas01291502017-02-07 13:26:41 -0800839 grpc_slice_hash_table_entry entry;
840 entry.key = grpc_slice_from_copied_string(address);
Mark D. Rothe3006702017-04-19 07:43:56 -0700841 entry.value = gpr_strdup(balancer_name);
David Garcia Quintas01291502017-02-07 13:26:41 -0800842 return entry;
843}
844
/* strcmp-based comparator for balancer names stored as void* in the
 * targets_info hash table (signature dictated by
 * grpc_slice_hash_table_create). */
static int balancer_name_cmp_fn(void *a, void *b) {
  return strcmp((const char *)a, (const char *)b);
}
850
/* Returns the channel args for the LB channel, used to create a bidirectional
 * stream for the reception of load balancing updates.
 *
 * Inputs:
 *   - \a addresses: corresponding to the balancers.
 *   - \a response_generator: in order to propagate updates from the resolver
 *   above the grpclb policy.
 *   - \a args: other args inherited from the grpclb policy.
 *
 * The caller owns the returned channel args (release with
 * grpc_channel_args_destroy). All intermediate structures (lb_addresses,
 * targets_info table, targets_info_entries array) are released before
 * returning. */
static grpc_channel_args *build_lb_channel_args(
    grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses,
    grpc_fake_resolver_response_generator *response_generator,
    const grpc_channel_args *args) {
  size_t num_grpclb_addrs = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
  }
  /* All input addresses come from a resolver that claims they are LB services.
   * It's the resolver's responsibility to make sure this policy is only
   * instantiated and used in that case. Otherwise, something has gone wrong. */
  GPR_ASSERT(num_grpclb_addrs > 0);
  grpc_lb_addresses *lb_addresses =
      grpc_lb_addresses_create(num_grpclb_addrs, NULL);
  grpc_slice_hash_table_entry *targets_info_entries =
      gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs);

  /* Collect the balancer addresses, and in parallel build the
   * {address -> balancer_name} entries for the targets_info table. */
  size_t lb_addresses_idx = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) continue;
    if (addresses->addresses[i].user_data != NULL) {
      gpr_log(GPR_ERROR,
              "This LB policy doesn't support user data. It will be ignored");
    }
    char *addr_str;
    GPR_ASSERT(grpc_sockaddr_to_string(
                   &addr_str, &addresses->addresses[i].address, true) > 0);
    targets_info_entries[lb_addresses_idx] = targets_info_entry_create(
        addr_str, addresses->addresses[i].balancer_name);
    gpr_free(addr_str);

    grpc_lb_addresses_set_address(
        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
        addresses->addresses[i].address.len, false /* is balancer */,
        addresses->addresses[i].balancer_name, NULL /* user data */);
  }
  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
  /* the table takes ownership of the entries' keys/values; the array shell
   * can be freed immediately */
  grpc_slice_hash_table *targets_info =
      grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries,
                                   destroy_balancer_name, balancer_name_cmp_fn);
  gpr_free(targets_info_entries);

  grpc_channel_args *lb_channel_args =
      grpc_lb_policy_grpclb_build_lb_channel_args(exec_ctx, targets_info,
                                                  response_generator, args);

  grpc_arg lb_channel_addresses_arg =
      grpc_lb_addresses_create_channel_arg(lb_addresses);

  grpc_channel_args *result = grpc_channel_args_copy_and_add(
      lb_channel_args, &lb_channel_addresses_arg, 1);
  grpc_slice_hash_table_unref(exec_ctx, targets_info);
  grpc_channel_args_destroy(exec_ctx, lb_channel_args);
  grpc_lb_addresses_destroy(exec_ctx, lb_addresses);
  return result;
}
915
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700916static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
917 void *arg,
918 grpc_error *error);
David Garcia Quintas65318262016-07-29 13:43:38 -0700919static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
920 grpc_lb_policy_factory *factory,
921 grpc_lb_policy_args *args) {
Mark D. Rothe011b1e2016-09-07 08:28:00 -0700922 /* Count the number of gRPC-LB addresses. There must be at least one.
923 * TODO(roth): For now, we ignore non-balancer addresses, but in the
924 * future, we may change the behavior such that we fall back to using
Mark D. Roth1eb96dc2017-03-22 12:19:03 -0700925 * the non-balancer addresses if we cannot reach any balancers. In the
926 * fallback case, we should use the LB policy indicated by
927 * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
928 * unset, we should default to pick_first). */
Mark D. Roth201db7d2016-12-12 09:36:02 -0800929 const grpc_arg *arg =
930 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
David Garcia Quintas228a5142017-03-30 19:43:00 -0700931 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
932 return NULL;
933 }
Mark D. Roth557c9902016-10-24 11:12:05 -0700934 grpc_lb_addresses *addresses = arg->value.pointer.p;
Mark D. Rothf655c852016-09-06 10:40:38 -0700935 size_t num_grpclb_addrs = 0;
Mark D. Roth5bd7be02016-10-21 14:19:50 -0700936 for (size_t i = 0; i < addresses->num_addresses; ++i) {
937 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
Mark D. Rothf655c852016-09-06 10:40:38 -0700938 }
939 if (num_grpclb_addrs == 0) return NULL;
940
Craig Tiller6f417882017-02-16 14:09:39 -0800941 glb_lb_policy *glb_policy = gpr_zalloc(sizeof(*glb_policy));
David Garcia Quintas65318262016-07-29 13:43:38 -0700942
Mark D. Roth201db7d2016-12-12 09:36:02 -0800943 /* Get server name. */
944 arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
945 GPR_ASSERT(arg != NULL);
946 GPR_ASSERT(arg->type == GRPC_ARG_STRING);
Yuchen Zengc40d1d82017-02-15 20:42:06 -0800947 grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
David Garcia Quintas855a1062016-12-16 13:11:49 -0800948 GPR_ASSERT(uri->path[0] != '\0');
949 glb_policy->server_name =
950 gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
Craig Tiller84f75d42017-05-03 13:06:35 -0700951 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas855a1062016-12-16 13:11:49 -0800952 gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
953 glb_policy->server_name);
954 }
Mark D. Roth201db7d2016-12-12 09:36:02 -0800955 grpc_uri_destroy(uri);
956
David Garcia Quintas65318262016-07-29 13:43:38 -0700957 glb_policy->cc_factory = args->client_channel_factory;
958 GPR_ASSERT(glb_policy->cc_factory != NULL);
David Garcia Quintas65318262016-07-29 13:43:38 -0700959
Mark D. Roth64d922a2017-05-03 12:52:04 -0700960 arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
Mark D. Roth175c73b2017-05-04 08:28:05 -0700961 glb_policy->lb_call_timeout_ms =
962 grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
Mark D. Roth64d922a2017-05-03 12:52:04 -0700963
Mark D. Roth09e458c2017-05-02 08:13:26 -0700964 // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
965 // since we use this to trigger the client_load_reporting filter.
Mark D. Roth8d5e60b2017-06-09 09:08:23 -0700966 grpc_arg new_arg =
967 grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
Mark D. Roth09e458c2017-05-02 08:13:26 -0700968 static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
969 glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
970 args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
971
David Garcia Quintas01291502017-02-07 13:26:41 -0800972 /* Create a client channel over them to communicate with a LB service */
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700973 glb_policy->response_generator =
974 grpc_fake_resolver_response_generator_create();
975 grpc_channel_args *lb_channel_args = build_lb_channel_args(
976 exec_ctx, addresses, glb_policy->response_generator, args->args);
977 char *uri_str;
978 gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
David Garcia Quintas01291502017-02-07 13:26:41 -0800979 glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700980 exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
981
982 /* Propagate initial resolution */
983 grpc_fake_resolver_response_generator_set_response(
984 exec_ctx, glb_policy->response_generator, lb_channel_args);
David Garcia Quintas01291502017-02-07 13:26:41 -0800985 grpc_channel_args_destroy(exec_ctx, lb_channel_args);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700986 gpr_free(uri_str);
David Garcia Quintas65318262016-07-29 13:43:38 -0700987 if (glb_policy->lb_channel == NULL) {
Mark D. Roth09e458c2017-05-02 08:13:26 -0700988 gpr_free((void *)glb_policy->server_name);
989 grpc_channel_args_destroy(exec_ctx, glb_policy->args);
David Garcia Quintas65318262016-07-29 13:43:38 -0700990 gpr_free(glb_policy);
991 return NULL;
992 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700993
ncteisen969b46e2017-06-08 14:57:11 -0700994 GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700995 glb_lb_channel_on_connectivity_changed_cb, glb_policy,
Craig Tiller4bbbfaf2017-06-07 13:02:15 -0700996 grpc_combiner_scheduler(args->combiner));
Craig Tiller2400bf52017-02-09 16:25:19 -0800997 grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
David Garcia Quintas65318262016-07-29 13:43:38 -0700998 grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
999 "grpclb");
1000 return &glb_policy->base;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001001}
1002
David Garcia Quintas65318262016-07-29 13:43:38 -07001003static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
1004 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
1005 GPR_ASSERT(glb_policy->pending_picks == NULL);
1006 GPR_ASSERT(glb_policy->pending_pings == NULL);
Mark D. Rothd1604af2016-09-22 11:20:27 -07001007 gpr_free((void *)glb_policy->server_name);
Craig Tiller87a7e1f2016-11-09 09:42:19 -08001008 grpc_channel_args_destroy(exec_ctx, glb_policy->args);
Mark D. Roth09e458c2017-05-02 08:13:26 -07001009 if (glb_policy->client_stats != NULL) {
1010 grpc_grpclb_client_stats_unref(glb_policy->client_stats);
1011 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001012 grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
1013 if (glb_policy->serverlist != NULL) {
1014 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
1015 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001016 grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
1017 if (glb_policy->pending_update_args != NULL) {
1018 grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args);
1019 gpr_free(glb_policy->pending_update_args);
1020 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001021 gpr_free(glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001022}
1023
/* Transitions the policy into shutdown: cancels the in-flight LB call and the
 * retry timer, unrefs the embedded RR policy, destroys the LB channel,
 * publishes GRPC_CHANNEL_SHUTDOWN on the state tracker, and completes every
 * pending pick (with *target NULLed out) and pending ping. */
static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  glb_policy->shutting_down = true;

  /* We need a copy of the lb_call pointer because we can't cancel the call
   * while holding glb_policy->mu: lb_on_server_status_received, invoked due to
   * the cancel, needs to acquire that same lock */
  grpc_call *lb_call = glb_policy->lb_call;

  /* glb_policy->lb_call and this local lb_call must be consistent at this point
   * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
   * of query_for_backends_locked, which can only be invoked while
   * glb_policy->shutting_down is false. */
  if (lb_call != NULL) {
    grpc_call_cancel(lb_call, NULL);
    /* lb_on_server_status_received will pick up the cancel and clean up */
  }
  if (glb_policy->retry_timer_active) {
    grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
    glb_policy->retry_timer_active = false;
  }

  /* Detach the pending pick/ping lists first; their closures are scheduled
   * only after the connectivity state change below has been recorded. */
  pending_pick *pp = glb_policy->pending_picks;
  glb_policy->pending_picks = NULL;
  pending_ping *pping = glb_policy->pending_pings;
  glb_policy->pending_pings = NULL;
  if (glb_policy->rr_policy) {
    GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
  }
  // We destroy the LB channel here because
  // glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
  // instance. Destroying the lb channel in glb_destroy would likely result in
  // a callback invocation without a valid glb_policy arg.
  if (glb_policy->lb_channel != NULL) {
    grpc_channel_destroy(glb_policy->lb_channel);
    glb_policy->lb_channel = NULL;
  }
  grpc_connectivity_state_set(
      exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");

  /* Flush pending picks: *target == NULL signals the wrapped closure that no
   * subchannel was selected. */
  while (pp != NULL) {
    pending_pick *next = pp->next;
    *pp->target = NULL;
    GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
                       GRPC_ERROR_NONE);
    pp = next;
  }

  /* Flush pending pings the same way. */
  while (pping != NULL) {
    pending_ping *next = pping->next;
    GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
                       GRPC_ERROR_NONE);
    pping = next;
  }
}
1080
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001081// Cancel a specific pending pick.
1082//
1083// A grpclb pick progresses as follows:
1084// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
1085// handed over to the RR policy (in create_rr_locked()). From that point
1086// onwards, it'll be RR's responsibility. For cancellations, that implies the
1087// pick needs also be cancelled by the RR instance.
1088// - Otherwise, without an RR instance, picks stay pending at this policy's
1089// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
1090// we invoke the completion closure and set *target to NULL right here.
Craig Tiller2400bf52017-02-09 16:25:19 -08001091static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1092 grpc_connected_subchannel **target,
1093 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001094 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001095 pending_pick *pp = glb_policy->pending_picks;
1096 glb_policy->pending_picks = NULL;
1097 while (pp != NULL) {
1098 pending_pick *next = pp->next;
1099 if (pp->target == target) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001100 *target = NULL;
ncteisen969b46e2017-06-08 14:57:11 -07001101 GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
ncteisen4b36a3d2017-03-13 19:08:06 -07001102 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1103 "Pick Cancelled", &error, 1));
David Garcia Quintas65318262016-07-29 13:43:38 -07001104 } else {
1105 pp->next = glb_policy->pending_picks;
1106 glb_policy->pending_picks = pp;
1107 }
1108 pp = next;
1109 }
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001110 if (glb_policy->rr_policy != NULL) {
1111 grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target,
1112 GRPC_ERROR_REF(error));
1113 }
Mark D. Roth5f844002016-09-08 08:20:53 -07001114 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001115}
1116
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001117// Cancel all pending picks.
1118//
1119// A grpclb pick progresses as follows:
1120// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be
1121// handed over to the RR policy (in create_rr_locked()). From that point
1122// onwards, it'll be RR's responsibility. For cancellations, that implies the
1123// pick needs also be cancelled by the RR instance.
1124// - Otherwise, without an RR instance, picks stay pending at this policy's
1125// level (grpclb), inside the glb_policy->pending_picks list. To cancel these,
1126// we invoke the completion closure and set *target to NULL right here.
Craig Tiller2400bf52017-02-09 16:25:19 -08001127static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
1128 grpc_lb_policy *pol,
1129 uint32_t initial_metadata_flags_mask,
1130 uint32_t initial_metadata_flags_eq,
1131 grpc_error *error) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001132 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001133 pending_pick *pp = glb_policy->pending_picks;
1134 glb_policy->pending_picks = NULL;
1135 while (pp != NULL) {
1136 pending_pick *next = pp->next;
David Garcia Quintas92eb6b92016-09-30 14:07:39 -07001137 if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) ==
David Garcia Quintas65318262016-07-29 13:43:38 -07001138 initial_metadata_flags_eq) {
ncteisen969b46e2017-06-08 14:57:11 -07001139 GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
ncteisen4b36a3d2017-03-13 19:08:06 -07001140 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1141 "Pick Cancelled", &error, 1));
David Garcia Quintas65318262016-07-29 13:43:38 -07001142 } else {
1143 pp->next = glb_policy->pending_picks;
1144 glb_policy->pending_picks = pp;
1145 }
1146 pp = next;
1147 }
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001148 if (glb_policy->rr_policy != NULL) {
1149 grpc_lb_policy_cancel_picks_locked(
1150 exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask,
1151 initial_metadata_flags_eq, GRPC_ERROR_REF(error));
1152 }
Mark D. Rothe65ff112016-09-09 13:48:38 -07001153 GRPC_ERROR_UNREF(error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001154}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001155
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001156static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
1157 glb_lb_policy *glb_policy);
1158static void start_picking_locked(grpc_exec_ctx *exec_ctx,
1159 glb_lb_policy *glb_policy) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001160 glb_policy->started_picking = true;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001161 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
1162 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001163}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001164
Craig Tiller2400bf52017-02-09 16:25:19 -08001165static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001166 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001167 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001168 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001169 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001170}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001171
/* Picks a connected subchannel for a call.
 *
 * Requires pick_args->lb_token_mdelem_storage: the error message below states
 * load reporting can't work without it, so such picks fail immediately.
 *
 * If the RR child policy already exists the pick is delegated to it, wrapped
 * in a heap-allocated wrapped_rr_closure_arg that carries the LB-token
 * metadata storage and a ref on the client stats for the completion path.
 * Otherwise the pick is queued in glb_policy->pending_picks and the balancer
 * query is started if it wasn't already.
 *
 * Returns nonzero iff the pick completed synchronously. */
static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                           const grpc_lb_policy_pick_args *pick_args,
                           grpc_connected_subchannel **target,
                           grpc_call_context_element *context, void **user_data,
                           grpc_closure *on_complete) {
  if (pick_args->lb_token_mdelem_storage == NULL) {
    *target = NULL;
    GRPC_CLOSURE_SCHED(exec_ctx, on_complete,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                           "No mdelem storage for the LB token. Load reporting "
                           "won't work without it. Failing"));
    return 0;
  }

  glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
  bool pick_done;

  if (glb_policy->rr_policy != NULL) {
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p",
              (void *)glb_policy, (void *)glb_policy->rr_policy);
    }
    /* Ref held for the duration of the wrapped pick; released by the wrapped
     * closure machinery. */
    GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick");

    wrapped_rr_closure_arg *wc_arg = gpr_zalloc(sizeof(wrapped_rr_closure_arg));

    GRPC_CLOSURE_INIT(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg,
                      grpc_schedule_on_exec_ctx);
    wc_arg->rr_policy = glb_policy->rr_policy;
    wc_arg->target = target;
    wc_arg->context = context;
    GPR_ASSERT(glb_policy->client_stats != NULL);
    wc_arg->client_stats =
        grpc_grpclb_client_stats_ref(glb_policy->client_stats);
    wc_arg->wrapped_closure = on_complete;
    wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage;
    wc_arg->initial_metadata = pick_args->initial_metadata;
    /* free_when_done points at the allocation itself: whoever runs the
     * wrapper closure is responsible for freeing wc_arg. */
    wc_arg->free_when_done = wc_arg;
    pick_done =
        pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args,
                                     false /* force_async */, target, wc_arg);
  } else {
    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
      gpr_log(GPR_DEBUG,
              "No RR policy in grpclb instance %p. Adding to grpclb's pending "
              "picks",
              (void *)(glb_policy));
    }
    add_pending_pick(&glb_policy->pending_picks, pick_args, target, context,
                     on_complete);

    if (!glb_policy->started_picking) {
      start_picking_locked(exec_ctx, glb_policy);
    }
    /* Queued picks always complete asynchronously. */
    pick_done = false;
  }
  return pick_done;
}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001230
Craig Tiller2400bf52017-02-09 16:25:19 -08001231static grpc_connectivity_state glb_check_connectivity_locked(
David Garcia Quintas65318262016-07-29 13:43:38 -07001232 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1233 grpc_error **connectivity_error) {
1234 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
Craig Tiller2400bf52017-02-09 16:25:19 -08001235 return grpc_connectivity_state_get(&glb_policy->state_tracker,
1236 connectivity_error);
David Garcia Quintas65318262016-07-29 13:43:38 -07001237}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001238
Craig Tiller2400bf52017-02-09 16:25:19 -08001239static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
1240 grpc_closure *closure) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001241 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001242 if (glb_policy->rr_policy) {
Craig Tiller2400bf52017-02-09 16:25:19 -08001243 grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure);
David Garcia Quintas65318262016-07-29 13:43:38 -07001244 } else {
1245 add_pending_ping(&glb_policy->pending_pings, closure);
1246 if (!glb_policy->started_picking) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001247 start_picking_locked(exec_ctx, glb_policy);
David Garcia Quintas65318262016-07-29 13:43:38 -07001248 }
1249 }
David Garcia Quintas65318262016-07-29 13:43:38 -07001250}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001251
Craig Tiller2400bf52017-02-09 16:25:19 -08001252static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
1253 grpc_lb_policy *pol,
1254 grpc_connectivity_state *current,
1255 grpc_closure *notify) {
David Garcia Quintas65318262016-07-29 13:43:38 -07001256 glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
David Garcia Quintas65318262016-07-29 13:43:38 -07001257 grpc_connectivity_state_notify_on_state_change(
1258 exec_ctx, &glb_policy->state_tracker, current, notify);
David Garcia Quintas65318262016-07-29 13:43:38 -07001259}
1260
Mark D. Roth09e458c2017-05-02 08:13:26 -07001261static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
1262 grpc_error *error);
1263
1264static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx,
1265 glb_lb_policy *glb_policy) {
1266 const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1267 const gpr_timespec next_client_load_report_time =
1268 gpr_time_add(now, glb_policy->client_stats_report_interval);
ncteisen969b46e2017-06-08 14:57:11 -07001269 GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001270 send_client_load_report_locked, glb_policy,
Craig Tilleree4b1452017-05-12 10:56:03 -07001271 grpc_combiner_scheduler(glb_policy->base.combiner));
Mark D. Roth09e458c2017-05-02 08:13:26 -07001272 grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer,
1273 next_client_load_report_time,
1274 &glb_policy->client_load_report_closure, now);
1275}
1276
1277static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg,
1278 grpc_error *error) {
1279 glb_lb_policy *glb_policy = arg;
1280 grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
1281 glb_policy->client_load_report_payload = NULL;
1282 if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) {
1283 glb_policy->client_load_report_timer_pending = false;
1284 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
1285 "client_load_report");
1286 return;
1287 }
1288 schedule_next_client_load_report(exec_ctx, glb_policy);
1289}
1290
1291static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
1292 glb_lb_policy *glb_policy) {
1293 grpc_op op;
1294 memset(&op, 0, sizeof(op));
1295 op.op = GRPC_OP_SEND_MESSAGE;
1296 op.data.send_message.send_message = glb_policy->client_load_report_payload;
ncteisen969b46e2017-06-08 14:57:11 -07001297 GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001298 client_load_report_done_locked, glb_policy,
Craig Tilleree4b1452017-05-12 10:56:03 -07001299 grpc_combiner_scheduler(glb_policy->base.combiner));
Mark D. Roth09e458c2017-05-02 08:13:26 -07001300 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1301 exec_ctx, glb_policy->lb_call, &op, 1,
1302 &glb_policy->client_load_report_closure);
1303 GPR_ASSERT(GRPC_CALL_OK == call_error);
1304}
1305
1306static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
Mark D. Rothe7751802017-07-27 12:31:45 -07001307 grpc_grpclb_dropped_call_counts *drop_entries =
1308 request->client_stats.calls_finished_with_drop.arg;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001309 return request->client_stats.num_calls_started == 0 &&
1310 request->client_stats.num_calls_finished == 0 &&
Mark D. Roth09e458c2017-05-02 08:13:26 -07001311 request->client_stats.num_calls_finished_with_client_failed_to_send ==
1312 0 &&
Mark D. Rothe7751802017-07-27 12:31:45 -07001313 request->client_stats.num_calls_finished_known_received == 0 &&
1314 (drop_entries == NULL || drop_entries->num_entries == 0);
Mark D. Roth09e458c2017-05-02 08:13:26 -07001315}
1316
/* Timer callback: builds and (when possible) sends a client load report.
 *
 * Stops the reporting loop (dropping its weak ref) if the timer was cancelled
 * or the LB call is gone. Skips sending -- but keeps the timer loop alive --
 * when the counters were zero in both the previous and the current report.
 * The encoded payload is stashed in client_load_report_payload; it is sent
 * immediately only once the initial request has gone out, otherwise
 * lb_on_sent_initial_request_locked() sends it later. */
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                           grpc_error *error) {
  glb_lb_policy *glb_policy = arg;
  if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) {
    glb_policy->client_load_report_timer_pending = false;
    GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                              "client_load_report");
    return;
  }
  // Construct message payload.
  GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
  grpc_grpclb_request *request =
      grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (load_report_counters_are_zero(request)) {
    if (glb_policy->last_client_load_report_counters_were_zero) {
      grpc_grpclb_request_destroy(request);
      schedule_next_client_load_report(exec_ctx, glb_policy);
      return;
    }
    glb_policy->last_client_load_report_counters_were_zero = true;
  } else {
    glb_policy->last_client_load_report_counters_were_zero = false;
  }
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  glb_policy->client_load_report_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // If we've already sent the initial request, then we can go ahead and
  // send the load report. Otherwise, we need to wait until the initial
  // request has been sent to send this
  // (see lb_on_sent_initial_request_locked() below).
  if (glb_policy->initial_request_sent) {
    do_send_client_load_report_locked(exec_ctx, glb_policy);
  }
}
1355
1356static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
1357 void *arg, grpc_error *error);
Craig Tiller2400bf52017-02-09 16:25:19 -08001358static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
1359 void *arg, grpc_error *error);
1360static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
1361 grpc_error *error);
/* Creates the LB call against the balancer channel and all per-call state:
 * fresh client stats, metadata arrays, the encoded initial request payload,
 * the three combiner-scheduled completion closures, and the call-retry
 * backoff. Preconditions (asserted): a valid server name, no existing LB call
 * and not shutting down. Counterpart of lb_call_destroy_locked(). */
static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->server_name != NULL);
  GPR_ASSERT(glb_policy->server_name[0] != '\0');
  GPR_ASSERT(glb_policy->lb_call == NULL);
  GPR_ASSERT(!glb_policy->shutting_down);

  /* Note the following LB call progresses every time there's activity in \a
   * glb_policy->base.interested_parties, which is comprised of the polling
   * entities from \a client_channel. */
  grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
  /* lb_call_timeout_ms == 0 means the LB call never times out. */
  gpr_timespec deadline =
      glb_policy->lb_call_timeout_ms == 0
          ? gpr_inf_future(GPR_CLOCK_MONOTONIC)
          : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                         gpr_time_from_millis(glb_policy->lb_call_timeout_ms,
                                              GPR_TIMESPAN));
  glb_policy->lb_call = grpc_channel_create_pollset_set_call(
      exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
      glb_policy->base.interested_parties,
      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
      &host, deadline, NULL);
  grpc_slice_unref_internal(exec_ctx, host);

  /* Start a new stats epoch for this call. */
  if (glb_policy->client_stats != NULL) {
    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
  }
  glb_policy->client_stats = grpc_grpclb_client_stats_create();

  grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
  grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);

  /* Pre-encode the initial request; it is sent in
   * query_for_backends_locked(). */
  grpc_grpclb_request *request =
      grpc_grpclb_request_create(glb_policy->server_name);
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  glb_policy->lb_request_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(exec_ctx, request_payload_slice);
  grpc_grpclb_request_destroy(request);

  /* All three callbacks run under the policy's combiner. */
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
                    lb_on_sent_initial_request_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
                    lb_on_server_status_received_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));
  GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
                    lb_on_response_received_locked, glb_policy,
                    grpc_combiner_scheduler(glb_policy->base.combiner));

  gpr_backoff_init(&glb_policy->lb_call_backoff_state,
                   GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS,
                   GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
                   GRPC_GRPCLB_RECONNECT_JITTER,
                   GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
                   GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);

  glb_policy->initial_request_sent = false;
  glb_policy->seen_initial_response = false;
  glb_policy->last_client_load_report_counters_were_zero = false;
}
David Garcia Quintas8d489112016-07-29 15:20:42 -07001423
/* Releases everything owned by the current LB call and NULLs it out.
 * Counterpart of lb_call_init_locked(); requires a live call (asserted). */
static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
                                   glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->lb_call != NULL);
  grpc_call_unref(glb_policy->lb_call);
  glb_policy->lb_call = NULL;

  grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
  grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);

  grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
  grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details);

  /* NOTE(review): this condition looks inverted. Cancelling the load-report
   * timer only when it is *not* pending (and leaving it armed when it is)
   * seems backwards -- later upstream revisions cancel when the timer IS
   * pending. Confirm the intended semantics of
   * client_load_report_timer_pending before changing this. */
  if (!glb_policy->client_load_report_timer_pending) {
    grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer);
  }
}
1440
David Garcia Quintas8d489112016-07-29 15:20:42 -07001441/*
1442 * Auxiliary functions and LB client callbacks.
1443 */
/* Starts the LoadBalancer streaming call. Creates the call via
 * lb_call_init_locked() and enqueues three batches on it:
 *   1) send initial metadata + receive initial metadata + send the initial
 *      grpclb request message (-> lb_on_sent_initial_request),
 *   2) receive the call status, which only completes when the call ends
 *      (-> lb_on_server_status_received),
 *   3) receive the first response message (-> lb_on_response_received).
 * Each batch takes its own weak ref on the policy, released (or, for batch 3,
 * possibly reused) by the corresponding callback. No-op when shutting down. */
static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
                                      glb_lb_policy *glb_policy) {
  GPR_ASSERT(glb_policy->lb_channel != NULL);
  if (glb_policy->shutting_down) return;

  lb_call_init_locked(exec_ctx, glb_policy);

  if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
    gpr_log(GPR_INFO,
            "Query for backends (grpclb: %p, lb_channel: %p, lb_call: %p)",
            (void *)glb_policy, (void *)glb_policy->lb_channel,
            (void *)glb_policy->lb_call);
  }
  GPR_ASSERT(glb_policy->lb_call != NULL);

  grpc_call_error call_error;
  grpc_op ops[4];
  memset(ops, 0, sizeof(ops));

  /* Batch 1: initial metadata exchange plus the initial request message. */
  grpc_op *op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &glb_policy->lb_initial_metadata_recv;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  GPR_ASSERT(glb_policy->lb_request_payload != NULL);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = glb_policy->lb_request_payload;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
   * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
                          "lb_on_sent_initial_request_locked");
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_on_sent_initial_request);
  GPR_ASSERT(GRPC_CALL_OK == call_error);

  /* Batch 2: call status; completes only when the LB call terminates. */
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &glb_policy->lb_trailing_metadata_recv;
  op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
  op->data.recv_status_on_client.status_details =
      &glb_policy->lb_call_status_details;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
   * count goes to zero) to be unref'd in lb_on_server_status_received_locked */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base,
                          "lb_on_server_status_received_locked");
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_on_server_status_received);
  GPR_ASSERT(GRPC_CALL_OK == call_error);

  /* Batch 3: first response message from the balancer. */
  op = ops;
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  /* take another weak ref to be unref'd/reused in
   * lb_on_response_received_locked */
  GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked");
  call_error = grpc_call_start_batch_and_execute(
      exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
      &glb_policy->lb_on_response_received);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1523
Mark D. Roth09e458c2017-05-02 08:13:26 -07001524static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx,
1525 void *arg, grpc_error *error) {
1526 glb_lb_policy *glb_policy = arg;
1527 glb_policy->initial_request_sent = true;
1528 // If we attempted to send a client load report before the initial
1529 // request was sent, send the load report now.
1530 if (glb_policy->client_load_report_payload != NULL) {
1531 do_send_client_load_report_locked(exec_ctx, glb_policy);
1532 }
1533 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001534 "lb_on_sent_initial_request_locked");
Mark D. Roth09e458c2017-05-02 08:13:26 -07001535}
1536
Craig Tiller2400bf52017-02-09 16:25:19 -08001537static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
1538 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001539 glb_lb_policy *glb_policy = arg;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001540 grpc_op ops[2];
1541 memset(ops, 0, sizeof(ops));
1542 grpc_op *op = ops;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001543 if (glb_policy->lb_response_payload != NULL) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001544 gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
David Garcia Quintas41bef452016-07-28 19:19:58 -07001545 /* Received data from the LB server. Look inside
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001546 * glb_policy->lb_response_payload, for a serverlist. */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001547 grpc_byte_buffer_reader bbr;
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001548 grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
Craig Tillerd41a4a72016-10-26 16:16:06 -07001549 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001550 grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001551
Mark D. Roth09e458c2017-05-02 08:13:26 -07001552 grpc_grpclb_initial_response *response = NULL;
1553 if (!glb_policy->seen_initial_response &&
1554 (response = grpc_grpclb_initial_response_parse(response_slice)) !=
1555 NULL) {
1556 if (response->has_client_stats_report_interval) {
1557 glb_policy->client_stats_report_interval =
1558 gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN),
1559 grpc_grpclb_duration_to_timespec(
1560 &response->client_stats_report_interval));
Craig Tiller84f75d42017-05-03 13:06:35 -07001561 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintasea11d162016-07-14 17:27:28 -07001562 gpr_log(GPR_INFO,
Mark D. Roth09e458c2017-05-02 08:13:26 -07001563 "received initial LB response message; "
1564 "client load reporting interval = %" PRId64 ".%09d sec",
1565 glb_policy->client_stats_report_interval.tv_sec,
1566 glb_policy->client_stats_report_interval.tv_nsec);
David Garcia Quintasea11d162016-07-14 17:27:28 -07001567 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001568 /* take a weak ref (won't prevent calling of \a glb_shutdown() if the
1569 * strong ref count goes to zero) to be unref'd in
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001570 * send_client_load_report_locked() */
Mark D. Roth09e458c2017-05-02 08:13:26 -07001571 glb_policy->client_load_report_timer_pending = true;
1572 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report");
1573 schedule_next_client_load_report(exec_ctx, glb_policy);
Craig Tiller84f75d42017-05-03 13:06:35 -07001574 } else if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001575 gpr_log(GPR_INFO,
1576 "received initial LB response message; "
1577 "client load reporting NOT enabled");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001578 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001579 grpc_grpclb_initial_response_destroy(response);
1580 glb_policy->seen_initial_response = true;
1581 } else {
1582 grpc_grpclb_serverlist *serverlist =
1583 grpc_grpclb_response_parse_serverlist(response_slice);
1584 if (serverlist != NULL) {
1585 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tiller84f75d42017-05-03 13:06:35 -07001586 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001587 gpr_log(GPR_INFO, "Serverlist with %lu servers received",
1588 (unsigned long)serverlist->num_servers);
1589 for (size_t i = 0; i < serverlist->num_servers; ++i) {
1590 grpc_resolved_address addr;
1591 parse_server(serverlist->servers[i], &addr);
1592 char *ipport;
1593 grpc_sockaddr_to_string(&ipport, &addr, false);
1594 gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport);
1595 gpr_free(ipport);
1596 }
1597 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001598 /* update serverlist */
1599 if (serverlist->num_servers > 0) {
1600 if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
1601 serverlist)) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001602 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001603 gpr_log(GPR_INFO,
1604 "Incoming server list identical to current, ignoring.");
1605 }
1606 grpc_grpclb_destroy_serverlist(serverlist);
1607 } else { /* new serverlist */
1608 if (glb_policy->serverlist != NULL) {
1609 /* dispose of the old serverlist */
1610 grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
1611 }
1612 /* and update the copy in the glb_lb_policy instance. This
1613 * serverlist instance will be destroyed either upon the next
1614 * update or in glb_destroy() */
1615 glb_policy->serverlist = serverlist;
Mark D. Rothd7389b42017-05-17 12:22:17 -07001616 glb_policy->serverlist_index = 0;
Mark D. Roth09e458c2017-05-02 08:13:26 -07001617 rr_handover_locked(exec_ctx, glb_policy);
1618 }
1619 } else {
Craig Tiller84f75d42017-05-03 13:06:35 -07001620 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001621 gpr_log(GPR_INFO,
1622 "Received empty server list. Picks will stay pending until "
1623 "a response with > 0 servers is received");
1624 }
1625 grpc_grpclb_destroy_serverlist(serverlist);
1626 }
1627 } else { /* serverlist == NULL */
1628 gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
1629 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
1630 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001631 }
Mark D. Roth09e458c2017-05-02 08:13:26 -07001632 grpc_slice_unref_internal(exec_ctx, response_slice);
David Garcia Quintas246c5642016-11-01 11:16:52 -07001633 if (!glb_policy->shutting_down) {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001634 /* keep listening for serverlist updates */
1635 op->op = GRPC_OP_RECV_MESSAGE;
Mark D. Roth448c1f02017-01-25 10:44:30 -08001636 op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001637 op->flags = 0;
1638 op->reserved = NULL;
1639 op++;
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001640 /* reuse the "lb_on_response_received_locked" weak ref taken in
David Garcia Quintase224a762016-11-01 13:00:58 -07001641 * query_for_backends_locked() */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001642 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
David Garcia Quintas246c5642016-11-01 11:16:52 -07001643 exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops),
1644 &glb_policy->lb_on_response_received); /* loop */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001645 GPR_ASSERT(GRPC_CALL_OK == call_error);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001646 }
David Garcia Quintase224a762016-11-01 13:00:58 -07001647 } else { /* empty payload: call cancelled. */
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001648 /* dispose of the "lb_on_response_received_locked" weak ref taken in
David Garcia Quintase224a762016-11-01 13:00:58 -07001649 * query_for_backends_locked() and reused in every reception loop */
1650 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001651 "lb_on_response_received_locked_empty_payload");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001652 }
1653}
David Garcia Quintasea11d162016-07-14 17:27:28 -07001654
Craig Tiller2400bf52017-02-09 16:25:19 -08001655static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
1656 grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001657 glb_lb_policy *glb_policy = arg;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001658 glb_policy->retry_timer_active = false;
1659 if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
Craig Tiller84f75d42017-05-03 13:06:35 -07001660 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001661 gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)",
1662 (void *)glb_policy);
1663 }
1664 GPR_ASSERT(glb_policy->lb_call == NULL);
1665 query_for_backends_locked(exec_ctx, glb_policy);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001666 }
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001667 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001668}
1669
Craig Tiller2400bf52017-02-09 16:25:19 -08001670static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
1671 void *arg, grpc_error *error) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001672 glb_lb_policy *glb_policy = arg;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001673 GPR_ASSERT(glb_policy->lb_call != NULL);
Craig Tiller84f75d42017-05-03 13:06:35 -07001674 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001675 char *status_details =
1676 grpc_slice_to_c_string(glb_policy->lb_call_status_details);
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001677 gpr_log(GPR_INFO,
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001678 "Status from LB server received. Status = %d, Details = '%s', "
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001679 "(call: %p), error %p",
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001680 glb_policy->lb_call_status, status_details,
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001681 (void *)glb_policy->lb_call, (void *)error);
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001682 gpr_free(status_details);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001683 }
Craig Tiller7c70b6c2017-01-23 07:48:42 -08001684 /* We need to perform cleanups no matter what. */
1685 lb_call_destroy_locked(exec_ctx, glb_policy);
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001686 if (glb_policy->started_picking && glb_policy->updating_lb_call) {
1687 if (glb_policy->retry_timer_active) {
1688 grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
1689 }
1690 if (!glb_policy->shutting_down) start_picking_locked(exec_ctx, glb_policy);
1691 glb_policy->updating_lb_call = false;
1692 } else if (!glb_policy->shutting_down) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001693 /* if we aren't shutting down, restart the LB client call after some time */
1694 gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
1695 gpr_timespec next_try =
1696 gpr_backoff_step(&glb_policy->lb_call_backoff_state, now);
Craig Tiller84f75d42017-05-03 13:06:35 -07001697 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001698 gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...",
1699 (void *)glb_policy);
1700 gpr_timespec timeout = gpr_time_sub(next_try, now);
1701 if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001702 gpr_log(GPR_DEBUG,
1703 "... retry_timer_active in %" PRId64 ".%09d seconds.",
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001704 timeout.tv_sec, timeout.tv_nsec);
1705 } else {
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001706 gpr_log(GPR_DEBUG, "... retry_timer_active immediately.");
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001707 }
1708 }
1709 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer");
ncteisen969b46e2017-06-08 14:57:11 -07001710 GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry,
Craig Tilleree4b1452017-05-12 10:56:03 -07001711 lb_call_on_retry_timer_locked, glb_policy,
1712 grpc_combiner_scheduler(glb_policy->base.combiner));
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001713 glb_policy->retry_timer_active = true;
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001714 grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try,
Masood Malekghassemib5b43722017-01-05 15:07:26 -08001715 &glb_policy->lb_on_call_retry, now);
David Garcia Quintas98da61b2016-10-29 08:46:31 +02001716 }
David Garcia Quintas7ec29132016-11-01 04:09:05 +01001717 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001718 "lb_on_server_status_received_locked");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001719}
1720
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001721static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
1722 const grpc_lb_policy_args *args) {
1723 glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001724 if (glb_policy->updating_lb_channel) {
1725 if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
1726 gpr_log(GPR_INFO,
1727 "Update already in progress for grpclb %p. Deferring update.",
1728 (void *)glb_policy);
1729 }
1730 if (glb_policy->pending_update_args != NULL) {
1731 grpc_channel_args_destroy(exec_ctx,
1732 glb_policy->pending_update_args->args);
1733 gpr_free(glb_policy->pending_update_args);
1734 }
1735 glb_policy->pending_update_args =
1736 gpr_zalloc(sizeof(*glb_policy->pending_update_args));
1737 glb_policy->pending_update_args->client_channel_factory =
1738 args->client_channel_factory;
1739 glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args);
1740 glb_policy->pending_update_args->combiner = args->combiner;
1741 return;
1742 }
1743
1744 glb_policy->updating_lb_channel = true;
1745 // Propagate update to lb_channel (pick first).
1746 const grpc_arg *arg =
1747 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
1748 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
1749 if (glb_policy->lb_channel == NULL) {
1750 // If we don't have a current channel to the LB, go into TRANSIENT
1751 // FAILURE.
1752 grpc_connectivity_state_set(
1753 exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
1754 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
1755 "glb_update_missing");
1756 } else {
1757 // otherwise, keep using the current LB channel (ignore this update).
1758 gpr_log(GPR_ERROR,
1759 "No valid LB addresses channel arg for grpclb %p update, "
1760 "ignoring.",
1761 (void *)glb_policy);
1762 }
1763 }
1764 const grpc_lb_addresses *addresses = arg->value.pointer.p;
1765 GPR_ASSERT(glb_policy->lb_channel != NULL);
1766 grpc_channel_args *lb_channel_args = build_lb_channel_args(
1767 exec_ctx, addresses, glb_policy->response_generator, args->args);
1768 /* Propagate updates to the LB channel through the fake resolver */
1769 grpc_fake_resolver_response_generator_set_response(
1770 exec_ctx, glb_policy->response_generator, lb_channel_args);
1771 grpc_channel_args_destroy(exec_ctx, lb_channel_args);
1772
1773 if (!glb_policy->watching_lb_channel) {
1774 // Watch the LB channel connectivity for connection.
1775 glb_policy->lb_channel_connectivity = GRPC_CHANNEL_INIT;
1776 grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
1777 grpc_channel_get_channel_stack(glb_policy->lb_channel));
1778 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1779 glb_policy->watching_lb_channel = true;
1780 GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "watch_lb_channel_connectivity");
1781 grpc_client_channel_watch_connectivity_state(
1782 exec_ctx, client_channel_elem,
1783 grpc_polling_entity_create_from_pollset_set(
1784 glb_policy->base.interested_parties),
1785 &glb_policy->lb_channel_connectivity,
1786 &glb_policy->lb_channel_on_connectivity_changed, NULL);
1787 }
1788}
1789
1790// Invoked as part of the update process. It continues watching the LB channel
1791// until it shuts down or becomes READY. It's invoked even if the LB channel
1792// stayed READY throughout the update (for example if the update is identical).
static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
                                                      void *arg,
                                                      grpc_error *error) {
  glb_lb_policy *glb_policy = arg;
  /* `done:` lives inside the SHUTDOWN case below: on shutdown we skip
   * straight to releasing the connectivity-watch weak ref. */
  if (glb_policy->shutting_down) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
  // embedded RR policy. Note that the current RR policy, if any, will stay in
  // effect until an update from the new lb_call is received.
  switch (glb_policy->lb_channel_connectivity) {
    case GRPC_CHANNEL_INIT:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      /* resub. */
      grpc_channel_element *client_channel_elem =
          grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(glb_policy->lb_channel));
      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
      /* keep watching until the channel becomes READY or shuts down; the
       * weak ref taken when the watch was installed stays alive. */
      grpc_client_channel_watch_connectivity_state(
          exec_ctx, client_channel_elem,
          grpc_polling_entity_create_from_pollset_set(
              glb_policy->base.interested_parties),
          &glb_policy->lb_channel_connectivity,
          &glb_policy->lb_channel_on_connectivity_changed, NULL);
      break;
    }
    case GRPC_CHANNEL_IDLE:
      // lb channel inactive (probably shutdown prior to update). Restart lb
      // call to kick the lb channel into gear.
      GPR_ASSERT(glb_policy->lb_call == NULL);
    /* fallthrough */
    case GRPC_CHANNEL_READY:
      if (glb_policy->lb_call != NULL) {
        glb_policy->updating_lb_channel = false;
        glb_policy->updating_lb_call = true;
        grpc_call_cancel(glb_policy->lb_call, NULL);
        // lb_on_server_status_received will pick up the cancel and reinit
        // lb_call.
        if (glb_policy->pending_update_args != NULL) {
          /* apply the update that was deferred while this one ran; args were
           * heap-copied by glb_update_locked and are freed here. */
          grpc_lb_policy_args *args = glb_policy->pending_update_args;
          glb_policy->pending_update_args = NULL;
          glb_update_locked(exec_ctx, &glb_policy->base, args);
          grpc_channel_args_destroy(exec_ctx, args->args);
          gpr_free(args);
        }
      } else if (glb_policy->started_picking && !glb_policy->shutting_down) {
        if (glb_policy->retry_timer_active) {
          grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
          glb_policy->retry_timer_active = false;
        }
        start_picking_locked(exec_ctx, glb_policy);
      }
    /* fallthrough */
    case GRPC_CHANNEL_SHUTDOWN:
    done:
      /* watch ends here (READY reached, channel shut down, or policy
       * shutting down): release the ref taken when the watch was set up. */
      glb_policy->watching_lb_channel = false;
      GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
                                "watch_lb_channel_connectivity_cb_shutdown");
      break;
  }
}
1853
David Garcia Quintas8d489112016-07-29 15:20:42 -07001854/* Code wiring the policy with the rest of the core */
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001855static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
Craig Tiller2400bf52017-02-09 16:25:19 -08001856 glb_destroy,
1857 glb_shutdown_locked,
1858 glb_pick_locked,
1859 glb_cancel_pick_locked,
1860 glb_cancel_picks_locked,
1861 glb_ping_one_locked,
1862 glb_exit_idle_locked,
1863 glb_check_connectivity_locked,
David Garcia Quintas87d5a312017-06-06 19:45:58 -07001864 glb_notify_on_state_change_locked,
1865 glb_update_locked};
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001866
/* No-op: the grpclb factory is a statically allocated singleton. */
static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
1868
/* No-op: the grpclb factory is a statically allocated singleton. */
static void glb_factory_unref(grpc_lb_policy_factory *factory) {}
1870
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001871static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
1872 glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};
1873
/* Singleton factory instance handed out by grpc_glb_lb_factory_create(). */
static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};
1875
/* Returns the static grpclb factory singleton; callers must not free it. */
grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
  return &glb_lb_policy_factory;
}
1879
1880/* Plugin registration */
Mark D. Roth09e458c2017-05-02 08:13:26 -07001881
1882// Only add client_load_reporting filter if the grpclb LB policy is used.
1883static bool maybe_add_client_load_reporting_filter(
1884 grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
1885 const grpc_channel_args *args =
1886 grpc_channel_stack_builder_get_channel_arguments(builder);
1887 const grpc_arg *channel_arg =
1888 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1889 if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING &&
1890 strcmp(channel_arg->value.string, "grpclb") == 0) {
1891 return grpc_channel_stack_builder_append_filter(
1892 builder, (const grpc_channel_filter *)arg, NULL, NULL);
1893 }
1894 return true;
1895}
1896
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001897void grpc_lb_policy_grpclb_init() {
1898 grpc_register_lb_policy(grpc_glb_lb_factory_create());
ncteisen06bce6e2017-07-10 07:58:49 -07001899 grpc_register_tracer(&grpc_lb_glb_trace);
ncteisen4b584052017-06-08 16:44:38 -07001900#ifndef NDEBUG
ncteisen06bce6e2017-07-10 07:58:49 -07001901 grpc_register_tracer(&grpc_trace_lb_policy_refcount);
ncteisen4b584052017-06-08 16:44:38 -07001902#endif
Mark D. Roth09e458c2017-05-02 08:13:26 -07001903 grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1904 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1905 maybe_add_client_load_reporting_filter,
1906 (void *)&grpc_client_load_reporting_filter);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001907}
1908
/* Plugin shutdown: nothing to clean up. */
void grpc_lb_policy_grpclb_shutdown() {}