blob: 0824fe373c6c666c65bee39fc9edb7154ad2defb [file] [log] [blame]
Mark D. Roth5e9848e2017-10-06 13:59:32 -07001/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21
Alexander Polcyndb3e8982018-02-21 16:59:24 -080022#include <grpc/support/port_platform.h>
23
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070024#include <string.h>
25
26#include <grpc/support/alloc.h>
27
Mark D. Roth5e9848e2017-10-06 13:59:32 -070028#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
29#include "src/core/ext/filters/client_channel/subchannel.h"
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070030#include "src/core/lib/channel/channel_args.h"
Mark D. Roth901bb4f2017-10-11 08:49:07 -070031#include "src/core/lib/debug/trace.h"
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070032#include "src/core/lib/gprpp/abstract.h"
33#include "src/core/lib/gprpp/inlined_vector.h"
34#include "src/core/lib/gprpp/ref_counted.h"
Mark D. Roth4f2b0fd2018-01-19 12:12:23 -080035#include "src/core/lib/gprpp/ref_counted_ptr.h"
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070036#include "src/core/lib/iomgr/closure.h"
37#include "src/core/lib/iomgr/combiner.h"
38#include "src/core/lib/iomgr/sockaddr_utils.h"
Mark D. Roth5e9848e2017-10-06 13:59:32 -070039#include "src/core/lib/transport/connectivity_state.h"
40
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070041// FIXME: add comments
Mark D. Roth5e9848e2017-10-06 13:59:32 -070042
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070043namespace grpc_core {
Mark D. Roth5e9848e2017-10-06 13:59:32 -070044
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070045template <typename SubchannelListType, typename SubchannelDataType>
46class SubchannelData {
47 public:
48 // Returns the index into subchannel_list_ of this object.
49 size_t Index() const {
50 return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
51 subchannel_list_->subchannel(0));
52 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070053
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070054 SubchannelListType* subchannel_list() const { return subchannel_list_; }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070055
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070056 grpc_subchannel* subchannel() const { return subchannel_; }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070057
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070058 ConnectedSubchannel* connected_subchannel() const {
59 return connected_subchannel_.get();
60 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070061
Mark D. Rothdb0a4752018-03-30 14:44:27 -070062 void SetConnectedSubchannelFromSubchannelLocked() {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070063 connected_subchannel_ =
64 grpc_subchannel_get_connected_subchannel(subchannel_);
65 }
Mark D. Roth901bb4f2017-10-11 08:49:07 -070066
Mark D. Rothdb0a4752018-03-30 14:44:27 -070067 // An alternative to SetConnectedSubchannelFromSubchannelLocked() for
68 // cases where we are retaining a connected subchannel from a previous
69 // subchannel list. This is slightly more efficient than getting the
70 // connected subchannel from the subchannel, because that approach
71 // requires the use of a mutex, whereas this one only mutates a
72 // refcount.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070073 void SetConnectedSubchannelFromLocked(SubchannelData* other) {
Mark D. Rothdb0a4752018-03-30 14:44:27 -070074 GPR_ASSERT(subchannel_ == other->subchannel_);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070075 connected_subchannel_ = other->connected_subchannel_; // Adds ref.
76 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070077
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070078 grpc_connectivity_state connectivity_state() const {
79 return curr_connectivity_state_;
80 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070081
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070082 virtual grpc_connectivity_state CheckConnectivityStateLocked() {
83 pending_connectivity_state_unsafe_ =
84 grpc_subchannel_check_connectivity(subchannel(), nullptr);
85 curr_connectivity_state_ = pending_connectivity_state_unsafe_;
86 return curr_connectivity_state_;
87 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070088
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070089 // Unrefs the subchannel.
Mark D. Rothdb0a4752018-03-30 14:44:27 -070090// FIXME: move this to private in favor of ShutdownLocked()?
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070091 virtual void UnrefSubchannelLocked(const char* reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -070092
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070093 /// Starts watching the connectivity state of the subchannel.
94 /// The connectivity_changed_cb callback must invoke either
95 /// StopConnectivityWatch() or again call StartConnectivityWatch().
96 void StartConnectivityWatchLocked();
97
98 /// Stops watching the connectivity state of the subchannel.
99 void StopConnectivityWatchLocked();
100
101 /// Cancels watching the connectivity state of the subchannel.
102 void CancelConnectivityWatchLocked(const char* reason);
103
104 void ShutdownLocked(const char* reason);
105
106 GRPC_ABSTRACT_BASE_CLASS
107
108 protected:
109 SubchannelData(
110 SubchannelListType* subchannel_list,
111 const grpc_lb_user_data_vtable* user_data_vtable,
112 const grpc_lb_address& address, grpc_subchannel* subchannel,
113 grpc_combiner* combiner);
114
115 virtual ~SubchannelData();
116
117// FIXME: define API
118 virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT;
119
120 private:
121 static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
122
123 // Backpointer to owning subchannel list. Not owned.
124 SubchannelListType* subchannel_list_;
125
126 // The subchannel and connected subchannel.
127 grpc_subchannel* subchannel_;
128 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
129
130 // Notification that connectivity has changed on subchannel.
131 grpc_closure connectivity_changed_closure_;
132 // Is a connectivity notification pending?
Mark D. Rothdb0a4752018-03-30 14:44:27 -0700133 bool connectivity_notification_pending_ = false;
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700134 // Connectivity state to be updated by
135 // grpc_subchannel_notify_on_state_change(), not guarded by
136 // the combiner. Will be copied to \a curr_connectivity_state_ by
137 // \a connectivity_changed_closure_.
138 grpc_connectivity_state pending_connectivity_state_unsafe_;
139 // Current connectivity state.
140 grpc_connectivity_state curr_connectivity_state_;
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700141};
142
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700143template <typename SubchannelListType, typename SubchannelDataType>
144class SubchannelList
145 : public RefCountedWithTracing<SubchannelListType> {
146 public:
147 typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
148
149 size_t num_subchannels() const { return subchannels_.size(); }
150 SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
151
152 // Marks the subchannel_list as discarded. Unsubscribes all its subchannels.
153 void ShutdownLocked(const char* reason);
154
155 bool shutting_down() const { return shutting_down_; }
156
157 LoadBalancingPolicy* policy() const { return policy_; }
158 TraceFlag* tracer() const { return tracer_; }
159
160 GRPC_ABSTRACT_BASE_CLASS
161
162 protected:
163 SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
164 const grpc_lb_addresses* addresses, grpc_combiner* combiner,
165 grpc_client_channel_factory* client_channel_factory,
166 const grpc_channel_args& args);
167
168 virtual ~SubchannelList();
169
170 private:
171 // So New() can call our private ctor.
172 template <typename T, typename... Args>
173 friend T* New(Args&&... args);
174
175 // Backpointer to owning policy.
176 LoadBalancingPolicy* policy_;
177
178 TraceFlag* tracer_;
179
180 // The list of subchannels.
181 SubchannelVector subchannels_;
182
183 // Is this list shutting down? This may be true due to the shutdown of the
184 // policy itself or because a newer update has arrived while this one hadn't
185 // finished processing.
186 bool shutting_down_ = false;
187};
188
189//
190// implementation -- no user-servicable parts below
191//
192
193//
194// SubchannelData
195//
196
197template <typename SubchannelListType, typename SubchannelDataType>
198SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
199 SubchannelListType* subchannel_list,
200 const grpc_lb_user_data_vtable* user_data_vtable,
201 const grpc_lb_address& address, grpc_subchannel* subchannel,
202 grpc_combiner* combiner)
203 : subchannel_list_(subchannel_list),
204 subchannel_(subchannel),
205 // We assume that the current state is IDLE. If not, we'll get a
206 // callback telling us that.
207 pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE),
208 curr_connectivity_state_(GRPC_CHANNEL_IDLE) {
209 GRPC_CLOSURE_INIT(
210 &connectivity_changed_closure_,
211 (&SubchannelData<SubchannelListType,
212 SubchannelDataType>::OnConnectivityChangedLocked),
213 this, grpc_combiner_scheduler(combiner));
214}
215
216template <typename SubchannelListType, typename SubchannelDataType>
217SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
218 UnrefSubchannelLocked("subchannel_data_destroy");
219}
220
221template <typename SubchannelListType, typename SubchannelDataType>
222void SubchannelData<SubchannelListType,
223 SubchannelDataType>::UnrefSubchannelLocked(
224 const char* reason) {
225 if (subchannel_ != nullptr) {
226 if (subchannel_list_->tracer()->enabled()) {
227 gpr_log(GPR_DEBUG,
228 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
229 " (subchannel %p): unreffing subchannel",
230 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
231 subchannel_list_, Index(),
232 subchannel_list_->num_subchannels(), subchannel_);
233 }
234 GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
235 subchannel_ = nullptr;
236 connected_subchannel_.reset();
237 }
238}
239
240template <typename SubchannelListType, typename SubchannelDataType>
241void SubchannelData<SubchannelListType,
242 SubchannelDataType>::StartConnectivityWatchLocked() {
243 if (subchannel_list_->tracer()->enabled()) {
244 gpr_log(
245 GPR_DEBUG,
246 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
247 " (subchannel %p): requesting connectivity change "
248 "notification (from %s)",
249 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
250 subchannel_list_, Index(),
251 subchannel_list_->num_subchannels(), subchannel_,
252 grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
253 }
254 connectivity_notification_pending_ = true;
255 grpc_subchannel_notify_on_state_change(
256 subchannel_, subchannel_list_->policy()->interested_parties(),
257 &pending_connectivity_state_unsafe_,
258 &connectivity_changed_closure_);
259}
260
261template <typename SubchannelListType, typename SubchannelDataType>
262void SubchannelData<SubchannelListType,
263 SubchannelDataType>::StopConnectivityWatchLocked() {
264 if (subchannel_list_->tracer()->enabled()) {
265 gpr_log(GPR_DEBUG,
266 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
267 " (subchannel %p): stopping connectivity watch",
268 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
269 subchannel_list_, Index(),
270 subchannel_list_->num_subchannels(), subchannel_);
271 }
272 GPR_ASSERT(connectivity_notification_pending_);
273 connectivity_notification_pending_ = false;
274}
275
276template <typename SubchannelListType, typename SubchannelDataType>
277void SubchannelData<SubchannelListType,
278 SubchannelDataType>::CancelConnectivityWatchLocked(
279 const char* reason) {
280 if (subchannel_list_->tracer()->enabled()) {
281 gpr_log(GPR_DEBUG,
282 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
283 " (subchannel %p): canceling connectivity watch (%s)",
284 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
285 subchannel_list_, Index(),
286 subchannel_list_->num_subchannels(), subchannel_, reason);
287 }
288 grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
289 &connectivity_changed_closure_);
290}
291
292template <typename SubchannelListType, typename SubchannelDataType>
293void SubchannelData<SubchannelListType,
294 SubchannelDataType>::OnConnectivityChangedLocked(
295 void* arg, grpc_error* error) {
296 SubchannelData* sd = static_cast<SubchannelData*>(arg);
297 // Now that we're inside the combiner, copy the pending connectivity
298 // state (which was set by the connectivity state watcher) to
299 // curr_connectivity_state_, which is what we use inside of the combiner.
300 sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
Mark D. Rothdb0a4752018-03-30 14:44:27 -0700301 // If we get TRANSIENT_FAILURE, unref the connected subchannel.
302 if (sd->curr_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
303 sd->connected_subchannel_.reset();
304 }
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700305 sd->ProcessConnectivityChangeLocked(error);
306}
307
308template <typename SubchannelListType, typename SubchannelDataType>
309void SubchannelData<SubchannelListType,
310 SubchannelDataType>::ShutdownLocked(const char* reason) {
311 // If there's a pending notification for this subchannel, cancel it;
312 // the callback is responsible for unreffing the subchannel.
313 // Otherwise, unref the subchannel directly.
314 if (connectivity_notification_pending_) {
315 CancelConnectivityWatchLocked(reason);
316 } else if (subchannel_ != nullptr) {
317 UnrefSubchannelLocked(reason);
318 }
319}
320
321//
322// SubchannelList
323//
324
325template <typename SubchannelListType, typename SubchannelDataType>
326SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
327 LoadBalancingPolicy* policy, TraceFlag* tracer,
Mark D. Rothc8875492018-02-20 08:33:48 -0800328 const grpc_lb_addresses* addresses, grpc_combiner* combiner,
329 grpc_client_channel_factory* client_channel_factory,
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700330 const grpc_channel_args& args)
331 : RefCountedWithTracing<SubchannelListType>(tracer),
332 policy_(policy),
333 tracer_(tracer) {
334 if (tracer_->enabled()) {
335 gpr_log(GPR_DEBUG,
336 "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
337 tracer_->name(), policy, this, addresses->num_addresses);
338 }
339 subchannels_.reserve(addresses->num_addresses);
340 // We need to remove the LB addresses in order to be able to compare the
341 // subchannel keys of subchannels from a different batch of addresses.
342 static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
343 GRPC_ARG_LB_ADDRESSES};
344 // Create a subchannel for each address.
345 grpc_subchannel_args sc_args;
346 for (size_t i = 0; i < addresses->num_addresses; i++) {
347 // If there were any balancer, we would have chosen grpclb policy instead.
348 GPR_ASSERT(!addresses->addresses[i].is_balancer);
349 memset(&sc_args, 0, sizeof(grpc_subchannel_args));
350 grpc_arg addr_arg =
351 grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
352 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
353 &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
354 gpr_free(addr_arg.value.string);
355 sc_args.args = new_args;
356 grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
357 client_channel_factory, &sc_args);
358 grpc_channel_args_destroy(new_args);
359 if (subchannel == nullptr) {
360 // Subchannel could not be created.
361 if (tracer_->enabled()) {
362 char* address_uri =
363 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
364 gpr_log(GPR_DEBUG,
365 "[%s %p] could not create subchannel for address uri %s, "
366 "ignoring",
367 tracer_->name(), policy_, address_uri);
368 gpr_free(address_uri);
369 }
370 continue;
371 }
372 if (tracer_->enabled()) {
373 char* address_uri =
374 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
375 gpr_log(GPR_DEBUG,
376 "[%s %p] subchannel list %p index %" PRIuPTR
377 ": Created subchannel %p for address uri %s",
378 tracer_->name(), policy_, this, subchannels_.size(), subchannel,
379 address_uri);
380 gpr_free(address_uri);
381 }
382 subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
383 addresses->user_data_vtable,
384 addresses->addresses[i], subchannel, combiner);
385 }
386}
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700387
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700388template <typename SubchannelListType, typename SubchannelDataType>
389SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
390 if (tracer_->enabled()) {
391 gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
392 tracer_->name(), policy_, this);
393 }
394}
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700395
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700396template <typename SubchannelListType, typename SubchannelDataType>
397void SubchannelList<SubchannelListType,
398 SubchannelDataType>::ShutdownLocked(const char* reason) {
399 if (tracer_->enabled()) {
400 gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
401 tracer_->name(), policy_, this, reason);
402 }
403 GPR_ASSERT(!shutting_down_);
404 shutting_down_ = true;
405 for (size_t i = 0; i < subchannels_.size(); i++) {
406 SubchannelDataType* sd = &subchannels_[i];
407 sd->ShutdownLocked(reason);
408 }
409}
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700410
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700411} // namespace grpc_core
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700412
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700413#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */