/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/// Implementation of the gRPC LB policy.
///
/// This policy takes as input a list of resolved addresses, which must
/// include at least one balancer address.
///
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers. This channel behaves just like a regular
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
/// The first time the policy gets a request for a pick, a ping, or to exit
/// the idle state, \a StartPickingLocked() is called. This method is
/// responsible for instantiating the internal *streaming* call to the LB
/// server (whichever address pick_first chose). The call will be complete
/// when either the balancer sends status or when we cancel the call (e.g.,
/// because we are shutting down). If needed, we retry the call. If we
/// received at least one valid message from the server, a new call attempt
/// will be made immediately; otherwise, we apply back-off delays between
/// attempts.
///
/// We maintain an internal round_robin policy instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
/// the balancer, we update the round_robin policy with the new list of
/// addresses. If we cannot communicate with the balancer on startup,
/// however, we may enter fallback mode, in which case we will populate
/// the RR policy's addresses from the backend addresses returned by the
/// resolver.
///
/// Once an RR policy instance is in place (and getting updated as described),
/// calls for a pick, a ping, or a cancellation will be serviced right
/// away by forwarding them to the RR instance. Any time there's no RR
/// policy available (i.e., right after the creation of the gRPCLB policy),
/// pick and ping requests are added to a list of pending picks and pings
/// to be flushed and serviced when the RR policy instance becomes available.
///
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high level design and details.
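///
/// Purely as an illustration (this sketch is not part of the policy
/// implementation, and the target "dns:///example" is a placeholder): a
/// client can request this policy by name via a channel arg, although
/// normally the resolver selects grpclb automatically when balancer
/// addresses are present:
///
/// \code
///   grpc_arg arg = grpc_channel_arg_string_create(
///       const_cast<char*>(GRPC_ARG_LB_POLICY_NAME),
///       const_cast<char*>("grpclb"));
///   grpc_channel_args args = {1, &arg};
///   grpc_channel* channel =
///       grpc_insecure_channel_create("dns:///example", &args, nullptr);
/// \endcode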

// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
// using that endpoint. Because of various transitive includes in uv.h,
// including windows.h on Windows, uv.h must be included before other system
// headers. Therefore, sockaddr.h must always be included first.
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"

#include <inttypes.h>
#include <limits.h>
#include <string.h>

#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"

#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
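
// Illustration (derived from the constants above, before jitter): successive
// balancer-call retry delays grow as 1s, 1.6s, 2.56s, 4.096s, ...
// (multiplied by 1.6 per attempt), capped at 120s, with +/-20% jitter
// applied to each computed delay.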

namespace grpc_core {

TraceFlag grpc_lb_glb_trace(false, "glb");

namespace {

class GrpcLb : public LoadBalancingPolicy {
 public:
  GrpcLb(const grpc_lb_addresses* addresses, const Args& args);

  void UpdateLocked(const grpc_channel_args& args) override;
  bool PickLocked(PickState* pick) override;
  void CancelPickLocked(PickState* pick, grpc_error* error) override;
  void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                 uint32_t initial_metadata_flags_eq,
                                 grpc_error* error) override;
  void NotifyOnStateChangeLocked(grpc_connectivity_state* state,
                                 grpc_closure* closure) override;
  grpc_connectivity_state CheckConnectivityLocked(
      grpc_error** connectivity_error) override;
  void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
  void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
  void ExitIdleLocked() override;
  void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
                                ChildRefsList* child_channels) override {}

 private:
  /// Linked list of pending pick requests. It stores all information needed to
  /// eventually call (Round Robin's) pick() on them. They mainly stay pending
  /// waiting for the RR policy to be created.
  ///
  /// Note that when a pick is sent to the RR policy, we inject our own
  /// on_complete callback, so that we can intercept the result before
  /// invoking the original on_complete callback. This allows us to set the
  /// LB token metadata and add client_stats to the call context.
  /// See \a pending_pick_complete() for details.
  struct PendingPick {
    // The grpclb instance that created the wrapping. This instance is not
    // owned; reference counts are untouched. It's used only for logging
    // purposes.
    GrpcLb* grpclb_policy;
    // The original pick.
    PickState* pick;
    // Our on_complete closure and the original one.
    grpc_closure on_complete;
    grpc_closure* original_on_complete;
    // The LB token associated with the pick. This is set via user_data in
    // the pick.
    grpc_mdelem lb_token;
    // Stats for client-side load reporting.
    RefCountedPtr<GrpcLbClientStats> client_stats;
    // Next pending pick.
    PendingPick* next = nullptr;
  };
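
  // Illustrative sketch (assumed flow, not a verbatim excerpt): while there
  // is no rr_policy_, a pick is wrapped and parked at the head of this list,
  // to be replayed once the RR policy is created:
  //
  //   PendingPick* pp = PendingPickCreate(pick);  // wrap; see
  //                                               // OnPendingPickComplete()
  //   AddPendingPick(pp);  // pp->next = pending_picks_; pending_picks_ = pp;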

  /// A linked list of pending pings waiting for the RR policy to be created.
  struct PendingPing {
    grpc_closure* on_initiate;
    grpc_closure* on_ack;
    PendingPing* next = nullptr;
  };

  /// Contains a call to the LB server and all the data related to the call.
  class BalancerCallState
      : public InternallyRefCountedWithTracing<BalancerCallState> {
   public:
    explicit BalancerCallState(
        RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);

    // It's the caller's responsibility to ensure that Orphan() is called from
    // inside the combiner.
    void Orphan() override;

    void StartQuery();

    GrpcLbClientStats* client_stats() const { return client_stats_.get(); }

    bool seen_initial_response() const { return seen_initial_response_; }

   private:
    // So Delete() can access our private dtor.
    template <typename T>
    friend void grpc_core::Delete(T*);

    ~BalancerCallState();

    GrpcLb* grpclb_policy() const {
      return static_cast<GrpcLb*>(grpclb_policy_.get());
    }

    void ScheduleNextClientLoadReportLocked();
    void SendClientLoadReportLocked();

    static bool LoadReportCountersAreZero(grpc_grpclb_request* request);

    static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
    static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
    static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
    static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
    static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);

    // The owning LB policy.
    RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;

    // The streaming call to the LB server. Always non-NULL.
    grpc_call* lb_call_ = nullptr;

    // recv_initial_metadata
    grpc_metadata_array lb_initial_metadata_recv_;

    // send_message
    grpc_byte_buffer* send_message_payload_ = nullptr;
    grpc_closure lb_on_initial_request_sent_;

    // recv_message
    grpc_byte_buffer* recv_message_payload_ = nullptr;
    grpc_closure lb_on_balancer_message_received_;
    bool seen_initial_response_ = false;

    // recv_trailing_metadata
    grpc_closure lb_on_balancer_status_received_;
    grpc_metadata_array lb_trailing_metadata_recv_;
    grpc_status_code lb_call_status_;
    grpc_slice lb_call_status_details_;

    // The stats for client-side load reporting associated with this LB call.
    // Created after the first serverlist is received.
    RefCountedPtr<GrpcLbClientStats> client_stats_;
    grpc_millis client_stats_report_interval_ = 0;
    grpc_timer client_load_report_timer_;
    bool client_load_report_timer_callback_pending_ = false;
    bool last_client_load_report_counters_were_zero_ = false;
    bool client_load_report_is_due_ = false;
    // The closure used for either the load report timer or the callback for
    // completion of sending the load report.
    grpc_closure client_load_report_closure_;
  };

  ~GrpcLb();

  void ShutdownLocked() override;

  // Helper function used in ctor and UpdateLocked().
  void ProcessChannelArgsLocked(const grpc_channel_args& args);

  // Methods for dealing with the balancer channel and call.
  void StartPickingLocked();
  void StartBalancerCallLocked();
  static void OnFallbackTimerLocked(void* arg, grpc_error* error);
  void StartBalancerCallRetryTimerLocked();
  static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
  static void OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                         grpc_error* error);

  // Pending pick methods.
  static void PendingPickSetMetadataAndContext(PendingPick* pp);
  PendingPick* PendingPickCreate(PickState* pick);
  void AddPendingPick(PendingPick* pp);
  static void OnPendingPickComplete(void* arg, grpc_error* error);

  // Pending ping methods.
  void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);

  // Methods for dealing with the RR policy.
  void CreateOrUpdateRoundRobinPolicyLocked();
  grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
  void CreateRoundRobinPolicyLocked(const Args& args);
  bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp);
  void UpdateConnectivityStateFromRoundRobinPolicyLocked(
      grpc_error* rr_state_error);
  static void OnRoundRobinConnectivityChangedLocked(void* arg,
                                                    grpc_error* error);
  static void OnRoundRobinRequestReresolutionLocked(void* arg,
                                                    grpc_error* error);

  // Who the client is trying to communicate with.
  const char* server_name_ = nullptr;

  // Current channel args from the resolver.
  grpc_channel_args* args_ = nullptr;

  // Internal state.
  bool started_picking_ = false;
  bool shutting_down_ = false;
  grpc_connectivity_state_tracker state_tracker_;

  // The channel for communicating with the LB server.
  grpc_channel* lb_channel_ = nullptr;
  grpc_connectivity_state lb_channel_connectivity_;
  grpc_closure lb_channel_on_connectivity_changed_;
  // Are we already watching the LB channel's connectivity?
  bool watching_lb_channel_ = false;
  // Response generator to inject address updates into lb_channel_.
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;

  // The data associated with the current LB call. It holds a ref to this LB
  // policy. It's initialized every time we query for backends. It's reset to
  // NULL whenever the current LB call is no longer needed (e.g., the LB policy
  // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
  // contains a non-NULL lb_call_.
  OrphanablePtr<BalancerCallState> lb_calld_;
  // Timeout in milliseconds for the LB call. 0 means no deadline.
  int lb_call_timeout_ms_ = 0;
  // Balancer call retry state.
  BackOff lb_call_backoff_;
  bool retry_timer_callback_pending_ = false;
  grpc_timer lb_call_retry_timer_;
  grpc_closure lb_on_call_retry_;

  // The deserialized response from the balancer. May be nullptr until one
  // such response has arrived.
  grpc_grpclb_serverlist* serverlist_ = nullptr;
  // Index into serverlist for next pick.
  // If the server at this index is a drop, we return a drop.
  // Otherwise, we delegate to the RR policy.
  size_t serverlist_index_ = 0;

  // Timeout in milliseconds before using fallback backend addresses.
  // 0 means not using fallback.
  int lb_fallback_timeout_ms_ = 0;
  // The backend addresses from the resolver.
  grpc_lb_addresses* fallback_backend_addresses_ = nullptr;
  // Fallback timer.
  bool fallback_timer_callback_pending_ = false;
  grpc_timer lb_fallback_timer_;
  grpc_closure lb_on_fallback_;

  // Pending picks and pings that are waiting on the RR policy's connectivity.
  PendingPick* pending_picks_ = nullptr;
  PendingPing* pending_pings_ = nullptr;

  // The RR policy to use for the backends.
  OrphanablePtr<LoadBalancingPolicy> rr_policy_;
  grpc_connectivity_state rr_connectivity_state_;
  grpc_closure on_rr_connectivity_changed_;
  grpc_closure on_rr_request_reresolution_;
};

//
// serverlist parsing code
//

// vtable for LB tokens in grpc_lb_addresses
void* lb_token_copy(void* token) {
  return token == nullptr
             ? nullptr
             : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
}
void lb_token_destroy(void* token) {
  if (token != nullptr) {
    GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
  }
}
int lb_token_cmp(void* token1, void* token2) {
  if (token1 > token2) return 1;
  if (token1 < token2) return -1;
  return 0;
}
const grpc_lb_user_data_vtable lb_token_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};

// Returns the backend addresses extracted from the given addresses.
grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) {
  // First pass: count the number of backend addresses.
  size_t num_backends = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) {
      ++num_backends;
    }
  }
  // Second pass: actually populate the addresses and (empty) LB tokens.
  grpc_lb_addresses* backend_addresses =
      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
  size_t num_copied = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) continue;
    const grpc_resolved_address* addr = &addresses->addresses[i].address;
    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
                                  addr->len, false /* is_balancer */,
                                  nullptr /* balancer_name */,
                                  (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
    ++num_copied;
  }
  return backend_addresses;
}
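
// Example (hypothetical input, for illustration): given resolved addresses
// {balancer B1, backend X, backend Y}, ExtractBackendAddresses() returns
// {X, Y}, each carrying GRPC_MDELEM_LB_TOKEN_EMPTY as its user_data so that
// every backend address has a valid (if empty) LB token.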

bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
  if (server->drop) return false;
  const grpc_grpclb_ip_address* ip = &server->ip_address;
  if (GPR_UNLIKELY(server->port >> 16 != 0)) {
    if (log) {
      gpr_log(GPR_ERROR,
              "Invalid port '%d' at index %lu of serverlist. Ignoring.",
              server->port, (unsigned long)idx);
    }
    return false;
  }
  if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
    if (log) {
      gpr_log(GPR_ERROR,
              "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
              "serverlist. Ignoring",
              ip->size, (unsigned long)idx);
    }
    return false;
  }
  return true;
}

void ParseServer(const grpc_grpclb_server* server,
                 grpc_resolved_address* addr) {
  memset(addr, 0, sizeof(*addr));
  if (server->drop) return;
  const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
  /* the addresses are given in binary format (a in(6)_addr struct) in
   * server->ip_address.bytes. */
  const grpc_grpclb_ip_address* ip = &server->ip_address;
  if (ip->size == 4) {
    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
    grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
    addr4->sin_family = GRPC_AF_INET;
    memcpy(&addr4->sin_addr, ip->bytes, ip->size);
    addr4->sin_port = netorder_port;
  } else if (ip->size == 16) {
    addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
    grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
    addr6->sin6_family = GRPC_AF_INET6;
    memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
    addr6->sin6_port = netorder_port;
  }
}
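
// Worked example (hypothetical serverlist entry): ip_address.bytes =
// {10, 0, 0, 1} with ip_address.size == 4 and port == 443 parses to an IPv4
// grpc_resolved_address equivalent to 10.0.0.1:443, with the port stored in
// network byte order via grpc_htons(). A 16-byte ip_address is handled
// analogously as IPv6.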

// Returns addresses extracted from \a serverlist.
grpc_lb_addresses* ProcessServerlist(const grpc_grpclb_serverlist* serverlist) {
  size_t num_valid = 0;
  /* first pass: count how many are valid in order to allocate the necessary
   * memory in a single block */
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid;
  }
  grpc_lb_addresses* lb_addresses =
      grpc_lb_addresses_create(num_valid, &lb_token_vtable);
  /* second pass: actually populate the addresses and LB tokens (aka user data
   * to the outside world) to be read by the RR policy during its creation.
   * Given that the validity tests are very cheap, they are performed again
   * instead of marking the valid ones during the first pass, as this would
   * incur an allocation due to the arbitrary number of servers */
  size_t addr_idx = 0;
  for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) {
    const grpc_grpclb_server* server = serverlist->servers[sl_idx];
    if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue;
    GPR_ASSERT(addr_idx < num_valid);
    /* address processing */
    grpc_resolved_address addr;
    ParseServer(server, &addr);
    /* lb token processing */
    void* user_data;
    if (server->has_load_balance_token) {
      const size_t lb_token_max_length =
          GPR_ARRAY_SIZE(server->load_balance_token);
      const size_t lb_token_length =
          strnlen(server->load_balance_token, lb_token_max_length);
      grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
          server->load_balance_token, lb_token_length);
      user_data =
          (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr)
              .payload;
    } else {
      char* uri = grpc_sockaddr_to_uri(&addr);
      gpr_log(GPR_INFO,
              "Missing LB token for backend address '%s'. The empty token will "
              "be used instead",
              uri);
      gpr_free(uri);
      user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
    }
    grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
                                  false /* is_balancer */,
                                  nullptr /* balancer_name */, user_data);
    ++addr_idx;
  }
  GPR_ASSERT(addr_idx == num_valid);
  return lb_addresses;
}

//
// GrpcLb::BalancerCallState
//

GrpcLb::BalancerCallState::BalancerCallState(
    RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
    : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_glb_trace),
      grpclb_policy_(std::move(parent_grpclb_policy)) {
  GPR_ASSERT(grpclb_policy_ != nullptr);
  GPR_ASSERT(!grpclb_policy()->shutting_down_);
  // Init the LB call. Note that the LB call will progress every time there's
  // activity in grpclb_policy_->interested_parties(), which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
  GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
  const grpc_millis deadline =
      grpclb_policy()->lb_call_timeout_ms_ == 0
          ? GRPC_MILLIS_INF_FUTURE
          : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
  lb_call_ = grpc_channel_create_pollset_set_call(
      grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      grpclb_policy_->interested_parties(),
      GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
      nullptr, deadline, nullptr);
  // Init the LB call request payload.
  grpc_grpclb_request* request =
      grpc_grpclb_request_create(grpclb_policy()->server_name_);
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // Init other data associated with the LB call.
  grpc_metadata_array_init(&lb_initial_metadata_recv_);
  grpc_metadata_array_init(&lb_trailing_metadata_recv_);
  GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
  GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
                    OnBalancerMessageReceivedLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
  GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
                    OnBalancerStatusReceivedLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
}

GrpcLb::BalancerCallState::~BalancerCallState() {
  GPR_ASSERT(lb_call_ != nullptr);
  grpc_call_unref(lb_call_);
  grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
  grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(lb_call_status_details_);
}

void GrpcLb::BalancerCallState::Orphan() {
  GPR_ASSERT(lb_call_ != nullptr);
  // If we are here because grpclb_policy wants to cancel the call,
  // lb_on_balancer_status_received_ will complete the cancellation and clean
  // up. Otherwise, we are here because grpclb_policy has to orphan a failed
  // call, in which case the following cancellation will be a no-op.
  grpc_call_cancel(lb_call_, nullptr);
  if (client_load_report_timer_callback_pending_) {
    grpc_timer_cancel(&client_load_report_timer_);
  }
  // Note that the initial ref is held by lb_on_balancer_status_received_
  // instead of the caller of this function. So the corresponding unref happens
  // in lb_on_balancer_status_received_ instead of here.
}

void GrpcLb::BalancerCallState::StartQuery() {
  GPR_ASSERT(lb_call_ != nullptr);
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Starting LB call (lb_calld: %p, lb_call: %p)",
            grpclb_policy_.get(), this, lb_call_);
  }
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  GPR_ASSERT(send_message_payload_ != nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &lb_initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  self = Ref(DEBUG_LOCATION, "on_message_received");
  self.release();
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &lb_trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &lb_call_status_;
  op->data.recv_status_on_client.status_details = &lb_call_status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the LB call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  call_error = grpc_call_start_batch_and_execute(
      lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}

void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
  const grpc_millis next_client_load_report_time =
      ExecCtx::Get()->Now() + client_stats_report_interval_;
  GRPC_CLOSURE_INIT(&client_load_report_closure_,
                    MaybeSendClientLoadReportLocked, this,
                    grpc_combiner_scheduler(grpclb_policy()->combiner()));
  grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
                  &client_load_report_closure_);
  client_load_report_timer_callback_pending_ = true;
}

void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  lb_calld->client_load_report_timer_callback_pending_ = false;
  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  // If we've already sent the initial request, then we can go ahead and send
  // the load report. Otherwise, we need to wait until the initial request has
  // been sent to send this (see OnInitialRequestSentLocked()).
  if (lb_calld->send_message_payload_ == nullptr) {
    lb_calld->SendClientLoadReportLocked();
  } else {
    lb_calld->client_load_report_is_due_ = true;
  }
}

bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
    grpc_grpclb_request* request) {
  GrpcLbClientStats::DroppedCallCounts* drop_entries =
      static_cast<GrpcLbClientStats::DroppedCallCounts*>(
          request->client_stats.calls_finished_with_drop.arg);
  return request->client_stats.num_calls_started == 0 &&
         request->client_stats.num_calls_finished == 0 &&
         request->client_stats.num_calls_finished_with_client_failed_to_send ==
             0 &&
         request->client_stats.num_calls_finished_known_received == 0 &&
         (drop_entries == nullptr || drop_entries->size() == 0);
}

void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
  // Construct message payload.
  GPR_ASSERT(send_message_payload_ == nullptr);
  grpc_grpclb_request* request =
      grpc_grpclb_load_report_request_create_locked(client_stats_.get());
  // Skip client load report if the counters were all zero in the last
  // report and they are still zero in this one.
  if (LoadReportCountersAreZero(request)) {
    if (last_client_load_report_counters_were_zero_) {
      grpc_grpclb_request_destroy(request);
      ScheduleNextClientLoadReportLocked();
      return;
    }
    last_client_load_report_counters_were_zero_ = true;
  } else {
    last_client_load_report_counters_were_zero_ = false;
  }
  grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  grpc_grpclb_request_destroy(request);
  // Send the report.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
                    this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
  grpc_call_error call_error = grpc_call_start_batch_and_execute(
      lb_call_, &op, 1, &client_load_report_closure_);
  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", grpclb_policy_.get(),
            call_error);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  }
}

void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
                                                           grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
  lb_calld->send_message_payload_ = nullptr;
  if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
    lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
    return;
  }
  lb_calld->ScheduleNextClientLoadReportLocked();
}

void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
                                                           grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
  lb_calld->send_message_payload_ = nullptr;
  // If we attempted to send a client load report before the initial request
  // was sent (and this lb_calld is still in use), send the load report now.
  if (lb_calld->client_load_report_is_due_ &&
      lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
    lb_calld->SendClientLoadReportLocked();
    lb_calld->client_load_report_is_due_ = false;
  }
  lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
}

void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  // Empty payload means the LB call was cancelled.
  if (lb_calld != grpclb_policy->lb_calld_.get() ||
      lb_calld->recv_message_payload_ == nullptr) {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
    return;
  }
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
  lb_calld->recv_message_payload_ = nullptr;
  grpc_grpclb_initial_response* initial_response;
  grpc_grpclb_serverlist* serverlist;
  if (!lb_calld->seen_initial_response_ &&
      (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
          nullptr) {
    // Have NOT seen initial response, look for initial response.
    if (initial_response->has_client_stats_report_interval) {
      lb_calld->client_stats_report_interval_ = GPR_MAX(
          GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
                              &initial_response->client_stats_report_interval));
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO,
                "[grpclb %p] Received initial LB response message; "
                "client load reporting interval = %" PRId64 " milliseconds",
                grpclb_policy, lb_calld->client_stats_report_interval_);
      }
    } else if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Received initial LB response message; client load "
              "reporting NOT enabled",
              grpclb_policy);
    }
    grpc_grpclb_initial_response_destroy(initial_response);
    lb_calld->seen_initial_response_ = true;
  } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
                  response_slice)) != nullptr) {
    // Have seen initial response, look for serverlist.
    GPR_ASSERT(lb_calld->lb_call_ != nullptr);
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
              grpclb_policy, serverlist->num_servers);
      for (size_t i = 0; i < serverlist->num_servers; ++i) {
        grpc_resolved_address addr;
        ParseServer(serverlist->servers[i], &addr);
        char* ipport;
        grpc_sockaddr_to_string(&ipport, &addr, false);
        gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
                grpclb_policy, i, ipport);
        gpr_free(ipport);
      }
    }
    /* update serverlist */
    if (serverlist->num_servers > 0) {
      // Start sending client load report only after we start using the
      // serverlist returned from the current LB call.
      if (lb_calld->client_stats_report_interval_ > 0 &&
          lb_calld->client_stats_ == nullptr) {
        lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
        // TODO(roth): We currently track this ref manually. Once the
        // ClosureRef API is ready, we should pass the RefCountedPtr<> along
        // with the callback.
        auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
        self.release();
        lb_calld->ScheduleNextClientLoadReportLocked();
      }
      if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
                                        serverlist)) {
        if (grpc_lb_glb_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "[grpclb %p] Incoming server list identical to current, "
                  "ignoring.",
                  grpclb_policy);
        }
        grpc_grpclb_destroy_serverlist(serverlist);
      } else { /* new serverlist */
        if (grpclb_policy->serverlist_ != nullptr) {
          /* dispose of the old serverlist */
          grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
        } else {
          /* or dispose of the fallback */
          grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
          grpclb_policy->fallback_backend_addresses_ = nullptr;
          if (grpclb_policy->fallback_timer_callback_pending_) {
            grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
          }
        }
        // Update the copy in the GrpcLb instance. This serverlist
        // instance will be destroyed either upon the next update or when the
        // GrpcLb instance is destroyed.
        grpclb_policy->serverlist_ = serverlist;
        grpclb_policy->serverlist_index_ = 0;
        grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
      }
    } else {
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
                grpclb_policy);
      }
      grpc_grpclb_destroy_serverlist(serverlist);
    }
  } else {
    // No valid initial response or serverlist found.
    gpr_log(GPR_ERROR,
            "[grpclb %p] Invalid LB response received: '%s'. Ignoring.",
            grpclb_policy,
            grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX));
  }
  grpc_slice_unref_internal(response_slice);
  if (!grpclb_policy->shutting_down_) {
    // Keep listening for serverlist updates.
    grpc_op op;
    memset(&op, 0, sizeof(op));
    op.op = GRPC_OP_RECV_MESSAGE;
    op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
    op.flags = 0;
    op.reserved = nullptr;
    // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
    const grpc_call_error call_error = grpc_call_start_batch_and_execute(
        lb_calld->lb_call_, &op, 1,
        &lb_calld->lb_on_balancer_message_received_);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  } else {
    lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
  }
}

void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
    void* arg, grpc_error* error) {
  BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
  GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
  GPR_ASSERT(lb_calld->lb_call_ != nullptr);
  if (grpc_lb_glb_trace.enabled()) {
    char* status_details =
        grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
    gpr_log(GPR_INFO,
            "[grpclb %p] Status from LB server received. Status = %d, details "
            "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
            grpclb_policy, lb_calld->lb_call_status_, status_details, lb_calld,
            lb_calld->lb_call_, grpc_error_string(error));
    gpr_free(status_details);
  }
  grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
  // If this lb_calld is still in use, this call ended because of a failure so
  // we want to retry connecting. Otherwise, we have deliberately ended this
  // call and no further action is required.
  if (lb_calld == grpclb_policy->lb_calld_.get()) {
    grpclb_policy->lb_calld_.reset();
    GPR_ASSERT(!grpclb_policy->shutting_down_);
    if (lb_calld->seen_initial_response_) {
      // If we lose connection to the LB server, reset the backoff and restart
      // the LB call immediately.
      grpclb_policy->lb_call_backoff_.Reset();
      grpclb_policy->StartBalancerCallLocked();
    } else {
      // If this LB call fails establishing any connection to the LB server,
      // retry later.
      grpclb_policy->StartBalancerCallRetryTimerLocked();
    }
  }
  lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
}

//
// helper code for creating balancer channel
//

grpc_lb_addresses* ExtractBalancerAddresses(
    const grpc_lb_addresses* addresses) {
  size_t num_grpclb_addrs = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
  }
  // There must be at least one balancer address, or else the
  // client_channel would not have chosen this LB policy.
  GPR_ASSERT(num_grpclb_addrs > 0);
  grpc_lb_addresses* lb_addresses =
      grpc_lb_addresses_create(num_grpclb_addrs, nullptr);
  size_t lb_addresses_idx = 0;
  for (size_t i = 0; i < addresses->num_addresses; ++i) {
    if (!addresses->addresses[i].is_balancer) continue;
    if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) {
      gpr_log(GPR_ERROR,
              "This LB policy doesn't support user data. It will be ignored");
    }
    grpc_lb_addresses_set_address(
        lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr,
        addresses->addresses[i].address.len, false /* is balancer */,
        addresses->addresses[i].balancer_name, nullptr /* user data */);
  }
  GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx);
  return lb_addresses;
}

/* Returns the channel args for the LB channel, used to create a bidirectional
 * stream for the reception of load balancing updates.
 *
 * Inputs:
 *   - \a addresses: corresponding to the balancers.
 *   - \a response_generator: in order to propagate updates from the resolver
 *     above the grpclb policy.
 *   - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
    const grpc_lb_addresses* addresses,
    FakeResolverResponseGenerator* response_generator,
    const grpc_channel_args* args) {
  grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses);
  // Channel args to remove.
  static const char* args_to_remove[] = {
      // LB policy name, since we want to use the default (pick_first) in
      // the LB channel.
      GRPC_ARG_LB_POLICY_NAME,
      // The channel arg for the server URI, since that will be different for
      // the LB channel than for the parent channel. The client channel
      // factory will re-add this arg with the right value.
      GRPC_ARG_SERVER_URI,
      // The resolved addresses, which will be generated by the name resolver
      // used in the LB channel. Note that the LB channel will use the fake
      // resolver, so this won't actually generate a query to DNS (or some
      // other name service). However, the addresses returned by the fake
      // resolver will have is_balancer=false, whereas our own addresses have
      // is_balancer=true. We need the LB channel to return addresses with
      // is_balancer=false so that it does not wind up recursively using the
      // grpclb LB policy, as per the special case logic in client_channel.c.
      GRPC_ARG_LB_ADDRESSES,
      // The fake resolver response generator, because we are replacing it
      // with the one from the grpclb policy, used to propagate updates to
      // the LB channel.
      GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
      // The LB channel should use the authority indicated by the target
      // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
      // as opposed to the authority from the parent channel.
      GRPC_ARG_DEFAULT_AUTHORITY,
      // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
      // treated as a stand-alone channel and not inherit this argument from the
      // args of the parent channel.
      GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
  };
  // Channel args to add.
  const grpc_arg args_to_add[] = {
      // New LB addresses.
      // Note that we pass these in both when creating the LB channel
      // and via the fake resolver. The latter is what actually gets used.
      grpc_lb_addresses_create_channel_arg(lb_addresses),
      // The fake resolver response generator, which we use to inject
      // address updates into the LB channel.
      grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
          response_generator),
      // A channel arg indicating the target is a grpclb load balancer.
      grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
  };
  // Construct channel args.
  grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
      args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
  // Make any necessary modifications for security.
  new_args = grpc_lb_policy_grpclb_modify_lb_channel_args(new_args);
  // Clean up.
  grpc_lb_addresses_destroy(lb_addresses);
  return new_args;
}

//
// ctor and dtor
//

GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
               const LoadBalancingPolicy::Args& args)
    : LoadBalancingPolicy(args),
      response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
      lb_call_backoff_(
          BackOff::Options()
              .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
                                   1000)
              .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
              .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  // Initialization.
  grpc_subchannel_index_ref();
  GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
                    &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_,
                    &GrpcLb::OnRoundRobinConnectivityChangedLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  GRPC_CLOSURE_INIT(&on_rr_request_reresolution_,
                    &GrpcLb::OnRoundRobinRequestReresolutionLocked, this,
                    grpc_combiner_scheduler(args.combiner));
  grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "grpclb");
  // Record server name.
  const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
  const char* server_uri = grpc_channel_arg_get_string(arg);
  GPR_ASSERT(server_uri != nullptr);
  grpc_uri* uri = grpc_uri_parse(server_uri, true);
  GPR_ASSERT(uri->path[0] != '\0');
  server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Will use '%s' as the server name for LB request.",
            this, server_name_);
  }
  grpc_uri_destroy(uri);
  // Record LB call timeout.
  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
  lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
  // Record fallback timeout.
  arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
  lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
      arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
  // Process channel args.
  ProcessChannelArgsLocked(*args.args);
}

GrpcLb::~GrpcLb() {
  GPR_ASSERT(pending_picks_ == nullptr);
  GPR_ASSERT(pending_pings_ == nullptr);
  gpr_free((void*)server_name_);
  grpc_channel_args_destroy(args_);
  grpc_connectivity_state_destroy(&state_tracker_);
  if (serverlist_ != nullptr) {
    grpc_grpclb_destroy_serverlist(serverlist_);
  }
  if (fallback_backend_addresses_ != nullptr) {
    grpc_lb_addresses_destroy(fallback_backend_addresses_);
  }
  grpc_subchannel_index_unref();
}

void GrpcLb::ShutdownLocked() {
  grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
  shutting_down_ = true;
  lb_calld_.reset();
  if (retry_timer_callback_pending_) {
    grpc_timer_cancel(&lb_call_retry_timer_);
  }
  if (fallback_timer_callback_pending_) {
    grpc_timer_cancel(&lb_fallback_timer_);
  }
  rr_policy_.reset();
  TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_CANCELLED);
  // We destroy the LB channel here instead of in our destructor because
  // destroying the channel triggers a last callback to
  // OnBalancerChannelConnectivityChangedLocked(), and we need to be
  // alive when that callback is invoked.
  if (lb_channel_ != nullptr) {
    grpc_channel_destroy(lb_channel_);
    lb_channel_ = nullptr;
  }
  grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN,
                              GRPC_ERROR_REF(error), "grpclb_shutdown");
  // Clear pending picks.
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->connected_subchannel.reset();
    // Note: pp is deleted in this callback.
    GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
  }
  // Clear pending pings.
  PendingPing* pping;
  while ((pping = pending_pings_) != nullptr) {
    pending_pings_ = pping->next;
    GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
    GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
    Delete(pping);
  }
  GRPC_ERROR_UNREF(error);
}

//
// public methods
//

void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
  PendingPick* pp;
  while ((pp = pending_picks_) != nullptr) {
    pending_picks_ = pp->next;
    pp->pick->on_complete = pp->original_on_complete;
    pp->pick->user_data = nullptr;
    if (new_policy->PickLocked(pp->pick)) {
      // Synchronous return; schedule closure.
      GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE);
    }
    Delete(pp);
  }
}

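// Both cancellation methods below filter pending_picks_ with the same
// idiom: detach the whole list, then push back every entry that should
// survive. A minimal sketch of the pattern (the ShouldKeep() predicate is
// hypothetical, for illustration only):
//
//   PendingPick* pp = pending_picks_;
//   pending_picks_ = nullptr;
//   while (pp != nullptr) {
//     PendingPick* next = pp->next;
//     if (ShouldKeep(pp)) {
//       pp->next = pending_picks_;
//       pending_picks_ = pp;
//     }  // else: schedule the completion closure, which deletes pp.
//     pp = next;
//   }
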
// Cancel a specific pending pick.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (rr_policy_) available, it'll be
//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
//   that point onwards, it'll be RR's responsibility. For cancellations, that
//   implies the pick also needs to be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
//   level (grpclb), inside the pending_picks_ list. To cancel these,
//   we invoke the completion closure and set the pick's connected
//   subchannel to nullptr right here.
void GrpcLb::CancelPickLocked(PickState* pick, grpc_error* error) {
  PendingPick* pp = pending_picks_;
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
    if (pp->pick == pick) {
      pick->connected_subchannel.reset();
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = pending_picks_;
      pending_picks_ = pp;
    }
    pp = next;
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}

// Cancel all pending picks.
//
// A grpclb pick progresses as follows:
// - If there's a Round Robin policy (rr_policy_) available, it'll be
//   handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From
//   that point onwards, it'll be RR's responsibility. For cancellations, that
//   implies the pick also needs to be cancelled by the RR instance.
// - Otherwise, without an RR instance, picks stay pending at this policy's
//   level (grpclb), inside the pending_picks_ list. To cancel these,
//   we invoke the completion closure and set the pick's connected
//   subchannel to nullptr right here.
void GrpcLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
                                       uint32_t initial_metadata_flags_eq,
                                       grpc_error* error) {
  PendingPick* pp = pending_picks_;
  pending_picks_ = nullptr;
  while (pp != nullptr) {
    PendingPick* next = pp->next;
    if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
      // Note: pp is deleted in this callback.
      GRPC_CLOSURE_SCHED(&pp->on_complete,
                         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                             "Pick Cancelled", &error, 1));
    } else {
      pp->next = pending_picks_;
      pending_picks_ = pp;
    }
    pp = next;
  }
  if (rr_policy_ != nullptr) {
    rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask,
                                          initial_metadata_flags_eq,
                                          GRPC_ERROR_REF(error));
  }
  GRPC_ERROR_UNREF(error);
}

void GrpcLb::ExitIdleLocked() {
  if (!started_picking_) {
    StartPickingLocked();
  }
}

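// Returns true if the pick completed synchronously (in which case the caller
// is responsible for invoking the completion), and false if it will complete
// asynchronously via the pick's on_complete closure.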
bool GrpcLb::PickLocked(PickState* pick) {
  PendingPick* pp = PendingPickCreate(pick);
  bool pick_done = false;
  if (rr_policy_ != nullptr) {
    const grpc_connectivity_state rr_connectivity_state =
        rr_policy_->CheckConnectivityLocked(nullptr);
    // The RR policy may have transitioned to SHUTDOWN but the callback
    // registered to capture this event (on_rr_connectivity_changed_) may not
    // have been invoked yet. We need to make sure we aren't trying to pick
    // from an RR policy instance that's in shutdown.
    if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO,
                "[grpclb %p] NOT picking from RR %p: RR conn state=%s", this,
                rr_policy_.get(),
                grpc_connectivity_state_name(rr_connectivity_state));
      }
      AddPendingPick(pp);
      pick_done = false;
    } else {  // RR not in shutdown
      if (grpc_lb_glb_trace.enabled()) {
        gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
                rr_policy_.get());
      }
      pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp);
    }
  } else {  // rr_policy_ == NULL
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
              this);
    }
    AddPendingPick(pp);
    if (!started_picking_) {
      StartPickingLocked();
    }
    pick_done = false;
  }
  return pick_done;
}

void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
  if (rr_policy_ != nullptr) {
    rr_policy_->PingOneLocked(on_initiate, on_ack);
  } else {
    AddPendingPing(on_initiate, on_ack);
    if (!started_picking_) {
      StartPickingLocked();
    }
  }
}

grpc_connectivity_state GrpcLb::CheckConnectivityLocked(
    grpc_error** connectivity_error) {
  return grpc_connectivity_state_get(&state_tracker_, connectivity_error);
}

void GrpcLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
                                       grpc_closure* notify) {
  grpc_connectivity_state_notify_on_state_change(&state_tracker_, current,
                                                 notify);
}

void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
  const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
  if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
    // Ignore this update.
    gpr_log(
        GPR_ERROR,
        "[grpclb %p] No valid LB addresses channel arg in update, ignoring.",
        this);
    return;
  }
  const grpc_lb_addresses* addresses =
      static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
  // Update fallback address list.
  if (fallback_backend_addresses_ != nullptr) {
    grpc_lb_addresses_destroy(fallback_backend_addresses_);
  }
  fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
  // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
  // since we use this to trigger the client_load_reporting filter.
  static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
  grpc_arg new_arg = grpc_channel_arg_string_create(
      (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
  grpc_channel_args_destroy(args_);
  args_ = grpc_channel_args_copy_and_add_and_remove(
      &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
  // Construct args for balancer channel.
  grpc_channel_args* lb_channel_args =
      BuildBalancerChannelArgs(addresses, response_generator_.get(), &args);
  // Create balancer channel if needed.
  if (lb_channel_ == nullptr) {
    char* uri_str;
    gpr_asprintf(&uri_str, "fake:///%s", server_name_);
    lb_channel_ = grpc_client_channel_factory_create_channel(
        client_channel_factory(), uri_str,
        GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args);
    GPR_ASSERT(lb_channel_ != nullptr);
    gpr_free(uri_str);
  }
  // Propagate updates to the LB channel (pick_first) through the fake
  // resolver.
  response_generator_->SetResponse(lb_channel_args);
  grpc_channel_args_destroy(lb_channel_args);
}
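
// Note on the balancer channel above: it is created once with a synthetic
// "fake:///<server_name>" target, and subsequent balancer address updates are
// pushed through response_generator_ rather than by recreating the channel.
// The fake resolver then hands the new balancer address list to the LB
// channel's pick_first policy, so balancer updates flow through the same
// resolution path as any other channel.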

void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
  ProcessChannelArgsLocked(args);
  // If fallback is configured and the RR policy already exists, update
  // it with the new fallback addresses.
  if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) {
    CreateOrUpdateRoundRobinPolicyLocked();
  }
  // Start watching the LB channel connectivity for connection, if not
  // already doing so.
  if (!watching_lb_channel_) {
    lb_channel_connectivity_ = grpc_channel_check_connectivity_state(
        lb_channel_, true /* try to connect */);
    grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
        grpc_channel_get_channel_stack(lb_channel_));
    GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
    watching_lb_channel_ = true;
    // TODO(roth): We currently track this ref manually. Once the
    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    // with the callback.
    auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
    self.release();
    grpc_client_channel_watch_connectivity_state(
        client_channel_elem,
        grpc_polling_entity_create_from_pollset_set(interested_parties()),
        &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
        nullptr);
  }
}

//
// code for balancer channel and call
//

void GrpcLb::StartPickingLocked() {
  // Start a timer to fall back.
  if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr &&
      !fallback_timer_callback_pending_) {
    grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
    // TODO(roth): We currently track this ref manually. Once the
    // ClosureRef API is ready, we should pass the RefCountedPtr<> along
    // with the callback.
    auto self = Ref(DEBUG_LOCATION, "on_fallback_timer");
    self.release();
    GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
                      grpc_combiner_scheduler(combiner()));
    fallback_timer_callback_pending_ = true;
    grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
  }
  started_picking_ = true;
  StartBalancerCallLocked();
}

void GrpcLb::StartBalancerCallLocked() {
  GPR_ASSERT(lb_channel_ != nullptr);
  if (shutting_down_) return;
  // Init the LB call data.
  GPR_ASSERT(lb_calld_ == nullptr);
  lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO,
            "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
            this, lb_channel_, lb_calld_.get());
  }
  lb_calld_->StartQuery();
}

void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  grpclb_policy->fallback_timer_callback_pending_ = false;
  // If we receive a serverlist after the timer fires but before this callback
  // actually runs, don't fall back.
  if (grpclb_policy->serverlist_ == nullptr && !grpclb_policy->shutting_down_ &&
      error == GRPC_ERROR_NONE) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Falling back to use backends from resolver",
              grpclb_policy);
    }
    GPR_ASSERT(grpclb_policy->fallback_backend_addresses_ != nullptr);
    grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
  }
  grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}

void GrpcLb::StartBalancerCallRetryTimerLocked() {
  grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
              this, timeout);
    } else {
      gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
              this);
    }
  }
  // TODO(roth): We currently track this ref manually. Once the
  // ClosureRef API is ready, we should pass the RefCountedPtr<> along
  // with the callback.
  auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
  self.release();
  GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
                    this, grpc_combiner_scheduler(combiner()));
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
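
// Note on lb_call_backoff_: it is a grpc_core::BackOff instance, which
// (roughly) computes the delay before attempt n as
//
//   delay(n) = min(initial_backoff * multiplier^n, max_backoff) +/- jitter
//
// so repeated balancer failures are retried with exponentially increasing,
// jittered delays. The backoff is reset whenever the balancer call makes
// progress again (see the Reset() call in the connectivity-change handler
// below).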

void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  grpclb_policy->retry_timer_callback_pending_ = false;
  if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
      grpclb_policy->lb_calld_ == nullptr) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
              grpclb_policy);
    }
    grpclb_policy->StartBalancerCallLocked();
  }
  grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}

// Invoked as part of the update process. It continues watching the LB channel
// until it shuts down or becomes READY. It's invoked even if the LB channel
// stayed READY throughout the update (for example if the update is identical).
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
                                                        grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  if (grpclb_policy->shutting_down_) goto done;
  // Re-initialize the lb_call. This should also take care of updating the
  // embedded RR policy. Note that the current RR policy, if any, will stay in
  // effect until an update from the new lb_call is received.
  switch (grpclb_policy->lb_channel_connectivity_) {
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_TRANSIENT_FAILURE: {
      // Keep watching the LB channel.
      grpc_channel_element* client_channel_elem =
          grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(grpclb_policy->lb_channel_));
      GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
      grpc_client_channel_watch_connectivity_state(
          client_channel_elem,
          grpc_polling_entity_create_from_pollset_set(
              grpclb_policy->interested_parties()),
          &grpclb_policy->lb_channel_connectivity_,
          &grpclb_policy->lb_channel_on_connectivity_changed_, nullptr);
      break;
    }
    // The LB channel may be IDLE because it's shut down before the update.
    // Restart the LB call to kick the LB channel into gear.
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_READY:
      grpclb_policy->lb_calld_.reset();
      if (grpclb_policy->started_picking_) {
        if (grpclb_policy->retry_timer_callback_pending_) {
          grpc_timer_cancel(&grpclb_policy->lb_call_retry_timer_);
        }
        grpclb_policy->lb_call_backoff_.Reset();
        grpclb_policy->StartBalancerCallLocked();
      }
      // Fall through.
    case GRPC_CHANNEL_SHUTDOWN:
    done:
      grpclb_policy->watching_lb_channel_ = false;
      grpclb_policy->Unref(DEBUG_LOCATION,
                           "watch_lb_channel_connectivity_cb_shutdown");
  }
}

//
// PendingPick
//

// Adds lb_token of selected subchannel (address) to the call's initial
// metadata.
grpc_error* AddLbTokenToInitialMetadata(
    grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage,
    grpc_metadata_batch* initial_metadata) {
  GPR_ASSERT(lb_token_mdelem_storage != nullptr);
  GPR_ASSERT(!GRPC_MDISNULL(lb_token));
  return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage,
                                      lb_token);
}
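
// Note on lb_token: per the grpclb protocol, the balancer may attach an
// opaque token to each server entry in the serverlist. The client echoes
// that token back in the initial metadata of every call routed to that
// server, which lets the balancing infrastructure correlate per-call load
// reports with the balancer's picks.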

// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
  static_cast<GrpcLbClientStats*>(arg)->Unref();
}

void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
  /* if connected_subchannel is nullptr, no pick has been made by the RR
   * policy (e.g., all addresses failed to connect). There won't be any
   * user_data/token available */
  if (pp->pick->connected_subchannel != nullptr) {
    if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) {
      AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token),
                                  &pp->pick->lb_token_mdelem_storage,
                                  pp->pick->initial_metadata);
    } else {
      gpr_log(GPR_ERROR,
              "[grpclb %p] No LB token for connected subchannel pick %p",
              pp->grpclb_policy, pp->pick);
      abort();
    }
    // Pass on client stats via context. Passes ownership of the reference.
    if (pp->client_stats != nullptr) {
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
          pp->client_stats.release();
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
          DestroyClientStats;
    }
  } else {
    pp->client_stats.reset();
  }
}

/* The \a on_complete closure passed as part of the pick requires keeping a
 * reference to its associated round robin instance. We wrap this closure in
 * order to unref the round robin instance upon its invocation */
void GrpcLb::OnPendingPickComplete(void* arg, grpc_error* error) {
  PendingPick* pp = static_cast<PendingPick*>(arg);
  PendingPickSetMetadataAndContext(pp);
  GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error));
  Delete(pp);
}

GrpcLb::PendingPick* GrpcLb::PendingPickCreate(PickState* pick) {
  PendingPick* pp = New<PendingPick>();
  pp->grpclb_policy = this;
  pp->pick = pick;
  GRPC_CLOSURE_INIT(&pp->on_complete, &GrpcLb::OnPendingPickComplete, pp,
                    grpc_schedule_on_exec_ctx);
  pp->original_on_complete = pick->on_complete;
  pick->on_complete = &pp->on_complete;
  return pp;
}

void GrpcLb::AddPendingPick(PendingPick* pp) {
  pp->next = pending_picks_;
  pending_picks_ = pp;
}

//
// PendingPing
//

void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
  PendingPing* pping = New<PendingPing>();
  pping->on_initiate = on_initiate;
  pping->on_ack = on_ack;
  pping->next = pending_pings_;
  pending_pings_ = pping;
}
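
// Pending picks and pings accumulate in these lists only while no RR policy
// exists; CreateRoundRobinPolicyLocked() drains both lists as soon as an RR
// instance is available, and ShutdownLocked() fails anything still queued.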

//
// code for interacting with the RR policy
//

// Performs a pick over \a rr_policy_. Given that a pick can return
// immediately (ignoring its completion callback), we need to perform the
// cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately.
bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
  // Check for drops if we are not using fallback backend addresses.
  if (serverlist_ != nullptr) {
    // Look at the index into the serverlist to see if we should drop this call.
    grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
    if (serverlist_index_ == serverlist_->num_servers) {
      serverlist_index_ = 0;  // Wrap-around.
    }
    if (server->drop) {
      // Update client load reporting stats to indicate the number of
      // dropped calls. Note that we have to do this here instead of in
      // the client_load_reporting filter, because we do not create a
      // subchannel call (and therefore no client_load_reporting filter)
      // for dropped calls.
      if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
        lb_calld_->client_stats()->AddCallDroppedLocked(
            server->load_balance_token);
      }
      if (force_async) {
        GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
        Delete(pp);
        return false;
      }
      Delete(pp);
      return true;
    }
  }
  // Set client_stats and user_data.
  if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
    pp->client_stats = lb_calld_->client_stats()->Ref();
  }
  GPR_ASSERT(pp->pick->user_data == nullptr);
  pp->pick->user_data = (void**)&pp->lb_token;
  // Pick via the RR policy.
  bool pick_done = rr_policy_->PickLocked(pp->pick);
  if (pick_done) {
    PendingPickSetMetadataAndContext(pp);
    if (force_async) {
      GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
      pick_done = false;
    }
    Delete(pp);
  }
  // else, the pending pick will be registered and taken care of by the
  // pending pick list inside the RR policy. Eventually,
  // OnPendingPickComplete() will be called, which will (among other
  // things) add the LB token to the call's initial metadata.
  return pick_done;
}
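
// Note on drop entries: the balancer expresses a drop rate by interleaving
// drop entries in the serverlist itself. Because serverlist_index_ advances
// by one per pick and wraps around, a serverlist such as
//
//   [backend_1, <drop>, backend_2]
//
// drops every third pick, crediting the drop to that entry's
// load_balance_token in the client load report.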

void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
  GPR_ASSERT(rr_policy_ == nullptr);
  rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
      "round_robin", args);
  if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
    gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
            this);
    return;
  }
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
  auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested");
  self.release();
  rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_);
  grpc_error* rr_state_error = nullptr;
  rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error);
  // Connectivity state is a function of the RR policy updated/created.
  UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error);
  // Add the gRPC LB's interested_parties pollset_set to that of the newly
  // created RR policy. This will make the RR policy progress upon activity on
  // gRPC LB, which in turn is tied to the application's call.
  grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(),
                                   interested_parties());
  // Subscribe to changes to the connectivity of the new RR.
  // TODO(roth): We currently track this ref manually. Once the new
  // ClosureRef API is done, pass the RefCountedPtr<> along with the closure.
  self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed");
  self.release();
  rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_,
                                        &on_rr_connectivity_changed_);
  rr_policy_->ExitIdleLocked();
  // Send pending picks to RR policy.
  PendingPick* pp;
  while ((pp = pending_picks_)) {
    pending_picks_ = pp->next;
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO,
              "[grpclb %p] Pending pick about to (async) PICK from RR %p",
              this, rr_policy_.get());
    }
    PickFromRoundRobinPolicyLocked(true /* force_async */, pp);
  }
  // Send pending pings to RR policy.
  PendingPing* pping;
  while ((pping = pending_pings_)) {
    pending_pings_ = pping->next;
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
              this, rr_policy_.get());
    }
    rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
    Delete(pping);
  }
}

grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
  grpc_lb_addresses* addresses;
  bool is_backend_from_grpclb_load_balancer = false;
  if (serverlist_ != nullptr) {
    GPR_ASSERT(serverlist_->num_servers > 0);
    addresses = ProcessServerlist(serverlist_);
    is_backend_from_grpclb_load_balancer = true;
  } else {
    // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't
    // received any serverlist from the balancer, we use the fallback backends
    // returned by the resolver. Note that the fallback backend list may be
    // empty, in which case the new round_robin policy will keep the requested
    // picks pending.
    GPR_ASSERT(fallback_backend_addresses_ != nullptr);
    addresses = grpc_lb_addresses_copy(fallback_backend_addresses_);
  }
  GPR_ASSERT(addresses != nullptr);
  // Replace the LB addresses in the channel args that we pass down to
  // the subchannel.
  static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
  const grpc_arg args_to_add[] = {
      grpc_lb_addresses_create_channel_arg(addresses),
      // A channel arg indicating if the target is a backend inferred from a
      // grpclb load balancer.
      grpc_channel_arg_integer_create(
          const_cast<char*>(
              GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
          is_backend_from_grpclb_load_balancer),
  };
  grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
      args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
      GPR_ARRAY_SIZE(args_to_add));
  grpc_lb_addresses_destroy(addresses);
  return args;
}

void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
  if (shutting_down_) return;
  grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked();
  GPR_ASSERT(args != nullptr);
  if (rr_policy_ != nullptr) {
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Updating RR policy %p", this,
              rr_policy_.get());
    }
    rr_policy_->UpdateLocked(*args);
  } else {
    LoadBalancingPolicy::Args lb_policy_args;
    lb_policy_args.combiner = combiner();
    lb_policy_args.client_channel_factory = client_channel_factory();
    lb_policy_args.args = args;
    CreateRoundRobinPolicyLocked(lb_policy_args);
    if (grpc_lb_glb_trace.enabled()) {
      gpr_log(GPR_INFO, "[grpclb %p] Created new RR policy %p", this,
              rr_policy_.get());
    }
  }
  grpc_channel_args_destroy(args);
}

void GrpcLb::OnRoundRobinRequestReresolutionLocked(void* arg,
                                                   grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  if (grpclb_policy->shutting_down_ || error != GRPC_ERROR_NONE) {
    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested");
    return;
  }
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(
        GPR_INFO,
        "[grpclb %p] Re-resolution requested from the internal RR policy (%p).",
        grpclb_policy, grpclb_policy->rr_policy_.get());
  }
  // If we are talking to a balancer, we expect to get updated addresses from
  // the balancer, so we can ignore the re-resolution request from the RR
  // policy. Otherwise, handle the re-resolution request using the
  // grpclb policy's original re-resolution closure.
  if (grpclb_policy->lb_calld_ == nullptr ||
      !grpclb_policy->lb_calld_->seen_initial_response()) {
    grpclb_policy->TryReresolutionLocked(&grpc_lb_glb_trace, GRPC_ERROR_NONE);
  }
  // Give back the wrapper closure to the RR policy.
  grpclb_policy->rr_policy_->SetReresolutionClosureLocked(
      &grpclb_policy->on_rr_request_reresolution_);
}

void GrpcLb::UpdateConnectivityStateFromRoundRobinPolicyLocked(
    grpc_error* rr_state_error) {
  const grpc_connectivity_state curr_glb_state =
      grpc_connectivity_state_check(&state_tracker_);
  /* The new connectivity status is a function of the previous one and the new
   * input coming from the status of the RR policy.
   *
   *  current state (grpclb's)
   *  |
   *  v  || I  |  C  |  R  |  TF  |  SD  |  <- new state (RR's)
   *  ===++====+=====+=====+======+======+
   *   I || I  |  C  |  R  | [I]  | [I]  |
   *  ---++----+-----+-----+------+------+
   *   C || I  |  C  |  R  | [C]  | [C]  |
   *  ---++----+-----+-----+------+------+
   *   R || I  |  C  |  R  | [R]  | [R]  |
   *  ---++----+-----+-----+------+------+
   *  TF || I  |  C  |  R  | [TF] | [TF] |
   *  ---++----+-----+-----+------+------+
   *  SD || NA | NA  | NA  |  NA  |  NA  | (*)
   *  ---++----+-----+-----+------+------+
   *
   * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
   * is the current state of grpclb, which is left untouched.
   *
   * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
   * the previous RR instance.
   *
   * Note that the status is never updated to SHUTDOWN as a result of calling
   * this function. Only ShutdownLocked() has the power to set that state.
   *
   * (*) This function mustn't be called while shutting down. */
  GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
  switch (rr_connectivity_state_) {
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
    case GRPC_CHANNEL_SHUTDOWN:
      GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE);
      break;
    case GRPC_CHANNEL_IDLE:
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE);
  }
  if (grpc_lb_glb_trace.enabled()) {
    gpr_log(
        GPR_INFO,
        "[grpclb %p] Setting grpclb's state to %s from new RR policy %p state.",
        this, grpc_connectivity_state_name(rr_connectivity_state_),
        rr_policy_.get());
  }
  grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_,
                              rr_state_error,
                              "update_lb_connectivity_status_locked");
}

void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
                                                   grpc_error* error) {
  GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
  if (grpclb_policy->shutting_down_) {
    grpclb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed");
    return;
  }
  grpclb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked(
      GRPC_ERROR_REF(error));
  // Resubscribe. Reuse the "on_rr_connectivity_changed" ref.
  grpclb_policy->rr_policy_->NotifyOnStateChangeLocked(
      &grpclb_policy->rr_connectivity_state_,
      &grpclb_policy->on_rr_connectivity_changed_);
}

//
// factory
//

class GrpcLbFactory : public LoadBalancingPolicyFactory {
 public:
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      const LoadBalancingPolicy::Args& args) const override {
    /* Count the number of gRPC-LB addresses. There must be at least one. */
    const grpc_arg* arg =
        grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES);
    if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
      return nullptr;
    }
    grpc_lb_addresses* addresses =
        static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
    size_t num_grpclb_addrs = 0;
    for (size_t i = 0; i < addresses->num_addresses; ++i) {
      if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
    }
    if (num_grpclb_addrs == 0) return nullptr;
    return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(addresses, args));
  }

  const char* name() const override { return "grpclb"; }
};

}  // namespace

}  // namespace grpc_core

//
// Plugin registration
//

namespace {

// Only add client_load_reporting filter if the grpclb LB policy is used.
bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
                                            void* arg) {
  const grpc_channel_args* args =
      grpc_channel_stack_builder_get_channel_arguments(builder);
  const grpc_arg* channel_arg =
      grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
      strcmp(channel_arg->value.string, "grpclb") == 0) {
    return grpc_channel_stack_builder_append_filter(
        builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
  }
  return true;
}

}  // namespace

void grpc_lb_policy_grpclb_init() {
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
              grpc_core::New<grpc_core::GrpcLbFactory>()));
  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   maybe_add_client_load_reporting_filter,
                                   (void*)&grpc_client_load_reporting_filter);
}

void grpc_lb_policy_grpclb_shutdown() {}
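
// Debugging note (illustrative): grpclb activity logged via grpc_lb_glb_trace
// can typically be enabled at runtime through the trace environment
// variables, e.g.:
//
//   GRPC_TRACE=glb GRPC_VERBOSITY=DEBUG ./my_grpc_client
//
// The "glb" trace name is an assumption based on grpc_lb_glb_trace's
// registration elsewhere in this file; the binary name is hypothetical.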