blob: 8b843b16c00a24c6ba00f77a7bbadd1e8bfdadb0 [file] [log] [blame]
Mark D. Roth5e9848e2017-10-06 13:59:32 -07001/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21
Alexander Polcyndb3e8982018-02-21 16:59:24 -080022#include <grpc/support/port_platform.h>
23
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070024#include <string.h>
25
26#include <grpc/support/alloc.h>
27
Mark D. Roth5e9848e2017-10-06 13:59:32 -070028#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
29#include "src/core/ext/filters/client_channel/subchannel.h"
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070030#include "src/core/lib/channel/channel_args.h"
Mark D. Roth901bb4f2017-10-11 08:49:07 -070031#include "src/core/lib/debug/trace.h"
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070032#include "src/core/lib/gprpp/abstract.h"
33#include "src/core/lib/gprpp/inlined_vector.h"
34#include "src/core/lib/gprpp/ref_counted.h"
Mark D. Roth4f2b0fd2018-01-19 12:12:23 -080035#include "src/core/lib/gprpp/ref_counted_ptr.h"
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070036#include "src/core/lib/iomgr/closure.h"
37#include "src/core/lib/iomgr/combiner.h"
38#include "src/core/lib/iomgr/sockaddr_utils.h"
Mark D. Roth5e9848e2017-10-06 13:59:32 -070039#include "src/core/lib/transport/connectivity_state.h"
40
Mark D. Roth485c4ba2018-04-03 11:36:23 -070041// Code for maintaining a list of subchannels within an LB policy.
42//
43// To use this, callers must create their own subclasses, like so:
44/*
45
46class MySubchannelList; // Forward declaration.
47
48class MySubchannelData
49 : public SubchannelData<MySubchannelList, MySubchannelData> {
50 public:
51 void ProcessConnectivityChangeLocked(grpc_error* error) override {
52 // ...code to handle connectivity changes...
53 }
54};
55
56class MySubchannelList
57 : public SubchannelList<MySubchannelList, MySubchannelData> {
58};
59
60*/
61// All methods with a Locked() suffix must be called from within the
62// client_channel combiner.
Mark D. Roth5e9848e2017-10-06 13:59:32 -070063
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070064namespace grpc_core {
Mark D. Roth5e9848e2017-10-06 13:59:32 -070065
Mark D. Roth485c4ba2018-04-03 11:36:23 -070066// Stores data for a particular subchannel in a subchannel list.
67// Callers must create a subclass that implements the
68// ProcessConnectivityChangeLocked() method.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070069template <typename SubchannelListType, typename SubchannelDataType>
70class SubchannelData {
71 public:
Mark D. Roth485c4ba2018-04-03 11:36:23 -070072 // Returns a pointer to the subchannel list containing this object.
73 SubchannelListType* subchannel_list() const { return subchannel_list_; }
74
75 // Returns the index into the subchannel list of this object.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070076 size_t Index() const {
77 return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
78 subchannel_list_->subchannel(0));
79 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070080
Mark D. Roth485c4ba2018-04-03 11:36:23 -070081 // Returns a pointer to the subchannel.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070082 grpc_subchannel* subchannel() const { return subchannel_; }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070083
Mark D. Roth485c4ba2018-04-03 11:36:23 -070084 // Returns the connected subchannel. Will be null if the subchannel
85 // is not connected.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -070086 ConnectedSubchannel* connected_subchannel() const {
87 return connected_subchannel_.get();
88 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -070089
Mark D. Roth485c4ba2018-04-03 11:36:23 -070090 // The current connectivity state.
91 // May be called from ProcessConnectivityChangeLocked() to determine
92 // the state that the subchannel has transitioned into.
93 grpc_connectivity_state connectivity_state() const {
94 return curr_connectivity_state_;
95 }
96
Mark D. Roth542bceb2018-04-12 15:08:36 -070097// FIXME: remove
Mark D. Rothdb0a4752018-03-30 14:44:27 -070098 // An alternative to SetConnectedSubchannelFromSubchannelLocked() for
99 // cases where we are retaining a connected subchannel from a previous
100 // subchannel list. This is slightly more efficient than getting the
101 // connected subchannel from the subchannel, because that approach
102 // requires the use of a mutex, whereas this one only mutates a
103 // refcount.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700104 void SetConnectedSubchannelFromLocked(SubchannelData* other) {
Mark D. Rothdb0a4752018-03-30 14:44:27 -0700105 GPR_ASSERT(subchannel_ == other->subchannel_);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700106 connected_subchannel_ = other->connected_subchannel_; // Adds ref.
107 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700108
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700109 // Synchronously checks the subchannel's connectivity state. Calls
110 // ProcessConnectivityChangeLocked() if the state has changed.
111 // Must not be called while there is a connectivity notification
Mark D. Rothb1c13092018-04-25 12:39:45 -0700112 // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
113 // and the resulting invocation of ProcessConnectivityChangeLocked()).
Mark D. Roth8c93fc82018-04-03 10:42:10 -0700114 void CheckConnectivityStateLocked() {
115 GPR_ASSERT(!connectivity_notification_pending_);
116 grpc_error* error = GRPC_ERROR_NONE;
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700117 pending_connectivity_state_unsafe_ =
Mark D. Roth8c93fc82018-04-03 10:42:10 -0700118 grpc_subchannel_check_connectivity(subchannel(), &error);
Mark D. Roth253358d2018-04-25 08:07:54 -0700119 UpdateConnectedSubchannelLocked();
Mark D. Roth8c93fc82018-04-03 10:42:10 -0700120 if (pending_connectivity_state_unsafe_ != curr_connectivity_state_) {
121 curr_connectivity_state_ = pending_connectivity_state_unsafe_;
122 ProcessConnectivityChangeLocked(error);
Mark D. Roth99dbd7a2018-04-04 14:05:53 -0700123 } else {
124 GRPC_ERROR_UNREF(error);
Mark D. Roth8c93fc82018-04-03 10:42:10 -0700125 }
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700126 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700127
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700128 // Unrefs the subchannel. May be used if an individual subchannel is
129 // no longer needed even though the subchannel list as a whole is not
130 // being unreffed.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700131 virtual void UnrefSubchannelLocked(const char* reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700132
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700133 // Starts or renewes watching the connectivity state of the subchannel.
134 // ProcessConnectivityChangeLocked() will be called when the
135 // connectivity state changes.
Mark D. Rothb1c13092018-04-25 12:39:45 -0700136 void StartOrRenewConnectivityWatchLocked();
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700137
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700138 // Stops watching the connectivity state of the subchannel.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700139 void StopConnectivityWatchLocked();
140
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700141 // Cancels watching the connectivity state of the subchannel.
142 // Must be called only while there is a connectivity notification
Mark D. Rothb1c13092018-04-25 12:39:45 -0700143 // pending (i.e., between calling StartOrRenewConnectivityWatchLocked()
144 // and the resulting invocation of ProcessConnectivityChangeLocked()).
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700145 // From within ProcessConnectivityChangeLocked(), use
146 // StopConnectivityWatchLocked() instead.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700147 void CancelConnectivityWatchLocked(const char* reason);
148
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700149 // Cancels any pending connectivity watch and unrefs the subchannel.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700150 void ShutdownLocked(const char* reason);
151
152 GRPC_ABSTRACT_BASE_CLASS
153
154 protected:
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700155 SubchannelData(SubchannelListType* subchannel_list,
156 const grpc_lb_user_data_vtable* user_data_vtable,
157 const grpc_lb_address& address, grpc_subchannel* subchannel,
158 grpc_combiner* combiner);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700159
160 virtual ~SubchannelData();
161
Mark D. Rothb1c13092018-04-25 12:39:45 -0700162 // After StartOrRenewConnectivityWatchLocked() is called, this method
163 // will be invoked when the subchannel's connectivity state changes.
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700164 // Implementations can use connectivity_state() to get the new
165 // connectivity state.
166 // Implementations must invoke either StopConnectivityWatch() or again
167 // call StartConnectivityWatch() before returning.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700168 virtual void ProcessConnectivityChangeLocked(grpc_error* error) GRPC_ABSTRACT;
169
170 private:
Mark D. Roth253358d2018-04-25 08:07:54 -0700171// FIXME: document
172 bool UpdateConnectedSubchannelLocked();
173
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700174 static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
175
176 // Backpointer to owning subchannel list. Not owned.
177 SubchannelListType* subchannel_list_;
178
179 // The subchannel and connected subchannel.
180 grpc_subchannel* subchannel_;
181 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
182
183 // Notification that connectivity has changed on subchannel.
184 grpc_closure connectivity_changed_closure_;
185 // Is a connectivity notification pending?
Mark D. Rothdb0a4752018-03-30 14:44:27 -0700186 bool connectivity_notification_pending_ = false;
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700187 // Connectivity state to be updated by
188 // grpc_subchannel_notify_on_state_change(), not guarded by
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700189 // the combiner. Will be copied to curr_connectivity_state_ by
190 // OnConnectivityChangedLocked().
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700191 grpc_connectivity_state pending_connectivity_state_unsafe_;
192 // Current connectivity state.
Mark D. Roth542bceb2018-04-12 15:08:36 -0700193// FIXME: move this into RR, not needed in PF because connectivity_state
194// is only used in ProcessConnectivityChangeLocked()
195// (maybe pass it as a param and eliminate the accessor method?)
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700196 grpc_connectivity_state curr_connectivity_state_;
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700197};
198
Mark D. Roth485c4ba2018-04-03 11:36:23 -0700199// A list of subchannels.
Mark D. Roth542bceb2018-04-12 15:08:36 -0700200// FIXME: make this InternallyRefCounted, and have Orphan() do
201// ShutdownLocked()?
202// (also, maybe we don't need to take a ref to the LB policy anymore?)
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700203template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700204class SubchannelList : public RefCountedWithTracing<SubchannelListType> {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700205 public:
206 typedef InlinedVector<SubchannelDataType, 10> SubchannelVector;
207
Mark D. Roth9b620a22018-04-03 11:57:45 -0700208 // The number of subchannels in the list.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700209 size_t num_subchannels() const { return subchannels_.size(); }
Mark D. Roth9b620a22018-04-03 11:57:45 -0700210
211 // The data for the subchannel at a particular index.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700212 SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
213
214 // Marks the subchannel_list as discarded. Unsubscribes all its subchannels.
215 void ShutdownLocked(const char* reason);
216
Mark D. Roth9b620a22018-04-03 11:57:45 -0700217 // Returns true if the subchannel list is shutting down.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700218 bool shutting_down() const { return shutting_down_; }
219
Mark D. Roth9b620a22018-04-03 11:57:45 -0700220 // Accessors.
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700221 LoadBalancingPolicy* policy() const { return policy_; }
222 TraceFlag* tracer() const { return tracer_; }
223
224 GRPC_ABSTRACT_BASE_CLASS
225
226 protected:
227 SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
228 const grpc_lb_addresses* addresses, grpc_combiner* combiner,
229 grpc_client_channel_factory* client_channel_factory,
230 const grpc_channel_args& args);
231
232 virtual ~SubchannelList();
233
234 private:
235 // So New() can call our private ctor.
236 template <typename T, typename... Args>
237 friend T* New(Args&&... args);
238
239 // Backpointer to owning policy.
240 LoadBalancingPolicy* policy_;
241
242 TraceFlag* tracer_;
243
244 // The list of subchannels.
245 SubchannelVector subchannels_;
246
247 // Is this list shutting down? This may be true due to the shutdown of the
248 // policy itself or because a newer update has arrived while this one hadn't
249 // finished processing.
250 bool shutting_down_ = false;
251};
252
253//
254// implementation -- no user-servicable parts below
255//
256
257//
258// SubchannelData
259//
260
261template <typename SubchannelListType, typename SubchannelDataType>
262SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
263 SubchannelListType* subchannel_list,
264 const grpc_lb_user_data_vtable* user_data_vtable,
265 const grpc_lb_address& address, grpc_subchannel* subchannel,
266 grpc_combiner* combiner)
267 : subchannel_list_(subchannel_list),
268 subchannel_(subchannel),
269 // We assume that the current state is IDLE. If not, we'll get a
270 // callback telling us that.
271 pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE),
272 curr_connectivity_state_(GRPC_CHANNEL_IDLE) {
273 GRPC_CLOSURE_INIT(
274 &connectivity_changed_closure_,
275 (&SubchannelData<SubchannelListType,
276 SubchannelDataType>::OnConnectivityChangedLocked),
277 this, grpc_combiner_scheduler(combiner));
278}
279
280template <typename SubchannelListType, typename SubchannelDataType>
281SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
282 UnrefSubchannelLocked("subchannel_data_destroy");
283}
284
285template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700286void SubchannelData<SubchannelListType, SubchannelDataType>::
287 UnrefSubchannelLocked(const char* reason) {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700288 if (subchannel_ != nullptr) {
289 if (subchannel_list_->tracer()->enabled()) {
290 gpr_log(GPR_DEBUG,
291 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
292 " (subchannel %p): unreffing subchannel",
293 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700294 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
295 subchannel_);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700296 }
297 GRPC_SUBCHANNEL_UNREF(subchannel_, reason);
298 subchannel_ = nullptr;
299 connected_subchannel_.reset();
300 }
301}
302
303template <typename SubchannelListType, typename SubchannelDataType>
304void SubchannelData<SubchannelListType,
Mark D. Rothb1c13092018-04-25 12:39:45 -0700305 SubchannelDataType>::StartOrRenewConnectivityWatchLocked() {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700306 if (subchannel_list_->tracer()->enabled()) {
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700307 gpr_log(GPR_DEBUG,
308 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
309 " (subchannel %p): requesting connectivity change "
310 "notification (from %s)",
311 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
312 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
313 subchannel_,
314 grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700315 }
Mark D. Rothb1c13092018-04-25 12:39:45 -0700316 if (!connectivity_notification_pending_) {
317 subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
318 connectivity_notification_pending_ = true;
319 }
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700320 grpc_subchannel_notify_on_state_change(
321 subchannel_, subchannel_list_->policy()->interested_parties(),
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700322 &pending_connectivity_state_unsafe_, &connectivity_changed_closure_);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700323}
324
325template <typename SubchannelListType, typename SubchannelDataType>
326void SubchannelData<SubchannelListType,
327 SubchannelDataType>::StopConnectivityWatchLocked() {
328 if (subchannel_list_->tracer()->enabled()) {
329 gpr_log(GPR_DEBUG,
330 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
331 " (subchannel %p): stopping connectivity watch",
332 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700333 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
334 subchannel_);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700335 }
336 GPR_ASSERT(connectivity_notification_pending_);
337 connectivity_notification_pending_ = false;
Mark D. Rothb1c13092018-04-25 12:39:45 -0700338 subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700339}
340
341template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700342void SubchannelData<SubchannelListType, SubchannelDataType>::
343 CancelConnectivityWatchLocked(const char* reason) {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700344 if (subchannel_list_->tracer()->enabled()) {
345 gpr_log(GPR_DEBUG,
346 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
347 " (subchannel %p): canceling connectivity watch (%s)",
348 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700349 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
350 subchannel_, reason);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700351 }
352 grpc_subchannel_notify_on_state_change(subchannel_, nullptr, nullptr,
353 &connectivity_changed_closure_);
354}
355
356template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth253358d2018-04-25 08:07:54 -0700357bool SubchannelData<SubchannelListType, SubchannelDataType>::
358 UpdateConnectedSubchannelLocked() {
359// FIXME: add trace logging
360 // If the subchannel is READY, get a ref to the connected subchannel.
361 if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
362 connected_subchannel_ =
363 grpc_subchannel_get_connected_subchannel(subchannel_);
364 // If the subchannel became disconnected between the time that READY
365 // was reported and the time we got here (e.g., between when a
366 // notification callback is scheduled and when it was actually run in
367 // the combiner), then the connected subchannel may have disappeared out
368 // from under us. In that case, we don't actually want to consider the
369 // subchannel to be in state READY. Instead, we use IDLE as the
370 // basis for any future connectivity watch; this is the one state that
371 // the subchannel will never transition back into, so this ensures
372 // that we will get a notification for the next state, even if that state
373 // is READY again (e.g., if the subchannel has transitioned back to
374 // READY before the next watch gets requested).
375 if (connected_subchannel_ == nullptr) {
376 pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE;
377 return false;
378 }
379 }
380// FIXME: do this for any other state?
381 // If we get TRANSIENT_FAILURE, unref the connected subchannel.
382 else if (pending_connectivity_state_unsafe_ ==
383 GRPC_CHANNEL_TRANSIENT_FAILURE) {
384 connected_subchannel_.reset();
385 }
386 return true;
387}
388
389template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700390void SubchannelData<SubchannelListType, SubchannelDataType>::
391 OnConnectivityChangedLocked(void* arg, grpc_error* error) {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700392 SubchannelData* sd = static_cast<SubchannelData*>(arg);
Mark D. Roth542bceb2018-04-12 15:08:36 -0700393// FIXME: add trace logging
Mark D. Rothb1c13092018-04-25 12:39:45 -0700394 if (sd->subchannel_list()->shutting_down() || error == GRPC_ERROR_CANCELLED) {
395 sd->UnrefSubchannelLocked("connectivity_shutdown");
396 sd->StopConnectivityWatchLocked();
397 return;
398 }
Mark D. Roth253358d2018-04-25 08:07:54 -0700399 if (!sd->UpdateConnectedSubchannelLocked()) {
400 // We don't want to report this connectivity state, so renew the watch.
Mark D. Rothb1c13092018-04-25 12:39:45 -0700401 sd->StartOrRenewConnectivityWatchLocked();
Mark D. Roth253358d2018-04-25 08:07:54 -0700402 return;
Mark D. Roth542bceb2018-04-12 15:08:36 -0700403 }
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700404 // Now that we're inside the combiner, copy the pending connectivity
405 // state (which was set by the connectivity state watcher) to
406 // curr_connectivity_state_, which is what we use inside of the combiner.
407 sd->curr_connectivity_state_ = sd->pending_connectivity_state_unsafe_;
Mark D. Roth542bceb2018-04-12 15:08:36 -0700408 // Call the subclass's ProcessConnectivityChangeLocked() method.
Mark D. Roth99dbd7a2018-04-04 14:05:53 -0700409 sd->ProcessConnectivityChangeLocked(GRPC_ERROR_REF(error));
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700410}
411
412template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700413void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked(
414 const char* reason) {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700415 // If there's a pending notification for this subchannel, cancel it;
416 // the callback is responsible for unreffing the subchannel.
417 // Otherwise, unref the subchannel directly.
418 if (connectivity_notification_pending_) {
419 CancelConnectivityWatchLocked(reason);
420 } else if (subchannel_ != nullptr) {
421 UnrefSubchannelLocked(reason);
422 }
423}
424
425//
426// SubchannelList
427//
428
429template <typename SubchannelListType, typename SubchannelDataType>
430SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
431 LoadBalancingPolicy* policy, TraceFlag* tracer,
Mark D. Rothc8875492018-02-20 08:33:48 -0800432 const grpc_lb_addresses* addresses, grpc_combiner* combiner,
433 grpc_client_channel_factory* client_channel_factory,
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700434 const grpc_channel_args& args)
435 : RefCountedWithTracing<SubchannelListType>(tracer),
436 policy_(policy),
437 tracer_(tracer) {
438 if (tracer_->enabled()) {
439 gpr_log(GPR_DEBUG,
440 "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
441 tracer_->name(), policy, this, addresses->num_addresses);
442 }
443 subchannels_.reserve(addresses->num_addresses);
444 // We need to remove the LB addresses in order to be able to compare the
445 // subchannel keys of subchannels from a different batch of addresses.
446 static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
447 GRPC_ARG_LB_ADDRESSES};
448 // Create a subchannel for each address.
449 grpc_subchannel_args sc_args;
450 for (size_t i = 0; i < addresses->num_addresses; i++) {
451 // If there were any balancer, we would have chosen grpclb policy instead.
452 GPR_ASSERT(!addresses->addresses[i].is_balancer);
453 memset(&sc_args, 0, sizeof(grpc_subchannel_args));
454 grpc_arg addr_arg =
455 grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
456 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
457 &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
458 gpr_free(addr_arg.value.string);
459 sc_args.args = new_args;
460 grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
461 client_channel_factory, &sc_args);
462 grpc_channel_args_destroy(new_args);
463 if (subchannel == nullptr) {
464 // Subchannel could not be created.
465 if (tracer_->enabled()) {
466 char* address_uri =
467 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
468 gpr_log(GPR_DEBUG,
469 "[%s %p] could not create subchannel for address uri %s, "
470 "ignoring",
471 tracer_->name(), policy_, address_uri);
472 gpr_free(address_uri);
473 }
474 continue;
475 }
476 if (tracer_->enabled()) {
477 char* address_uri =
478 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
479 gpr_log(GPR_DEBUG,
480 "[%s %p] subchannel list %p index %" PRIuPTR
481 ": Created subchannel %p for address uri %s",
482 tracer_->name(), policy_, this, subchannels_.size(), subchannel,
483 address_uri);
484 gpr_free(address_uri);
485 }
486 subchannels_.emplace_back(static_cast<SubchannelListType*>(this),
487 addresses->user_data_vtable,
488 addresses->addresses[i], subchannel, combiner);
489 }
490}
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700491
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700492template <typename SubchannelListType, typename SubchannelDataType>
493SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
494 if (tracer_->enabled()) {
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700495 gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
496 policy_, this);
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700497 }
498}
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700499
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700500template <typename SubchannelListType, typename SubchannelDataType>
Mark D. Roth5dd42ab2018-04-04 07:40:19 -0700501void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked(
502 const char* reason) {
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700503 if (tracer_->enabled()) {
504 gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
505 tracer_->name(), policy_, this, reason);
506 }
507 GPR_ASSERT(!shutting_down_);
508 shutting_down_ = true;
509 for (size_t i = 0; i < subchannels_.size(); i++) {
510 SubchannelDataType* sd = &subchannels_[i];
511 sd->ShutdownLocked(reason);
512 }
513}
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700514
Mark D. Roth7c1b5db2018-03-30 13:28:56 -0700515} // namespace grpc_core
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700516
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700517#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */