blob: 17ab0febb7fe597cec446cbbaf57ad1400a31ea7 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2016 gRPC authors.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070016 *
17 */
18
Mark D. Rothc8875492018-02-20 08:33:48 -080019/// Implementation of the gRPC LB policy.
20///
21/// This policy takes as input a list of resolved addresses, which must
22/// include at least one balancer address.
23///
24/// An internal channel (\a lb_channel_) is created for the addresses
25/// from that are balancers. This channel behaves just like a regular
26/// channel that uses pick_first to select from the list of balancer
27/// addresses.
28///
29/// The first time the policy gets a request for a pick, a ping, or to exit
30/// the idle state, \a StartPickingLocked() is called. This method is
31/// responsible for instantiating the internal *streaming* call to the LB
32/// server (whichever address pick_first chose). The call will be complete
33/// when either the balancer sends status or when we cancel the call (e.g.,
34/// because we are shutting down). In needed, we retry the call. If we
35/// received at least one valid message from the server, a new call attempt
36/// will be made immediately; otherwise, we apply back-off delays between
37/// attempts.
38///
39/// We maintain an internal round_robin policy instance for distributing
40/// requests across backends. Whenever we receive a new serverlist from
41/// the balancer, we update the round_robin policy with the new list of
42/// addresses. If we cannot communicate with the balancer on startup,
43/// however, we may enter fallback mode, in which case we will populate
44/// the RR policy's addresses from the backend addresses returned by the
45/// resolver.
46///
47/// Once an RR policy instance is in place (and getting updated as described),
48/// calls for a pick, a ping, or a cancellation will be serviced right
49/// away by forwarding them to the RR instance. Any time there's no RR
50/// policy available (i.e., right after the creation of the gRPCLB policy),
51/// pick and ping requests are added to a list of pending picks and pings
52/// to be flushed and serviced when the RR policy instance becomes available.
53///
54/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
55/// high level design and details.
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070056
Mark D. Rothc8875492018-02-20 08:33:48 -080057// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
58// using that endpoint. Because of various transitive includes in uv.h,
59// including windows.h on Windows, uv.h must be included before other system
60// headers. Therefore, sockaddr.h must always be included first.
Alexander Polcyndb3e8982018-02-21 16:59:24 -080061#include <grpc/support/port_platform.h>
62
murgatroid997871f732016-09-23 13:49:05 -070063#include "src/core/lib/iomgr/sockaddr.h"
64
Yash Tibrewalfcd26bc2017-09-25 15:08:28 -070065#include <inttypes.h>
Mark D. Roth64d922a2017-05-03 12:52:04 -070066#include <limits.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070067#include <string.h>
68
69#include <grpc/byte_buffer_reader.h>
70#include <grpc/grpc.h>
71#include <grpc/support/alloc.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070072#include <grpc/support/string_util.h>
David Garcia Quintas69099222016-10-03 11:28:37 -070073#include <grpc/support/time.h>
David Garcia Quintas22e8f1d2016-06-15 23:53:00 -070074
Craig Tiller9eb0fde2017-03-31 16:59:30 -070075#include "src/core/ext/filters/client_channel/client_channel.h"
76#include "src/core/ext/filters/client_channel/client_channel_factory.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070077#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070078#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070079#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
Craig Tiller9eb0fde2017-03-31 16:59:30 -070080#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
Craig Tillerd52e22f2017-04-02 16:22:52 -070081#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
82#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
83#include "src/core/ext/filters/client_channel/parse_address.h"
David Garcia Quintas87d5a312017-06-06 19:45:58 -070084#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
Juanli Shen6502ecc2017-09-13 13:10:54 -070085#include "src/core/ext/filters/client_channel/subchannel_index.h"
Craig Tillerc0df1c02017-07-17 16:12:33 -070086#include "src/core/lib/backoff/backoff.h"
Mark D. Roth046cf762016-09-26 11:13:51 -070087#include "src/core/lib/channel/channel_args.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -070088#include "src/core/lib/channel/channel_stack.h"
Vijay Paiae376bf2018-01-25 22:54:02 -080089#include "src/core/lib/gpr/host_port.h"
Mark D. Rothdbdf4952018-01-18 11:21:12 -080090#include "src/core/lib/gpr/string.h"
Mark D. Roth4f2b0fd2018-01-19 12:12:23 -080091#include "src/core/lib/gprpp/manual_constructor.h"
Mark D. Rothc8875492018-02-20 08:33:48 -080092#include "src/core/lib/gprpp/memory.h"
93#include "src/core/lib/gprpp/orphanable.h"
Mark D. Roth209f6442018-02-08 10:26:46 -080094#include "src/core/lib/gprpp/ref_counted_ptr.h"
Craig Tiller2400bf52017-02-09 16:25:19 -080095#include "src/core/lib/iomgr/combiner.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +020096#include "src/core/lib/iomgr/sockaddr.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070097#include "src/core/lib/iomgr/sockaddr_utils.h"
David Garcia Quintas98da61b2016-10-29 08:46:31 +020098#include "src/core/lib/iomgr/timer.h"
David Garcia Quintas01291502017-02-07 13:26:41 -080099#include "src/core/lib/slice/slice_hash_table.h"
Craig Tiller18b4ba32016-11-09 15:23:42 -0800100#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -0700101#include "src/core/lib/slice/slice_string_helpers.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700102#include "src/core/lib/surface/call.h"
103#include "src/core/lib/surface/channel.h"
Mark D. Roth09e458c2017-05-02 08:13:26 -0700104#include "src/core/lib/surface/channel_init.h"
David Garcia Quintas331b9c02016-09-12 18:37:05 -0700105#include "src/core/lib/transport/static_metadata.h"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700106
David Garcia Quintas1edfb952016-11-22 17:15:34 -0800107#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
108#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
109#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
110#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
Juanli Shenfe408152017-09-27 12:27:20 -0700111#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
David Garcia Quintas98da61b2016-10-29 08:46:31 +0200112
Mark D. Rothc8875492018-02-20 08:33:48 -0800113namespace grpc_core {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700114
Mark D. Rothc8875492018-02-20 08:33:48 -0800115TraceFlag grpc_lb_glb_trace(false, "glb");
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700116
Vijay Pai849bd732018-01-02 23:30:47 +0000117namespace {
Mark D. Rothc0febd32018-01-09 10:25:24 -0800118
class GrpcLb : public LoadBalancingPolicy {
 public:
  GrpcLb(const grpc_lb_addresses* addresses, const Args& args);

  // Overrides of the LoadBalancingPolicy interface. Per the *Locked naming
  // convention, these must be invoked from within the policy's combiner.
  void UpdateLocked(const grpc_channel_args& args) override;
  bool PickLocked(PickState* pick) override;
  void CancelPickLocked(PickState* pick, grpc_error* error) override;
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override;
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override;
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override;
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
  void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
  void ExitIdleLocked() override;

 private:
  /// Linked list of pending pick requests. It stores all information needed to
  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
  /// waiting for the RR policy to be created.
  ///
  /// Note that when a pick is sent to the RR policy, we inject our own
  /// on_complete callback, so that we can intercept the result before
  /// invoking the original on_complete callback. This allows us to set the
  /// LB token metadata and add client_stats to the call context.
  /// See \a pending_pick_complete() for details.
  struct PendingPick {
    // The grpclb instance that created the wrapping. This instance is not
    // owned; reference counts are untouched. It's used only for logging
    // purposes.
    GrpcLb* grpclb_policy;
    // The original pick.
    PickState* pick;
    // Our on_complete closure and the original one.
    grpc_closure on_complete;
    grpc_closure* original_on_complete;
    // The LB token associated with the pick. This is set via user_data in
    // the pick.
    grpc_mdelem lb_token;
    // Stats for client-side load reporting. Note that this holds a
    // reference, which must be either passed on via context or unreffed.
    grpc_grpclb_client_stats* client_stats = nullptr;
    // Next pending pick.
    PendingPick* next = nullptr;
  };

  /// A linked list of pending pings waiting for the RR policy to be created.
  struct PendingPing {
    grpc_closure* on_initiate;
    grpc_closure* on_ack;
    PendingPing* next = nullptr;
  };

  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState
      : public InternallyRefCountedWithTracing<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    // Starts the streaming LB call (sends the initial request and registers
    // for responses and final status).
    void StartQuery();

    grpc_grpclb_client_stats* client_stats() const { return client_stats_; }
    bool seen_initial_response() const { return seen_initial_response_; }

   private:
    // Private: instances are only destroyed via Unref() (see Orphan()).
    ~BalancerCallState();

    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    static bool LoadReportCountersAreZero(grpc_grpclb_request* request);

    // Combiner-scheduled closure callbacks; \a arg is the BalancerCallState.
    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Always non-NULL.
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    grpc_grpclb_client_stats* client_stats_ = nullptr;
    grpc_millis client_stats_report_interval_ = 0;
    grpc_timer client_load_report_timer_;
    bool client_load_report_timer_callback_pending_ = false;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for either the load report timer or the callback for
    // completion of sending the load report.
    grpc_closure client_load_report_closure_;
  };

  ~GrpcLb();

  void ShutdownLocked() override;

  // Helper function used in ctor and UpdateLocked().
  void ProcessChannelArgsLocked(const grpc_channel_args& args);

  // Methods for dealing with the balancer channel and call.
  void StartPickingLocked();
  void StartBalancerCallLocked();
  static void OnFallbackTimerLocked(void* arg, grpc_error* error);
  void StartBalancerCallRetryTimerLocked();
  static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                         grpc_error* error);

  // Pending pick methods.
  static void PendingPickSetMetadataAndContext(PendingPick* pp);
  PendingPick* PendingPickCreate(PickState* pick);
  void AddPendingPick(PendingPick* pp);
  static void OnPendingPickComplete(void* arg, grpc_error* error);

  // Pending ping methods.
  void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);

  // Methods for dealing with the RR policy.
  void CreateOrUpdateRoundRobinPolicyLocked();
  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
  void CreateRoundRobinPolicyLocked(const Args& args);
  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp);
  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
      grpc_error* rr_state_error);
  static void OnRoundRobinConnectivityChangedLocked(void* arg,
                                                    grpc_error* error);
  static void OnRoundRobinRequestReresolutionLocked(void* arg,
                                                    grpc_error* error);

  // Who the client is trying to communicate with.
  const char* server_name_ = nullptr;

  // Current channel args from the resolver.
  grpc_channel_args* args_ = nullptr;

  // Internal state.
  bool started_picking_ = false;
  bool shutting_down_ = false;
  grpc_connectivity_state_tracker state_tracker_;

  // The channel for communicating with the LB server.
  grpc_channel* lb_channel_ = nullptr;
  grpc_connectivity_state lb_channel_connectivity_;
  grpc_closure lb_channel_on_connectivity_changed_;
  // Are we already watching the LB channel's connectivity?
  bool watching_lb_channel_ = false;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout in milliseconds for the LB call. 0 means no deadline.
  int lb_call_timeout_ms_ = 0;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  bool retry_timer_callback_pending_ = false;
  grpc_timer lb_call_retry_timer_;
  grpc_closure lb_on_call_retry_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  grpc_grpclb_serverlist* serverlist_ = nullptr;
  // Index into serverlist for next pick.
  // If the server at this index is a drop, we return a drop.
  // Otherwise, we delegate to the RR policy.
  size_t serverlist_index_ = 0;

  // Timeout in milliseconds before using fallback backend addresses.
  // 0 means not using fallback.
  int lb_fallback_timeout_ms_ = 0;
  // The backend addresses from the resolver.
  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
  // Fallback timer.
  bool fallback_timer_callback_pending_ = false;
  grpc_timer lb_fallback_timer_;
  grpc_closure lb_on_fallback_;

  // Pending picks and pings that are waiting on the RR policy's connectivity.
  PendingPick* pending_picks_ = nullptr;
  PendingPing* pending_pings_ = nullptr;

  // The RR policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
  grpc_connectivity_state rr_connectivity_state_;
  grpc_closure on_rr_connectivity_changed_;
  grpc_closure on_rr_request_reresolution_;
};
Mark D. Rothc0febd32018-01-09 10:25:24 -0800344
Mark D. Rothc8875492018-02-20 08:33:48 -0800345//
346// serverlist parsing code
347//
Mark D. Rothc0febd32018-01-09 10:25:24 -0800348
Mark D. Rothc8875492018-02-20 08:33:48 -0800349// vtable for LB tokens in grpc_lb_addresses
350void* lb_token_copy(void* token) {
351 return token == nullptr
352 ? nullptr
353 : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
354}
355void lb_token_destroy(void* token) {
356 if (token != nullptr) {
357 GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
Juanli Shen8e4c9d32018-01-23 16:26:39 -0800358 }
359}
// Three-way comparison of two LB tokens, used as the user-data comparator
// in lb_token_vtable. Returns 1/-1/0 as token1 is greater/less/equal.
// Note: relational comparison of unrelated pointers is unspecified in
// C++, so compare the integer representations instead.
int lb_token_cmp(void* token1, void* token2) {
  const uintptr_t t1 = (uintptr_t)token1;
  const uintptr_t t2 = (uintptr_t)token2;
  if (t1 > t2) return 1;
  if (t1 < t2) return -1;
  return 0;
}
// User-data vtable installed on grpc_lb_addresses entries so that LB tokens
// are properly ref-counted when address lists are copied or destroyed.
const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};
Juanli Shen8e4c9d32018-01-23 16:26:39 -0800367
Mark D. Rothc8875492018-02-20 08:33:48 -0800368// Returns the backend addresses extracted from the given addresses.
369grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
370 // First pass: count the number of backend addresses.
371 size_t num_backends = 0;
372 for (size_t i = 0; i < addresses->num_addresses; ++i) {
373 if (!addresses->addresses[i].is_balancer) {
374 ++num_backends;
Mark D. Roth83d5cd62018-01-11 08:56:53 -0800375 }
Mark D. Rothc0febd32018-01-09 10:25:24 -0800376 }
Mark D. Rothc8875492018-02-20 08:33:48 -0800377 // Second pass: actually populate the addresses and (empty) LB tokens.
378 grpc_lb_addresses* backend_addresses =
379 grpc_lb_addresses_create(num_backends, &lb_token_vtable);
380 size_t num_copied = 0;
381 for (size_t i = 0; i < addresses->num_addresses; ++i) {
382 if (addresses->addresses[i].is_balancer) continue;
383 const grpc_resolved_address* addr = &addresses->addresses[i].address;
384 grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
385 addr->len, false /* is_balancer */,
386 nullptr /* balancer_name */,
387 (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
388 ++num_copied;
389 }
390 return backend_addresses;
Mark D. Rothc0febd32018-01-09 10:25:24 -0800391}
392
Mark D. Rothc8875492018-02-20 08:33:48 -0800393bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
Mark D. Rothe7751802017-07-27 12:31:45 -0700394 if (server->drop) return false;
Craig Tillerbaa14a92017-11-03 09:09:36 -0700395 const grpc_grpclb_ip_address* ip = &server->ip_address;
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700396 if (server->port >> 16 != 0) {
397 if (log) {
398 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200399 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
Mark D. Rothc8875492018-02-20 08:33:48 -0800400 server->port, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700401 }
402 return false;
403 }
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700404 if (ip->size != 4 && ip->size != 16) {
405 if (log) {
406 gpr_log(GPR_ERROR,
Jan Tattermusch2b398082016-10-07 14:40:30 +0200407 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700408 "serverlist. Ignoring",
Mark D. Rothc8875492018-02-20 08:33:48 -0800409 ip->size, (unsigned long)idx);
David Garcia Quintas35c2aba2016-09-13 15:28:09 -0700410 }
411 return false;
412 }
413 return true;
414}
415
// Converts a serverlist entry into a grpc_resolved_address.
// \a addr is first zeroed; it stays zeroed (len == 0) for drop entries
// and for IP sizes other than 4 (IPv4) or 16 (IPv6) bytes.
void ParseServer(const grpc_grpclb_server* server,
                 grpc_resolved_address* addr) {
  memset(addr, 0, sizeof(*addr));
  if (server->drop) return;
  // server->port is in host order; sockaddr ports are network order.
  const uint16_t netorder_port = htons((uint16_t)server->port);
  /* the addresses are given in binary format (a in(6)_addr struct) in
   * server->ip_address.bytes. */
  const grpc_grpclb_ip_address* ip = &server->ip_address;
  if (ip->size == 4) {
    addr->len = sizeof(struct sockaddr_in);
    struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
    addr4->sin_family = AF_INET;
    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
    addr4->sin_port = netorder_port;
  } else if (ip->size == 16) {
    addr->len = sizeof(struct sockaddr_in6);
    struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
    addr6->sin6_family = AF_INET6;
    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
    addr6->sin6_port = netorder_port;
  }
}
438
// Returns addresses extracted from \a serverlist. Invalid entries (drops,
// bad ports, malformed IPs) are skipped; each valid entry carries its LB
// token (or the empty token, with a log message) as user data for the RR
// policy.
grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
  size_t num_valid = 0;
  /* first pass: count how many are valid in order to allocate the necessary
   * memory in a single block */
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
  }
  grpc_lb_addresses* lb_addresses =
      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
  /* second pass: actually populate the addresses and LB tokens (aka user data
   * to the outside world) to be read by the RR policy during its creation.
   * Given that the validity tests are very cheap, they are performed again
   * instead of marking the valid ones during the first pass, as this would
   * incur an allocation due to the arbitrary number of servers */
  size_t addr_idx = 0;
  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    const grpc_grpclb_server* server = serverlist->servers[sl_idx];
    if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    GPR_ASSERT(addr_idx < num_valid);
    /* address processing */
    grpc_resolved_address addr;
    ParseServer(server, &addr);
    /* lb token processing */
    void* user_data;
    if (server->has_load_balance_token) {
      // The token field is fixed-size and may not be NUL-terminated, so
      // bound the length with strnlen before copying.
      const size_t lb_token_max_length =
          GPR_ARRAY_SIZE(server->load_balance_token);
      const size_t lb_token_length =
          strnlen(server->load_balance_token, lb_token_max_length);
      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
          server->load_balance_token, lb_token_length);
      user_data =
          (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
              .payload;
    } else {
      char* uri = grpc_sockaddr_to_uri(&addr);
      gpr_log(GPR_INFO,
              "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
              uri);
      gpr_free(uri);
      user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    }
    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                  false /* is_balancer */,
                                  nullptr /* balancer_name */, user_data);
    ++addr_idx;
  }
  GPR_ASSERT(addr_idx == num_valid);
  return lb_addresses;
}
Mark D. Rothc8875492018-02-20 08:33:48 -0800492//
493// GrpcLb::BalancerCallState
494//
495
496GrpcLb::BalancerCallState::BalancerCallState(
497 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
498 : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
499 grpclb_policy_(std::move(parent_grpclb_policy)) {
500 GPR_ASSERT(grpclb_policy_ != nullptr);
501 GPR_ASSERT(!grpclb_policy()->shutting_down_);
502 // Init the LB call. Note that the LB call will progress every time there's
503 // activity in grpclb_policy_->interested_parties(), which is comprised of
504 // the polling entities from client_channel.
505 GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
506 GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
David Garcia Quintasc7c0d692018-03-10 17:27:15 -0800507 const grpc_millis deadline =
Mark D. Rothc8875492018-02-20 08:33:48 -0800508 grpclb_policy()->lb_call_timeout_ms_ == 0
509 ? GRPC_MILLIS_INF_FUTURE
510 : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
511 lb_call_ = grpc_channel_create_pollset_set_call(
512 grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
513 grpclb_policy_->interested_parties(),
514 GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
David Garcia Quintasc7c0d692018-03-10 17:27:15 -0800515 nullptr, deadline, nullptr);
Mark D. Rothc8875492018-02-20 08:33:48 -0800516 // Init the LB call request payload.
517 grpc_grpclb_request* request =
518 grpc_grpclb_request_create(grpclb_policy()->server_name_);
519 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
520 send_message_payload_ =
521 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
522 grpc_slice_unref_internal(request_payload_slice);
523 grpc_grpclb_request_destroy(request);
524 // Init other data associated with the LB call.
525 grpc_metadata_array_init(&lb_initial_metadata_recv_);
526 grpc_metadata_array_init(&lb_trailing_metadata_recv_);
527 GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
528 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
529 GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
530 OnBalancerMessageReceivedLocked, this,
531 grpc_combiner_scheduler(grpclb_policy()->combiner()));
532 GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
533 OnBalancerStatusReceivedLocked, this,
534 grpc_combiner_scheduler(grpclb_policy()->combiner()));
Juanli Shenfe408152017-09-27 12:27:20 -0700535}
536
// Releases all call-scoped resources. Buffers that were never allocated are
// still nullptr, which the destroy functions tolerate.
GrpcLb::BalancerCallState::~BalancerCallState() {
  GPR_ASSERT(lb_call_ != nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(lb_call_status_details_);
  // client_stats_ holds a ref (created after the first serverlist); drop it.
  if (client_stats_ != nullptr) {
    grpc_grpclb_client_stats_unref(client_stats_);
  }
}
549
550void GrpcLb::BalancerCallState::Orphan() {
551 GPR_ASSERT(lb_call_ != nullptr);
552 // If we are here because grpclb_policy wants to cancel the call,
553 // lb_on_balancer_status_received_ will complete the cancellation and clean
554 // up. Otherwise, we are here because grpclb_policy has to orphan a failed
555 // call, then the following cancellation will be a no-op.
556 grpc_call_cancel(lb_call_, nullptr);
557 if (client_load_report_timer_callback_pending_) {
558 grpc_timer_cancel(&client_load_report_timer_);
559 }
560 // Note that the initial ref is hold by lb_on_balancer_status_received_
561 // instead of the caller of this function. So the corresponding unref happens
562 // in lb_on_balancer_status_received_ instead of here.
563}
564
565void GrpcLb::BalancerCallState::StartQuery() {
566 GPR_ASSERT(lb_call_ != nullptr);
567 if (grpc_lb_glb_trace.enabled()) {
568 gpr_log(GPR_INFO,
569 "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
570 grpclb_policy_.get(), this, lb_call_);
571 }
572 // Create the ops.
573 grpc_call_error call_error;
574 grpc_op ops[3];
575 memset(ops, 0, sizeof(ops));
576 // Op: send initial metadata.
577 grpc_op* op = ops;
578 op->op = GRPC_OP_SEND_INITIAL_METADATA;
579 op->data.send_initial_metadata.count = 0;
580 op->flags = 0;
581 op->reserved = nullptr;
582 op++;
583 // Op: send request message.
584 GPR_ASSERT(send_message_payload_ != nullptr);
585 op->op = GRPC_OP_SEND_MESSAGE;
586 op->data.send_message.send_message = send_message_payload_;
587 op->flags = 0;
588 op->reserved = nullptr;
589 op++;
590 // TODO(roth): We currently track this ref manually. Once the
591 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
592 // with the callback.
593 auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
594 self.release();
595 call_error = grpc_call_start_batch_and_execute(
596 lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
597 GPR_ASSERT(GRPC_CALL_OK == call_error);
598 // Op: recv initial metadata.
599 op = ops;
600 op->op = GRPC_OP_RECV_INITIAL_METADATA;
601 op->data.recv_initial_metadata.recv_initial_metadata =
602 &lb_initial_metadata_recv_;
603 op->flags = 0;
604 op->reserved = nullptr;
605 op++;
606 // Op: recv response.
607 op->op = GRPC_OP_RECV_MESSAGE;
608 op->data.recv_message.recv_message = &recv_message_payload_;
609 op->flags = 0;
610 op->reserved = nullptr;
611 op++;
612 // TODO(roth): We currently track this ref manually. Once the
613 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
614 // with the callback.
615 self = Ref(DEBUG_LOCATION, "on_message_received");
616 self.release();
617 call_error = grpc_call_start_batch_and_execute(
618 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
619 GPR_ASSERT(GRPC_CALL_OK == call_error);
620 // Op: recv server status.
621 op = ops;
622 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
623 op->data.recv_status_on_client.trailing_metadata =
624 &lb_trailing_metadata_recv_;
625 op->data.recv_status_on_client.status = &lb_call_status_;
626 op->data.recv_status_on_client.status_details = &lb_call_status_details_;
627 op->flags = 0;
628 op->reserved = nullptr;
629 op++;
630 // This callback signals the end of the LB call, so it relies on the initial
631 // ref instead of a new ref. When it's invoked, it's the initial ref that is
632 // unreffed.
633 call_error = grpc_call_start_batch_and_execute(
634 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
635 GPR_ASSERT(GRPC_CALL_OK == call_error);
636};
637
// Arms client_load_report_timer_ to fire one reporting interval from
// now; when it fires, MaybeSendClientLoadReportLocked() runs in the
// policy's combiner. Sets the pending flag so shutdown knows to cancel
// the timer.
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
  const grpc_millis next_client_load_report_time =
      ExecCtx::Get()->Now() + client_stats_report_interval_;
  GRPC_CLOSURE_INIT(&client_load_report_closure_,
                    MaybeSendClientLoadReportLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
  grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
                  &client_load_report_closure_);
  client_load_report_timer_callback_pending_ = true;
}
648
// Timer callback (run in the combiner). Sends the next client load
// report if this lb_calld is still the active one; otherwise releases
// the ref taken when the timer was scheduled and does nothing.
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  lb_calld->client_load_report_timer_callback_pending_ = false;
  // A non-OK error means the timer was cancelled; a stale lb_calld means
  // this call has been superseded. Either way, drop the timer's ref.
  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  // If we've already sent the initial request, then we can go ahead and send
  // the load report. Otherwise, we need to wait until the initial request has
  // been sent to send this (see OnInitialRequestSentLocked()).
  if (lb_calld->send_message_payload_ == nullptr) {
    lb_calld->SendClientLoadReportLocked();
  } else {
    lb_calld->client_load_report_is_due_ = true;
  }
}
667
668bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
669 grpc_grpclb_request* request) {
670 grpc_grpclb_dropped_call_counts* drop_entries =
671 static_cast<grpc_grpclb_dropped_call_counts*>(
672 request->client_stats.calls_finished_with_drop.arg);
673 return request->client_stats.num_calls_started == 0 &&
674 request->client_stats.num_calls_finished == 0 &&
675 request->client_stats.num_calls_finished_with_client_failed_to_send ==
676 0 &&
677 request->client_stats.num_calls_finished_known_received == 0 &&
678 (drop_entries == nullptr || drop_entries->num_entries == 0);
679}
680
// Builds a load-report request from client_stats_ and sends it on the
// LB call as a SEND_MESSAGE batch; ClientLoadReportDoneLocked() runs
// when the send completes. If the counters were zero both last time and
// this time, skips the send and just re-arms the timer.
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  GPR_ASSERT(send_message_payload_ == nullptr);
  grpc_grpclb_request* request =
      grpc_grpclb_load_report_request_create_locked(client_stats_);
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (LoadReportCountersAreZero(request)) {
    if (last_client_load_report_counters_were_zero_) {
      grpc_grpclb_request_destroy(request);
      ScheduleNextClientLoadReportLocked();
      return;
    }
    last_client_load_report_counters_were_zero_ = true;
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
  // Serialize the request into a byte buffer; the slice and the request
  // itself are no longer needed once the payload owns the bytes.
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // Send the report.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      lb_call_, &op, 1, &client_load_report_closure_);
  if (call_error != GRPC_CALL_OK) {
    // Log before asserting so the failing error code is visible.
    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
            call_error);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  }
}
718
// Combiner callback invoked when the load-report SEND_MESSAGE batch
// completes. Frees the sent payload and schedules the next report,
// unless the batch failed or this lb_calld is no longer active (in
// which case the "client_load_report" ref is released).
void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
                                                           grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
  lb_calld->send_message_payload_ = nullptr;
  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  lb_calld->ScheduleNextClientLoadReportLocked();
}
731
// Combiner callback invoked when the initial LB request finishes
// sending. Frees the request payload and, if a load report came due
// while the send was in flight, sends it now. Releases the ref taken
// for this batch in StartQuery().
void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
                                                           grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
  lb_calld->send_message_payload_ = nullptr;
  // If we attempted to send a client load report before the initial request was
  // sent (and this lb_calld is still in use), send the load report now.
  if (lb_calld->client_load_report_is_due_ &&
      lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
    lb_calld->SendClientLoadReportLocked();
    lb_calld->client_load_report_is_due_ = false;
  }
  lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
746
// Combiner callback for RECV_MESSAGE batches on the LB call. Parses the
// received message as either the initial LB response (first message) or
// a serverlist update, applies it to the policy, and re-arms another
// RECV_MESSAGE op unless the policy is shutting down (in which case the
// "on_message_received" ref from StartQuery() is released).
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  // Empty payload means the LB call was cancelled.
  if (lb_calld != grpclb_policy->lb_calld_.get() ||
      lb_calld->recv_message_payload_ == nullptr) {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  // Copy the payload into one contiguous slice and free the byte buffer.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
  lb_calld->recv_message_payload_ = nullptr;
  grpc_grpclb_initial_response* initial_response;
  grpc_grpclb_serverlist* serverlist;
  if (!lb_calld->seen_initial_response_ &&
      (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
          nullptr) {
    // Have NOT seen initial response, look for initial response.
    if (initial_response->has_client_stats_report_interval) {
      // Clamp the reporting interval to at least GPR_MS_PER_SEC.
      lb_calld->client_stats_report_interval_ = GPR_MAX(
          GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
                              &initial_response->client_stats_report_interval));
      if (grpc_lb_glb_trace.enabled()) {
        // NOTE(review): client_stats_report_interval_ is a grpc_millis;
        // PRIdPTR may be narrower than that type on some platforms —
        // confirm whether PRId64 should be used here instead.
        gpr_log(GPR_INFO,
                "[grpclb %p] Received initial LB response message; "
                "client load reporting interval = %" PRIdPTR " milliseconds",
                grpclb_policy, lb_calld->client_stats_report_interval_);
      }
    } else if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Received initial LB response message; client load "
              "reporting NOT enabled",
              grpclb_policy);
    }
    grpc_grpclb_initial_response_destroy(initial_response);
    lb_calld->seen_initial_response_ = true;
  } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
                  response_slice)) != nullptr) {
    // Have seen initial response, look for serverlist.
    GPR_ASSERT(lb_calld->lb_call_ != nullptr);
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
              grpclb_policy, serverlist->num_servers);
      for (size_t i = 0; i < serverlist->num_servers; ++i) {
        grpc_resolved_address addr;
        ParseServer(serverlist->servers[i], &addr);
        char* ipport;
        grpc_sockaddr_to_string(&ipport, &addr, false);
        gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
                grpclb_policy, i, ipport);
        gpr_free(ipport);
      }
    }
    /* update serverlist */
    if (serverlist->num_servers > 0) {
      // Start sending client load report only after we start using the
      // serverlist returned from the current LB call.
      if (lb_calld->client_stats_report_interval_ > 0 &&
          lb_calld->client_stats_ == nullptr) {
        lb_calld->client_stats_ = grpc_grpclb_client_stats_create();
        // TODO(roth): We currently track this ref manually. Once the
        // ClosureRef API is ready, we should pass the RefCountedPtr<> along
        // with the callback.
        auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
        self.release();
        lb_calld->ScheduleNextClientLoadReportLocked();
      }
      if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
                                        serverlist)) {
        if (grpc_lb_glb_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] Incoming server list identical to current, "
                  "ignoring.",
                  grpclb_policy);
        }
        grpc_grpclb_destroy_serverlist(serverlist);
      } else { /* new serverlist */
        if (grpclb_policy->serverlist_ != nullptr) {
          /* dispose of the old serverlist */
          grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
        } else {
          /* or dispose of the fallback */
          grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
          grpclb_policy->fallback_backend_addresses_ = nullptr;
          if (grpclb_policy->fallback_timer_callback_pending_) {
            grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
          }
        }
        // and update the copy in the GrpcLb instance. This
        // serverlist instance will be destroyed either upon the next
        // update or when the GrpcLb instance is destroyed.
        grpclb_policy->serverlist_ = serverlist;
        grpclb_policy->serverlist_index_ = 0;
        grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
      }
    } else {
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
                grpclb_policy);
      }
      grpc_grpclb_destroy_serverlist(serverlist);
    }
  } else {
    // No valid initial response or serverlist found.
    gpr_log(GPR_ERROR,
            "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
            grpclb_policy,
            grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
  }
  grpc_slice_unref_internal(response_slice);
  if (!grpclb_policy->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_calld->lb_call_, &op, 1,
        &lb_calld->lb_on_balancer_message_received_);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  } else {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}
879
// Combiner callback invoked when the LB call's trailing status arrives,
// i.e. when the call has ended. If this lb_calld is still the active
// one, the call ended unexpectedly, so restart it: immediately (with
// backoff reset) if we had gotten a response, otherwise via the retry
// timer. Releases the initial ref held by this closure.
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  GPR_ASSERT(lb_calld->lb_call_ != nullptr);
  if (grpc_lb_glb_trace.enabled()) {
    char* status_details =
        grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
    gpr_log(GPR_INFO,
            "[grpclb %p] Status from LB server received. Status = %d, details "
            "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
            grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
            lb_calld->lb_call_, grpc_error_string(error));
    gpr_free(status_details);
  }
  // Ask the resolver to re-resolve, since the balancer may have changed.
  grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
  // If this lb_calld is still in use, this call ended because of a failure so
  // we want to retry connecting. Otherwise, we have deliberately ended this
  // call and no further action is required.
  if (lb_calld == grpclb_policy->lb_calld_.get()) {
    grpclb_policy->lb_calld_.reset();
    GPR_ASSERT(!grpclb_policy->shutting_down_);
    if (lb_calld->seen_initial_response_) {
      // If we lose connection to the LB server, reset the backoff and restart
      // the LB call immediately.
      grpclb_policy->lb_call_backoff_.Reset();
      grpclb_policy->StartBalancerCallLocked();
    } else {
      // If this LB call fails establishing any connection to the LB server,
      // retry later.
      grpclb_policy->StartBalancerCallRetryTimerLocked();
    }
  }
  lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
}
915
916//
917// helper code for creating balancer channel
918//
919
Mark D. Rothbd0f1512018-02-20 10:28:22 -0800920grpc_lb_addresses* ExtractBalancerAddresses(
921 const grpc_lb_addresses* addresses) {
922 size_t num_grpclb_addrs = 0;
923 for (size_t i = 0; i < addresses->num_addresses; ++i) {
924 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
925 }
926 // There must be at least one balancer address, or else the
927 // client_channel would not have chosen this LB policy.
928 GPR_ASSERT(num_grpclb_addrs > 0);
929 grpc_lb_addresses* lb_addresses =
930 grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
931 size_t lb_addresses_idx = 0;
932 for (size_t i = 0; i < addresses->num_addresses; ++i) {
933 if (!addresses->addresses[i].is_balancer) continue;
934 if (addresses->addresses[i].user_data != nullptr) {
935 gpr_log(GPR_ERROR,
936 "This LB policy doesn't support user data. It will be ignored");
937 }
938 grpc_lb_addresses_set_address(
939 lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
940 addresses->addresses[i].address.len, false /* is balancer */,
941 addresses->addresses[i].balancer_name, nullptr /* user data */);
942 }
943 GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
944 return lb_addresses;
Mark D. Rothc8875492018-02-20 08:33:48 -0800945}
946
947/* Returns the channel args for the LB channel, used to create a bidirectional
948 * stream for the reception of load balancing updates.
949 *
950 * Inputs:
951 * - \a addresses: corresponding to the balancers.
952 * - \a response_generator: in order to propagate updates from the resolver
953 * above the grpclb policy.
954 * - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
    const grpc_lb_addresses* addresses,
    FakeResolverResponseGenerator* response_generator,
    const grpc_channel_args* args) {
  // Balancer-only address list (is_balancer reset to false); destroyed
  // below once it has been added to the new channel args.
  grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
  // Channel args to remove.
  static const char* args_to_remove[] = {
      // LB policy name, since we want to use the default (pick_first) in
      // the LB channel.
      GRPC_ARG_LB_POLICY_NAME,
      // The channel arg for the server URI, since that will be different for
      // the LB channel than for the parent channel. The client channel
      // factory will re-add this arg with the right value.
      GRPC_ARG_SERVER_URI,
      // The resolved addresses, which will be generated by the name resolver
      // used in the LB channel. Note that the LB channel will use the fake
      // resolver, so this won't actually generate a query to DNS (or some
      // other name service). However, the addresses returned by the fake
      // resolver will have is_balancer=false, whereas our own addresses have
      // is_balancer=true. We need the LB channel to return addresses with
      // is_balancer=false so that it does not wind up recursively using the
      // grpclb LB policy, as per the special case logic in client_channel.c.
      GRPC_ARG_LB_ADDRESSES,
      // The fake resolver response generator, because we are replacing it
      // with the one from the grpclb policy, used to propagate updates to
      // the LB channel.
      GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
      // The LB channel should use the authority indicated by the target
      // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
      // as opposed to the authority from the parent channel.
      GRPC_ARG_DEFAULT_AUTHORITY,
      // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
      // treated as a stand-alone channel and not inherit this argument from the
      // args of the parent channel.
      GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
  };
  // Channel args to add.
  const grpc_arg args_to_add[] = {
      // New LB addresses.
      // Note that we pass these in both when creating the LB channel
      // and via the fake resolver. The latter is what actually gets used.
      grpc_lb_addresses_create_channel_arg(lb_addresses),
      // The fake resolver response generator, which we use to inject
      // address updates into the LB channel.
      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
          response_generator),
  };
  // Construct channel args.
  grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
      args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
  // Make any necessary modifications for security.
  new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
  // Clean up. Presumably copy_and_add_and_remove copied the address
  // list into new_args, so our local copy can be destroyed here.
  grpc_lb_addresses_destroy(lb_addresses);
  return new_args;
}
1012
1013//
1014// ctor and dtor
1015//
1016
// Constructs the grpclb policy: sets up the LB-call retry backoff,
// registers combiner callbacks, records the server name and timeout
// channel args, and processes the initial address list (which creates
// the balancer channel).
GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
               const LoadBalancingPolicy::Args& args)
    : LoadBalancingPolicy(args),
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      lb_call_backoff_(
          BackOff::Options()
              .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
                                   1000)
              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
              .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  // Initialization.
  grpc_subchannel_index_ref();
  GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                    &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
                    &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
                    &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
  // Record server name.
  const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
  const char* server_uri = grpc_channel_arg_get_string(arg);
  GPR_ASSERT(server_uri != nullptr);
  // NOTE(review): grpc_uri_parse may return null on a malformed URI;
  // the dereference below assumes it cannot fail here — confirm.
  grpc_uri* uri = grpc_uri_parse(server_uri, true);
  GPR_ASSERT(uri->path[0] != '\0');
  // Strip any leading '/' from the URI path to get the server name.
  server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Will use '%s' as the server name for LB request.",
            this, server_name_);
  }
  grpc_uri_destroy(uri);
  // Record LB call timeout.
  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
  lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
  // Record fallback timeout.
  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
  lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
      arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
  // Process channel args.
  ProcessChannelArgsLocked(*args.args);
}
1064
// Destructor. ShutdownLocked() must already have drained pending picks
// and pings (asserted below); frees the remaining owned state.
GrpcLb::~GrpcLb() {
  GPR_ASSERT(pending_picks_ == nullptr);
  GPR_ASSERT(pending_pings_ == nullptr);
  // server_name_ was allocated with gpr_strdup(); cast away const for free.
  gpr_free((void*)server_name_);
  grpc_channel_args_destroy(args_);
  grpc_connectivity_state_destroy(&state_tracker_);
  if (serverlist_ != nullptr) {
    grpc_grpclb_destroy_serverlist(serverlist_);
  }
  if (fallback_backend_addresses_ != nullptr) {
    grpc_lb_addresses_destroy(fallback_backend_addresses_);
  }
  grpc_subchannel_index_unref();
}
1079
// Shuts the policy down: cancels the LB call and timers, drops the RR
// child policy, destroys the balancer channel, and fails all pending
// picks and pings with a "Channel shutdown" error.
void GrpcLb::ShutdownLocked() {
  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
  shutting_down_ = true;
  // Resetting lb_calld_ cancels the in-flight LB call, if any.
  lb_calld_.reset();
  if (retry_timer_callback_pending_) {
    grpc_timer_cancel(&lb_call_retry_timer_);
  }
  if (fallback_timer_callback_pending_) {
    grpc_timer_cancel(&lb_fallback_timer_);
  }
  rr_policy_.reset();
  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    grpc_channel_destroy(lb_channel_);
    lb_channel_ = nullptr;
  }
  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
                              GRPC_ERROR_REF(error), "grpclb_shutdown");
  // Clear pending picks.
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->connected_subchannel.reset();
    // Note: pp is deleted in this callback.
    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
  }
  // Clear pending pings.
  PendingPing* pping;
  while ((pping = pending_pings_) != nullptr) {
    pending_pings_ = pping->next;
    GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
    GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
    Delete(pping);
  }
  GRPC_ERROR_UNREF(error);
}
1120
1121//
1122// public methods
1123//
1124
1125void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
1126 PendingPick* pp;
1127 while ((pp = pending_picks_) != nullptr) {
1128 pending_picks_ = pp->next;
1129 pp->pick->on_complete = pp->original_on_complete;
1130 pp->pick->user_data = nullptr;
1131 if (new_policy->PickLocked(pp->pick)) {
1132 // Synchronous return; schedule closure.
1133 GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
1134 }
1135 Delete(pp);
1136 }
1137}
1138
1139// Cancel a specific pending pick.
1140//
1141// A grpclb pick progresses as follows:
1142// - If there's a Round Robin policy (rr_policy_) available, it'll be
1143// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
1144// that point onwards, it'll be RR's responsibility. For cancellations, that
1145// implies the pick needs also be cancelled by the RR instance.
1146// - Otherwise, without an RR instance, picks stay pending at this policy's
1147// level (grpclb), inside the pending_picks_ list. To cancel these,
1148// we invoke the completion closure and set the pick's connected
1149// subchannel to nullptr right here.
// Cancels one pending pick (see the comment block above for how picks
// flow between grpclb and the RR child). Picks still queued here are
// failed immediately; the cancellation is also forwarded to the RR
// policy in case the pick was already handed over.
void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
  // Rebuild the pending list, failing the matching pick and re-linking
  // the rest.
  PendingPick* pp = pending_picks_;
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
    if (pp->pick == pick) {
      pick->connected_subchannel.reset();
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = pending_picks_;
      pending_picks_ = pp;
    }
    pp = next;
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
1172
1173// Cancel all pending picks.
1174//
1175// A grpclb pick progresses as follows:
1176// - If there's a Round Robin policy (rr_policy_) available, it'll be
1177// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
1178// that point onwards, it'll be RR's responsibility. For cancellations, that
1179// implies the pick needs also be cancelled by the RR instance.
1180// - Otherwise, without an RR instance, picks stay pending at this policy's
1181// level (grpclb), inside the pending_picks_ list. To cancel these,
1182// we invoke the completion closure and set the pick's connected
1183// subchannel to nullptr right here.
// Cancels all pending picks whose initial metadata flags match
// (flags & mask) == eq (see the comment block above for pick flow).
// Matching picks queued here are failed immediately; the cancellation
// is also forwarded to the RR policy for picks already handed over.
void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                       uint32_t initial_metadata_flags_eq,
                                       grpc_error* error) {
  // Rebuild the pending list, failing matching picks and re-linking
  // the rest.
  PendingPick* pp = pending_picks_;
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = pending_picks_;
      pending_picks_ = pp;
    }
    pp = next;
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
                                          initial_metadata_flags_eq,
                                          GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}
1210
1211void GrpcLb::ExitIdleLocked() {
1212 if (!started_picking_) {
1213 StartPickingLocked();
1214 }
1215}
1216
// Attempts a pick. Delegates to the RR child policy when one exists and
// is not in SHUTDOWN; otherwise queues the pick (and starts picking if
// needed). Returns true iff the pick completed synchronously.
bool GrpcLb::PickLocked(PickState* pick) {
  PendingPick* pp = PendingPickCreate(pick);
  bool pick_done = false;
  if (rr_policy_ != nullptr) {
    const grpc_connectivity_state rr_connectivity_state =
        rr_policy_->CheckConnectivityLocked(nullptr);
    // The RR policy may have transitioned to SHUTDOWN but the callback
    // registered to capture this event (on_rr_connectivity_changed_) may not
    // have been invoked yet. We need to make sure we aren't trying to pick
    // from an RR policy instance that's in shutdown.
    if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO,
                "[grpclb %p] NOT picking from from RR %p: RR conn state=%s",
                this, rr_policy_.get(),
                grpc_connectivity_state_name(rr_connectivity_state));
      }
      // Queue the pick until a healthy RR policy is available.
      AddPendingPick(pp);
      pick_done = false;
    } else {  // RR not in shutdown
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
                rr_policy_.get());
      }
      pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
    }
  } else {  // rr_policy_ == NULL
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_DEBUG,
              "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
              this);
    }
    AddPendingPick(pp);
    if (!started_picking_) {
      StartPickingLocked();
    }
    pick_done = false;
  }
  return pick_done;
}
1257
1258void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
1259 if (rr_policy_ != nullptr) {
1260 rr_policy_->PingOneLocked(on_initiate, on_ack);
1261 } else {
1262 AddPendingPing(on_initiate, on_ack);
1263 if (!started_picking_) {
1264 StartPickingLocked();
1265 }
1266 }
1267}
1268
1269grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
1270 grpc_error** connectivity_error) {
1271 return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
1272}
1273
// Registers \a notify to be scheduled when the policy's connectivity
// state differs from *current, via the state tracker.
void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
                                       grpc_closure* notify) {
  grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
                                                 notify);
}
1279
// Applies a (possibly updated) set of channel args: refreshes the
// fallback address list, stores our own args copy (forcing the grpclb
// LB policy name), creates the balancer channel on first use, and
// pushes the new balancer addresses through the fake resolver.
void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
  const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
  if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
    // Ignore this update.
    gpr_log(
        GPR_ERROR,
        "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
        this);
    return;
  }
  const grpc_lb_addresses* addresses =
      static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
  // Update fallback address list.
  if (fallback_backend_addresses_ != nullptr) {
    grpc_lb_addresses_destroy(fallback_backend_addresses_);
  }
  fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
  // since we use this to trigger the client_load_reporting filter.
  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
  grpc_arg new_arg = grpc_channel_arg_string_create(
      (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
  grpc_channel_args_destroy(args_);
  args_ = grpc_channel_args_copy_and_add_and_remove(
      &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
  // Construct args for balancer channel.
  grpc_channel_args* lb_channel_args =
      BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    char* uri_str;
    // The fake:/// scheme targets the fake resolver we feed below.
    gpr_asprintf(&uri_str, "fake:///%s", server_name_);
    lb_channel_ = grpc_client_channel_factory_create_channel(
        client_channel_factory(), uri_str,
        GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
    GPR_ASSERT(lb_channel_ != nullptr);
    gpr_free(uri_str);
  }
  // Propagate updates to the LB channel (pick_first) through the fake
  // resolver.
  response_generator_->SetResponse(lb_channel_args);
  grpc_channel_args_destroy(lb_channel_args);
}
1323
// Handles an address/args update from the resolver: reprocesses channel
// args, refreshes the RR child when fallback is configured, and begins
// watching the LB channel's connectivity if not already doing so.
void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
  ProcessChannelArgsLocked(args);
  // If fallback is configured and the RR policy already exists, update
  // it with the new fallback addresses.
  if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
    CreateOrUpdateRoundRobinPolicyLocked();
  }
  // Start watching the LB channel connectivity for connection, if not
  // already doing so.
  if (!watching_lb_channel_) {
    lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
        lb_channel_, true /* try to connect */);
    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
        grpc_channel_get_channel_stack(lb_channel_));
    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
    watching_lb_channel_ = true;
    // TODO(roth): We currently track this ref manually. Once the
    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    // with the callback.
    auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
    self.release();
    grpc_client_channel_watch_connectivity_state(
        client_channel_elem,
        grpc_polling_entity_create_from_pollset_set(interested_parties()),
        &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
        nullptr);
  }
}
1352
1353//
1354// code for balancer channel and call
1355//
1356
1357void GrpcLb::StartPickingLocked() {
1358 // Start a timer to fall back.
1359 if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
1360 !fallback_timer_callback_pending_) {
1361 grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
1362 // TODO(roth): We currently track this ref manually. Once the
1363 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1364 // with the callback.
1365 auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
1366 self.release();
1367 GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1368 grpc_combiner_scheduler(combiner()));
1369 fallback_timer_callback_pending_ = true;
1370 grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1371 }
1372 started_picking_ = true;
1373 StartBalancerCallLocked();
1374}
1375
1376void GrpcLb::StartBalancerCallLocked() {
1377 GPR_ASSERT(lb_channel_ != nullptr);
1378 if (shutting_down_) return;
1379 // Init the LB call data.
1380 GPR_ASSERT(lb_calld_ == nullptr);
1381 lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1382 if (grpc_lb_glb_trace.enabled()) {
1383 gpr_log(GPR_INFO,
1384 "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1385 this, lb_channel_, lb_calld_.get());
1386 }
1387 lb_calld_->StartQuery();
1388}
1389
1390void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1391 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1392 grpclb_policy->fallback_timer_callback_pending_ = false;
1393 // If we receive a serverlist after the timer fires but before this callback
1394 // actually runs, don't fall back.
1395 if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
1396 error == GRPC_ERROR_NONE) {
1397 if (grpc_lb_glb_trace.enabled()) {
1398 gpr_log(GPR_INFO,
1399 "[grpclb %p] Falling back to use backends from resolver",
1400 grpclb_policy);
1401 }
1402 GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
1403 grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
1404 }
1405 grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1406}
1407
1408void GrpcLb::StartBalancerCallRetryTimerLocked() {
1409 grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1410 if (grpc_lb_glb_trace.enabled()) {
1411 gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", this);
1412 grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1413 if (timeout > 0) {
1414 gpr_log(GPR_DEBUG,
1415 "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.", this,
1416 timeout);
1417 } else {
1418 gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
1419 this);
1420 }
1421 }
1422 // TODO(roth): We currently track this ref manually. Once the
1423 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1424 // with the callback.
1425 auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1426 self.release();
1427 GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1428 this, grpc_combiner_scheduler(combiner()));
1429 retry_timer_callback_pending_ = true;
1430 grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1431}
1432
1433void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
Vijay Pai7fed69b2018-03-05 09:58:05 -08001434 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
Mark D. Rothc8875492018-02-20 08:33:48 -08001435 grpclb_policy->retry_timer_callback_pending_ = false;
1436 if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1437 grpclb_policy->lb_calld_ == nullptr) {
1438 if (grpc_lb_glb_trace.enabled()) {
1439 gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1440 grpclb_policy);
1441 }
1442 grpclb_policy->StartBalancerCallLocked();
1443 }
1444 grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1445}
1446
// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                        grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  // On shutdown, jump straight to the shared cleanup below (the "done" label
  // lives inside the switch, next to the SHUTDOWN case).
  if (grpclb_policy->shutting_down_) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
  // embedded RR policy. Note that the current RR policy, if any, will stay in
  // effect until an update from the new lb_call is received.
  switch (grpclb_policy->lb_channel_connectivity_) {
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      // Keep watching the LB channel: re-register the connectivity watch on
      // the client channel element at the bottom of the channel stack.
      grpc_channel_element* client_channel_elem =
          grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
      grpc_client_channel_watch_connectivity_state(
          client_channel_elem,
          grpc_polling_entity_create_from_pollset_set(
              grpclb_policy->interested_parties()),
          &grpclb_policy->lb_channel_connectivity_,
          &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
      break;
    }
    // The LB channel may be IDLE because it's shut down before the update.
    // Restart the LB call to kick the LB channel into gear.
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_READY:
      grpclb_policy->lb_calld_.reset();
      if (grpclb_policy->started_picking_) {
        // Cancel any pending retry and restart the balancer call right away
        // with a fresh backoff state.
        if (grpclb_policy->retry_timer_callback_pending_) {
          grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
        }
        grpclb_policy->lb_call_backoff_.Reset();
        grpclb_policy->StartBalancerCallLocked();
      }
      // Fall through.
    case GRPC_CHANNEL_SHUTDOWN:
    done:
      // The watch is over: clear the flag and drop the ref that was taken
      // when the watch was started.
      grpclb_policy->watching_lb_channel_ = false;
      grpclb_policy->Unref(DEBUG_LOCATION,
                           "watch_lb_channel_connectivity_cb_shutdown");
  }
}
1493
1494//
1495// PendingPick
1496//
1497
1498// Adds lb_token of selected subchannel (address) to the call's initial
1499// metadata.
1500grpc_error* AddLbTokenToInitialMetadata(
1501 grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
1502 grpc_metadata_batch* initial_metadata) {
1503 GPR_ASSERT(lb_token_mdelem_storage != nullptr);
1504 GPR_ASSERT(!GRPC_MDISNULL(lb_token));
1505 return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
1506 lb_token);
1507}
1508
1509// Destroy function used when embedding client stats in call context.
1510void DestroyClientStats(void* arg) {
Vijay Pai7fed69b2018-03-05 09:58:05 -08001511 grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg));
Mark D. Rothc8875492018-02-20 08:33:48 -08001512}
1513
1514void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
1515 /* if connected_subchannel is nullptr, no pick has been made by the RR
1516 * policy (e.g., all addresses failed to connect). There won't be any
1517 * user_data/token available */
1518 if (pp->pick->connected_subchannel != nullptr) {
1519 if (!GRPC_MDISNULL(pp->lb_token)) {
1520 AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
1521 &pp->pick->lb_token_mdelem_storage,
1522 pp->pick->initial_metadata);
1523 } else {
1524 gpr_log(GPR_ERROR,
1525 "[grpclb %p] No LB token for connected subchannel pick %p",
1526 pp->grpclb_policy, pp->pick);
1527 abort();
1528 }
1529 // Pass on client stats via context. Passes ownership of the reference.
1530 if (pp->client_stats != nullptr) {
1531 pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
1532 pp->client_stats;
1533 pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
1534 DestroyClientStats;
1535 }
1536 } else {
1537 if (pp->client_stats != nullptr) {
1538 grpc_grpclb_client_stats_unref(pp->client_stats);
1539 }
1540 }
1541}
1542
1543/* The \a on_complete closure passed as part of the pick requires keeping a
1544 * reference to its associated round robin instance. We wrap this closure in
1545 * order to unref the round robin instance upon its invocation */
1546void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
Vijay Pai7fed69b2018-03-05 09:58:05 -08001547 PendingPick* pp = static_cast<PendingPick*>(arg);
Mark D. Rothc8875492018-02-20 08:33:48 -08001548 PendingPickSetMetadataAndContext(pp);
1549 GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
1550 Delete(pp);
1551}
1552
1553GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
1554 PendingPick* pp = New<PendingPick>();
1555 pp->grpclb_policy = this;
1556 pp->pick = pick;
1557 GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
1558 grpc_schedule_on_exec_ctx);
1559 pp->original_on_complete = pick->on_complete;
1560 pick->on_complete = &pp->on_complete;
1561 return pp;
1562}
1563
1564void GrpcLb::AddPendingPick(PendingPick* pp) {
1565 pp->next = pending_picks_;
1566 pending_picks_ = pp;
1567}
1568
1569//
1570// PendingPing
1571//
1572
1573void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
1574 PendingPing* pping = New<PendingPing>();
1575 pping->on_initiate = on_initiate;
1576 pping->on_ack = on_ack;
1577 pping->next = pending_pings_;
1578 pending_pings_ = pping;
1579}
1580
1581//
1582// code for interacting with the RR policy
1583//
1584
1585// Performs a pick over \a rr_policy_. Given that a pick can return
1586// immediately (ignoring its completion callback), we need to perform the
1587// cleanups this callback would otherwise be responsible for.
1588// If \a force_async is true, then we will manually schedule the
1589// completion callback even if the pick is available immediately.
1590bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
1591 // Check for drops if we are not using fallback backend addresses.
1592 if (serverlist_ != nullptr) {
1593 // Look at the index into the serverlist to see if we should drop this call.
1594 grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
1595 if (serverlist_index_ == serverlist_->num_servers) {
1596 serverlist_index_ = 0; // Wrap-around.
1597 }
1598 if (server->drop) {
1599 // Update client load reporting stats to indicate the number of
1600 // dropped calls. Note that we have to do this here instead of in
1601 // the client_load_reporting filter, because we do not create a
1602 // subchannel call (and therefore no client_load_reporting filter)
1603 // for dropped calls.
1604 if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
1605 grpc_grpclb_client_stats_add_call_dropped_locked(
1606 server->load_balance_token, lb_calld_->client_stats());
1607 }
1608 if (force_async) {
1609 GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
1610 Delete(pp);
1611 return false;
1612 }
1613 Delete(pp);
1614 return true;
1615 }
1616 }
1617 // Set client_stats and user_data.
1618 if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
1619 pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats());
1620 }
1621 GPR_ASSERT(pp->pick->user_data == nullptr);
1622 pp->pick->user_data = (void**)&pp->lb_token;
1623 // Pick via the RR policy.
1624 bool pick_done = rr_policy_->PickLocked(pp->pick);
1625 if (pick_done) {
1626 PendingPickSetMetadataAndContext(pp);
1627 if (force_async) {
1628 GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
1629 pick_done = false;
1630 }
1631 Delete(pp);
1632 }
1633 // else, the pending pick will be registered and taken care of by the
1634 // pending pick list inside the RR policy. Eventually,
1635 // OnPendingPickComplete() will be called, which will (among other
1636 // things) add the LB token to the call's initial metadata.
1637 return pick_done;
1638}
1639
// Instantiates a new round_robin policy from \a args and wires it up:
// re-resolution and connectivity callbacks, pollset linkage, and any pending
// picks/pings accumulated while no RR policy existed.
void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
  GPR_ASSERT(rr_policy_ == nullptr);
  rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
      "round_robin", args);
  if (rr_policy_ == nullptr) {
    gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
            this);
    return;
  }
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
  auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
  self.release();
  rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
  grpc_error* rr_state_error = nullptr;
  rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
  // Connectivity state is a function of the RR policy updated/created.
  UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
  // Add the gRPC LB's interested_parties pollset_set to that of the newly
  // created RR policy. This will make the RR policy progress upon activity on
  // gRPC LB, which in turn is tied to the application's call.
  grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
                                   interested_parties());
  // Subscribe to changes to the connectivity of the new RR.
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
  self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
  self.release();
  rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
                                        &on_rr_connectivity_changed_);
  rr_policy_->ExitIdleLocked();
  // Send pending picks to RR policy. force_async is set so the completion
  // callbacks always run asynchronously, as the original callers expect.
  PendingPick* pp;
  while ((pp = pending_picks_)) {
    pending_picks_ = pp->next;
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
              rr_policy_.get());
    }
    PickFromRoundRobinPolicyLocked(true /* force_async */, pp);
  }
  // Send pending pings to RR policy.
  PendingPing* pping;
  while ((pping = pending_pings_)) {
    pending_pings_ = pping->next;
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
              this, rr_policy_.get());
    }
    rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
    Delete(pping);
  }
}
1694
1695grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
1696 grpc_lb_addresses* addresses;
1697 if (serverlist_ != nullptr) {
1698 GPR_ASSERT(serverlist_->num_servers > 0);
1699 addresses = ProcessServerlist(serverlist_);
1700 } else {
1701 // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
1702 // received any serverlist from the balancer, we use the fallback backends
1703 // returned by the resolver. Note that the fallback backend list may be
1704 // empty, in which case the new round_robin policy will keep the requested
1705 // picks pending.
1706 GPR_ASSERT(fallback_backend_addresses_ != nullptr);
1707 addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
1708 }
1709 GPR_ASSERT(addresses != nullptr);
1710 // Replace the LB addresses in the channel args that we pass down to
1711 // the subchannel.
1712 static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
1713 const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses);
1714 grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
1715 args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1);
1716 grpc_lb_addresses_destroy(addresses);
1717 return args;
1718}
1719
1720void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
1721 if (shutting_down_) return;
1722 grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
1723 GPR_ASSERT(args != nullptr);
1724 if (rr_policy_ != nullptr) {
1725 if (grpc_lb_glb_trace.enabled()) {
1726 gpr_log(GPR_DEBUG, "[grpclb %p] Updating RR policy %p", this,
1727 rr_policy_.get());
1728 }
1729 rr_policy_->UpdateLocked(*args);
1730 } else {
1731 LoadBalancingPolicy::Args lb_policy_args;
1732 lb_policy_args.combiner = combiner();
1733 lb_policy_args.client_channel_factory = client_channel_factory();
1734 lb_policy_args.args = args;
1735 CreateRoundRobinPolicyLocked(lb_policy_args);
1736 if (grpc_lb_glb_trace.enabled()) {
1737 gpr_log(GPR_DEBUG, "[grpclb %p] Created new RR policy %p", this,
1738 rr_policy_.get());
1739 }
1740 }
1741 grpc_channel_args_destroy(args);
1742}
1743
1744void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
1745 grpc_error* error) {
Vijay Pai7fed69b2018-03-05 09:58:05 -08001746 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
Mark D. Rothc8875492018-02-20 08:33:48 -08001747 if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
1748 grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
1749 return;
1750 }
1751 if (grpc_lb_glb_trace.enabled()) {
1752 gpr_log(
1753 GPR_DEBUG,
1754 "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
1755 grpclb_policy, grpclb_policy->rr_policy_.get());
1756 }
1757 // If we are talking to a balancer, we expect to get updated addresses form
1758 // the balancer, so we can ignore the re-resolution request from the RR
1759 // policy. Otherwise, handle the re-resolution request using the
1760 // grpclb policy's original re-resolution closure.
1761 if (grpclb_policy->lb_calld_ == nullptr ||
1762 !grpclb_policy->lb_calld_->seen_initial_response()) {
1763 grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
1764 }
1765 // Give back the wrapper closure to the RR policy.
1766 grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
1767 &grpclb_policy->on_rr_request_reresolution_);
1768}
1769
// Derives grpclb's own connectivity state from the RR policy's state (per
// the transition table below) and publishes it via state_tracker_.
void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
    grpc_error* rr_state_error) {
  const grpc_connectivity_state curr_glb_state =
      grpc_connectivity_state_check(&state_tracker_);
  /* The new connectivity status is a function of the previous one and the new
   * input coming from the status of the RR policy.
   *
   *  current state (grpclb's)
   *  |
   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
   *  ===++====+=====+=====+======+======+
   *   I || I  |  C  |  R  | [I]  | [I]  |
   *  ---++----+-----+-----+------+------+
   *   C || I  |  C  |  R  | [C]  | [C]  |
   *  ---++----+-----+-----+------+------+
   *   R || I  |  C  |  R  | [R]  | [R]  |
   *  ---++----+-----+-----+------+------+
   *  TF || I  |  C  |  R  | [TF] | [TF] |
   *  ---++----+-----+-----+------+------+
   *  SD || NA |  NA |  NA |  NA  |  NA  | (*)
   *  ---++----+-----+-----+------+------+
   *
   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
   * is the current state of grpclb, which is left untouched.
   *
   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
   * the previous RR instance.
   *
   * Note that the status is never updated to SHUTDOWN as a result of calling
   * this function. Only glb_shutdown() has the power to set that state.
   *
   * (*) This function mustn't be called during shutting down. */
  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
  // Sanity-check the error invariant: an error accompanies exactly the
  // TRANSIENT_FAILURE and SHUTDOWN states.
  switch (rr_connectivity_state_) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
      break;
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
  }
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(
        GPR_INFO,
        "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
        this, grpc_connectivity_state_name(rr_connectivity_state_),
        rr_policy_.get());
  }
  // Ownership of rr_state_error is passed to the state tracker.
  grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
                              rr_state_error,
                              "update_lb_connectivity_status_locked");
}
1824
Mark D. Rothc8875492018-02-20 08:33:48 -08001825void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
1826 grpc_error* error) {
Vijay Pai7fed69b2018-03-05 09:58:05 -08001827 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
Mark D. Rothc8875492018-02-20 08:33:48 -08001828 if (grpclb_policy->shutting_down_) {
1829 grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
Juanli Shen87c65042018-02-15 09:42:45 -08001830 return;
1831 }
Mark D. Rothc8875492018-02-20 08:33:48 -08001832 grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
1833 GRPC_ERROR_REF(error));
1834 // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
1835 grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
1836 &grpclb_policy->rr_connectivity_state_,
1837 &grpclb_policy->on_rr_connectivity_changed_);
Juanli Shen87c65042018-02-15 09:42:45 -08001838}
1839
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001840//
Mark D. Rothc8875492018-02-20 08:33:48 -08001841// factory
David Garcia Quintasc22c65b2017-07-25 14:22:20 -07001842//
Mark D. Rothc8875492018-02-20 08:33:48 -08001843
1844class GrpcLbFactory : public LoadBalancingPolicyFactory {
1845 public:
1846 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1847 const LoadBalancingPolicy::Args& args) const override {
1848 /* Count the number of gRPC-LB addresses. There must be at least one. */
1849 const grpc_arg* arg =
1850 grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
1851 if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
1852 return nullptr;
David Garcia Quintas65318262016-07-29 13:43:38 -07001853 }
Mark D. Rothc8875492018-02-20 08:33:48 -08001854 grpc_lb_addresses* addresses =
Vijay Pai7fed69b2018-03-05 09:58:05 -08001855 static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
Mark D. Rothc8875492018-02-20 08:33:48 -08001856 size_t num_grpclb_addrs = 0;
1857 for (size_t i = 0; i < addresses->num_addresses; ++i) {
1858 if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
David Garcia Quintas65318262016-07-29 13:43:38 -07001859 }
Mark D. Rothc8875492018-02-20 08:33:48 -08001860 if (num_grpclb_addrs == 0) return nullptr;
1861 return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
David Garcia Quintas65318262016-07-29 13:43:38 -07001862 }
David Garcia Quintas8d489112016-07-29 15:20:42 -07001863
Mark D. Rothc8875492018-02-20 08:33:48 -08001864 const char* name() const override { return "grpclb"; }
1865};
David Garcia Quintas8d489112016-07-29 15:20:42 -07001866
Mark D. Rothc8875492018-02-20 08:33:48 -08001867} // namespace
David Garcia Quintas8d489112016-07-29 15:20:42 -07001868
Mark D. Rothc8875492018-02-20 08:33:48 -08001869} // namespace grpc_core
David Garcia Quintas65318262016-07-29 13:43:38 -07001870
Mark D. Rothc8875492018-02-20 08:33:48 -08001871//
1872// Plugin registration
1873//
Mark D. Rotha4792f52017-09-26 09:06:35 -07001874
Mark D. Rothc8875492018-02-20 08:33:48 -08001875namespace {
Mark D. Roth09e458c2017-05-02 08:13:26 -07001876
1877// Only add client_load_reporting filter if the grpclb LB policy is used.
Mark D. Rothc8875492018-02-20 08:33:48 -08001878bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1879 void* arg) {
Craig Tillerbaa14a92017-11-03 09:09:36 -07001880 const grpc_channel_args* args =
Mark D. Roth09e458c2017-05-02 08:13:26 -07001881 grpc_channel_stack_builder_get_channel_arguments(builder);
Craig Tillerbaa14a92017-11-03 09:09:36 -07001882 const grpc_arg* channel_arg =
Mark D. Roth09e458c2017-05-02 08:13:26 -07001883 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
Noah Eisen882dfed2017-11-14 14:58:20 -08001884 if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
Mark D. Roth09e458c2017-05-02 08:13:26 -07001885 strcmp(channel_arg->value.string, "grpclb") == 0) {
1886 return grpc_channel_stack_builder_append_filter(
Mark D. Rothc8875492018-02-20 08:33:48 -08001887 builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
Mark D. Roth09e458c2017-05-02 08:13:26 -07001888 }
1889 return true;
1890}
1891
Mark D. Rothc8875492018-02-20 08:33:48 -08001892} // namespace
1893
ncteisenadbfbd52017-11-16 15:35:45 -08001894void grpc_lb_policy_grpclb_init() {
Mark D. Rothc8875492018-02-20 08:33:48 -08001895 grpc_core::LoadBalancingPolicyRegistry::Builder::
1896 RegisterLoadBalancingPolicyFactory(
1897 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1898 grpc_core::New<grpc_core::GrpcLbFactory>()));
Mark D. Roth09e458c2017-05-02 08:13:26 -07001899 grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1900 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1901 maybe_add_client_load_reporting_filter,
Craig Tillerbaa14a92017-11-03 09:09:36 -07001902 (void*)&grpc_client_load_reporting_filter);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001903}
1904
// Intentionally a no-op: no grpclb-specific cleanup is currently required.
void grpc_lb_policy_grpclb_shutdown() {}