blob: cf3e9525ef34e9afc2e74356b6f4a9f156bc4ca4 [file] [log] [blame]
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#include <string.h>
20
21#include <grpc/support/alloc.h>
22
23#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
24#include "src/core/lib/channel/channel_args.h"
25#include "src/core/lib/debug/trace.h"
26#include "src/core/lib/iomgr/closure.h"
27#include "src/core/lib/iomgr/combiner.h"
28#include "src/core/lib/iomgr/sockaddr_utils.h"
29#include "src/core/lib/transport/connectivity_state.h"
30
Mark D. Roth5e9848e2017-10-06 13:59:32 -070031void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx,
32 grpc_lb_subchannel_data *sd,
33 const char *reason) {
34 if (sd->subchannel != NULL) {
Mark D. Roth901bb4f2017-10-11 08:49:07 -070035 if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
Mark D. Roth9843ec72017-10-11 13:00:04 -070036 gpr_log(
37 GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR
38 " of %" PRIdPTR " (subchannel %p): unreffing subchannel",
39 sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
40 sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
41 sd->subchannel_list->num_subchannels, sd->subchannel);
Mark D. Roth5e9848e2017-10-06 13:59:32 -070042 }
43 GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason);
44 sd->subchannel = NULL;
45 if (sd->connected_subchannel != NULL) {
46 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel,
47 reason);
48 sd->connected_subchannel = NULL;
49 }
50 if (sd->user_data != NULL) {
51 GPR_ASSERT(sd->user_data_vtable != NULL);
52 sd->user_data_vtable->destroy(exec_ctx, sd->user_data);
53 sd->user_data = NULL;
54 }
55 }
56}
57
58void grpc_lb_subchannel_data_start_connectivity_watch(
59 grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
Mark D. Roth901bb4f2017-10-11 08:49:07 -070060 if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
Mark D. Roth5e9848e2017-10-06 13:59:32 -070061 gpr_log(GPR_DEBUG,
Mark D. Roth901bb4f2017-10-11 08:49:07 -070062 "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR
Mark D. Roth5e9848e2017-10-06 13:59:32 -070063 " (subchannel %p): requesting connectivity change notification",
Mark D. Roth901bb4f2017-10-11 08:49:07 -070064 sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
65 sd->subchannel_list,
Mark D. Rothbf6a86a2017-10-09 12:23:37 -070066 (size_t)(sd - sd->subchannel_list->subchannels),
Mark D. Roth5e9848e2017-10-06 13:59:32 -070067 sd->subchannel_list->num_subchannels, sd->subchannel);
68 }
69 sd->connectivity_notification_pending = true;
70 grpc_subchannel_notify_on_state_change(
71 exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties,
72 &sd->pending_connectivity_state_unsafe,
73 &sd->connectivity_changed_closure);
74}
75
76void grpc_lb_subchannel_data_stop_connectivity_watch(
77 grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) {
Mark D. Roth901bb4f2017-10-11 08:49:07 -070078 if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
Mark D. Roth9843ec72017-10-11 13:00:04 -070079 gpr_log(
80 GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR
81 " (subchannel %p): stopping connectivity watch",
82 sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
83 sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
84 sd->subchannel_list->num_subchannels, sd->subchannel);
Mark D. Roth5e9848e2017-10-06 13:59:32 -070085 }
Mark D. Roth7ba40ba2017-10-10 13:17:13 -070086 GPR_ASSERT(sd->connectivity_notification_pending);
Mark D. Roth5e9848e2017-10-06 13:59:32 -070087 sd->connectivity_notification_pending = false;
88}
89
90grpc_lb_subchannel_list *grpc_lb_subchannel_list_create(
Mark D. Roth901bb4f2017-10-11 08:49:07 -070091 grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer,
Mark D. Roth5e9848e2017-10-06 13:59:32 -070092 const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args,
93 grpc_iomgr_cb_func connectivity_changed_cb) {
94 grpc_lb_subchannel_list *subchannel_list =
95 (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list));
Mark D. Roth901bb4f2017-10-11 08:49:07 -070096 if (GRPC_TRACER_ON(*tracer)) {
Mark D. Roth5e9848e2017-10-06 13:59:32 -070097 gpr_log(GPR_DEBUG,
Mark D. Roth901bb4f2017-10-11 08:49:07 -070098 "[%s %p] Creating subchannel list %p for %" PRIdPTR " subchannels",
99 tracer->name, p, subchannel_list, addresses->num_addresses);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700100 }
101 subchannel_list->policy = p;
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700102 subchannel_list->tracer = tracer;
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700103 gpr_ref_init(&subchannel_list->refcount, 1);
104 subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc(
105 sizeof(grpc_lb_subchannel_data) * addresses->num_addresses);
Mark D. Roth5132d0e2017-10-13 13:20:52 -0700106 // We need to remove the LB addresses in order to be able to compare the
107 // subchannel keys of subchannels from a different batch of addresses.
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700108 static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
109 GRPC_ARG_LB_ADDRESSES};
Mark D. Roth99f54e12017-10-16 09:55:53 -0700110 // Create a subchannel for each address.
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700111 grpc_subchannel_args sc_args;
112 size_t subchannel_index = 0;
113 for (size_t i = 0; i < addresses->num_addresses; i++) {
114 // If there were any balancer, we would have chosen grpclb policy instead.
115 GPR_ASSERT(!addresses->addresses[i].is_balancer);
116 memset(&sc_args, 0, sizeof(grpc_subchannel_args));
117 grpc_arg addr_arg =
118 grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
119 grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
120 args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
121 1);
122 gpr_free(addr_arg.value.string);
123 sc_args.args = new_args;
124 grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
125 exec_ctx, args->client_channel_factory, &sc_args);
126 grpc_channel_args_destroy(exec_ctx, new_args);
Mark D. Roth0c11eba2017-10-10 15:02:43 -0700127 if (subchannel == NULL) {
128 // Subchannel could not be created.
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700129 if (GRPC_TRACER_ON(*tracer)) {
Mark D. Roth0c11eba2017-10-10 15:02:43 -0700130 char *address_uri =
131 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
132 gpr_log(GPR_DEBUG,
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700133 "[%s %p] could not create subchannel for address uri %s, "
Mark D. Roth0c11eba2017-10-10 15:02:43 -0700134 "ignoring",
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700135 tracer->name, subchannel_list->policy, address_uri);
Mark D. Roth0c11eba2017-10-10 15:02:43 -0700136 gpr_free(address_uri);
137 }
138 continue;
139 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700140 grpc_error *error;
141 // Get the connectivity state of the subchannel. Already existing ones may
142 // be in a state other than INIT.
143 const grpc_connectivity_state subchannel_connectivity_state =
144 grpc_subchannel_check_connectivity(subchannel, &error);
145 if (error != GRPC_ERROR_NONE) {
146 // The subchannel is in error (e.g. shutting down). Ignore it.
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700147 if (GRPC_TRACER_ON(*tracer)) {
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700148 char *address_uri =
149 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
150 gpr_log(GPR_DEBUG,
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700151 "[%s %p] subchannel for address uri %s shutting down, ignoring",
152 tracer->name, subchannel_list->policy, address_uri);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700153 gpr_free(address_uri);
154 }
155 GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "new_sc_connectivity_error");
156 GRPC_ERROR_UNREF(error);
157 continue;
158 }
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700159 if (GRPC_TRACER_ON(*tracer)) {
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700160 char *address_uri =
161 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700162 gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR
Mark D. Roth62ca6ce2017-10-10 10:01:51 -0700163 ": Created subchannel %p for address uri %s; "
164 "initial connectivity state: %s",
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700165 tracer->name, p, subchannel_list, subchannel_index, subchannel,
166 address_uri,
Mark D. Roth62ca6ce2017-10-10 10:01:51 -0700167 grpc_connectivity_state_name(subchannel_connectivity_state));
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700168 gpr_free(address_uri);
169 }
170 grpc_lb_subchannel_data *sd =
171 &subchannel_list->subchannels[subchannel_index++];
172 sd->subchannel_list = subchannel_list;
173 sd->subchannel = subchannel;
174 GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
175 connectivity_changed_cb, sd,
176 grpc_combiner_scheduler(args->combiner));
Mark D. Roth5132d0e2017-10-13 13:20:52 -0700177 // Use some sentinel value outside of the range of
178 // grpc_connectivity_state to signal an undefined previous state.
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700179 sd->prev_connectivity_state = GRPC_CHANNEL_INIT;
180 sd->curr_connectivity_state = subchannel_connectivity_state;
Mark D. Roth57cdb162017-10-23 12:37:07 -0700181 sd->pending_connectivity_state_unsafe = subchannel_connectivity_state;
Mark D. Roth6c556912017-10-25 07:59:51 -0700182 if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
183 sd->connected_subchannel = GRPC_CONNECTED_SUBCHANNEL_REF(
184 grpc_subchannel_get_connected_subchannel(sd->subchannel),
185 "ready_at_sl_creation");
186 }
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700187 sd->user_data_vtable = addresses->user_data_vtable;
188 if (sd->user_data_vtable != NULL) {
189 sd->user_data =
190 sd->user_data_vtable->copy(addresses->addresses[i].user_data);
191 }
192 }
193 subchannel_list->num_subchannels = subchannel_index;
194 return subchannel_list;
195}
196
197static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx,
198 grpc_lb_subchannel_list *subchannel_list) {
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700199 if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
200 gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p",
201 subchannel_list->tracer->name, subchannel_list->policy,
202 subchannel_list);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700203 }
204 for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
205 grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
206 grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
207 "subchannel_list_destroy");
208 }
209 gpr_free(subchannel_list->subchannels);
210 gpr_free(subchannel_list);
211}
212
213void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list,
214 const char *reason) {
215 gpr_ref_non_zero(&subchannel_list->refcount);
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700216 if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700217 const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700218 gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
219 subchannel_list->tracer->name, subchannel_list->policy,
Mark D. Roth9843ec72017-10-11 13:00:04 -0700220 subchannel_list, (unsigned long)(count - 1), (unsigned long)count,
221 reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700222 }
223}
224
225void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx,
226 grpc_lb_subchannel_list *subchannel_list,
227 const char *reason) {
228 const bool done = gpr_unref(&subchannel_list->refcount);
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700229 if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700230 const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count);
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700231 gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
232 subchannel_list->tracer->name, subchannel_list->policy,
Mark D. Roth9843ec72017-10-11 13:00:04 -0700233 subchannel_list, (unsigned long)(count + 1), (unsigned long)count,
234 reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700235 }
236 if (done) {
237 subchannel_list_destroy(exec_ctx, subchannel_list);
238 }
239}
240
241void grpc_lb_subchannel_list_ref_for_connectivity_watch(
242 grpc_lb_subchannel_list *subchannel_list, const char *reason) {
243 GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason);
244 grpc_lb_subchannel_list_ref(subchannel_list, reason);
245}
246
247void grpc_lb_subchannel_list_unref_for_connectivity_watch(
248 grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
249 const char *reason) {
250 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason);
251 grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
252}
253
Mark D. Roth99f54e12017-10-16 09:55:53 -0700254static void subchannel_data_cancel_connectivity_watch(
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700255 grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) {
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700256 if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) {
Mark D. Roth9843ec72017-10-11 13:00:04 -0700257 gpr_log(
258 GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIdPTR " of %" PRIdPTR
259 " (subchannel %p): canceling connectivity watch (%s)",
260 sd->subchannel_list->tracer->name, sd->subchannel_list->policy,
261 sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels),
262 sd->subchannel_list->num_subchannels, sd->subchannel, reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700263 }
Mark D. Roth62ca6ce2017-10-10 10:01:51 -0700264 grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
265 &sd->connectivity_changed_closure);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700266}
267
268void grpc_lb_subchannel_list_shutdown_and_unref(
269 grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list,
270 const char *reason) {
Mark D. Roth901bb4f2017-10-11 08:49:07 -0700271 if (GRPC_TRACER_ON(*subchannel_list->tracer)) {
272 gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)",
273 subchannel_list->tracer->name, subchannel_list->policy,
274 subchannel_list, reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700275 }
276 GPR_ASSERT(!subchannel_list->shutting_down);
277 subchannel_list->shutting_down = true;
278 for (size_t i = 0; i < subchannel_list->num_subchannels; i++) {
279 grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i];
280 // If there's a pending notification for this subchannel, cancel it;
281 // the callback is responsible for unreffing the subchannel.
282 // Otherwise, unref the subchannel directly.
283 if (sd->connectivity_notification_pending) {
Mark D. Roth99f54e12017-10-16 09:55:53 -0700284 subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason);
Mark D. Roth5e9848e2017-10-06 13:59:32 -0700285 } else if (sd->subchannel != NULL) {
286 grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason);
287 }
288 }
289 grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason);
290}