blob: 51cc63264956e10e778336459bbd0db8cecf0ce1 [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02003 * Copyright 2015 gRPC authors.
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07004 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02005 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07008 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +02009 * http://www.apache.org/licenses/LICENSE-2.0
Craig Tiller3bc8ebd2015-06-24 15:41:15 -070010 *
Jan Tattermusch7897ae92017-06-07 22:57:36 +020011 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
Craig Tiller3bc8ebd2015-06-24 15:41:15 -070016 *
17 */
18
Craig Tillereb3b12e2015-06-26 14:42:49 -070019#include <string.h>
20
21#include <grpc/support/alloc.h>
Mark D. Roth5bd7be02016-10-21 14:19:50 -070022
Craig Tiller9eb0fde2017-03-31 16:59:30 -070023#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
24#include "src/core/ext/filters/client_channel/subchannel.h"
David Garcia Quintas87d5a312017-06-06 19:45:58 -070025#include "src/core/ext/filters/client_channel/subchannel_index.h"
Mark D. Roth557c9902016-10-24 11:12:05 -070026#include "src/core/lib/channel/channel_args.h"
Craig Tiller2400bf52017-02-09 16:25:19 -080027#include "src/core/lib/iomgr/combiner.h"
Mark D. Roth0748f392017-01-13 09:22:44 -080028#include "src/core/lib/iomgr/sockaddr_utils.h"
Craig Tiller9533d042016-03-25 17:11:06 -070029#include "src/core/lib/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070030
David Garcia Quintas87d5a312017-06-06 19:45:58 -070031grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false);
32
Craig Tillera82950e2015-09-22 12:33:20 -070033typedef struct pending_pick {
Craig Tillereb3b12e2015-06-26 14:42:49 -070034 struct pending_pick *next;
Craig Tiller8c0d96f2016-03-11 14:27:52 -080035 uint32_t initial_metadata_flags;
Craig Tillerb5585d42015-11-17 07:18:31 -080036 grpc_connected_subchannel **target;
Craig Tiller33825112015-09-18 07:44:19 -070037 grpc_closure *on_complete;
Craig Tillereb3b12e2015-06-26 14:42:49 -070038} pending_pick;
39
Craig Tillera82950e2015-09-22 12:33:20 -070040typedef struct {
Craig Tillereb3b12e2015-06-26 14:42:49 -070041 /** base policy: must be first */
42 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070043 /** all our subchannels */
44 grpc_subchannel **subchannels;
David Garcia Quintas87d5a312017-06-06 19:45:58 -070045 grpc_subchannel **new_subchannels;
Craig Tillereb3b12e2015-06-26 14:42:49 -070046 size_t num_subchannels;
David Garcia Quintas87d5a312017-06-06 19:45:58 -070047 size_t num_new_subchannels;
Craig Tillereb3b12e2015-06-26 14:42:49 -070048
Craig Tiller33825112015-09-18 07:44:19 -070049 grpc_closure connectivity_changed;
Craig Tillereb3b12e2015-06-26 14:42:49 -070050
Craig Tiller2400bf52017-02-09 16:25:19 -080051 /** remaining members are protected by the combiner */
Craig Tiller320bee02016-01-06 17:33:45 -080052
Craig Tiller46dd7902017-02-23 09:42:16 -080053 /** the selected channel */
Craig Tiller2400bf52017-02-09 16:25:19 -080054 grpc_connected_subchannel *selected;
55
David Garcia Quintas87d5a312017-06-06 19:45:58 -070056 /** the subchannel key for \a selected, or NULL if \a selected not set */
57 const grpc_subchannel_key *selected_key;
58
Craig Tillereb3b12e2015-06-26 14:42:49 -070059 /** have we started picking? */
David Garcia Quintas87d5a312017-06-06 19:45:58 -070060 bool started_picking;
Craig Tillera14215a2015-07-17 17:21:08 -070061 /** are we shut down? */
David Garcia Quintas87d5a312017-06-06 19:45:58 -070062 bool shutdown;
63 /** are we updating the selected subchannel? */
64 bool updating_selected;
65 /** are we updating the subchannel candidates? */
66 bool updating_subchannels;
67 /** args from the latest update received while already updating, or NULL */
68 grpc_lb_policy_args *pending_update_args;
Craig Tillereb3b12e2015-06-26 14:42:49 -070069 /** which subchannel are we watching? */
70 size_t checking_subchannel;
71 /** what is the connectivity of that channel? */
72 grpc_connectivity_state checking_connectivity;
73 /** list of picks that are waiting on connectivity */
74 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070075
76 /** our connectivity state tracker */
77 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070078} pick_first_lb_policy;
79
Craig Tillerfb433852016-03-29 08:51:07 -070080static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
Craig Tillera82950e2015-09-22 12:33:20 -070081 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillera82950e2015-09-22 12:33:20 -070082 GPR_ASSERT(p->pending_picks == NULL);
David Garcia Quintas87d5a312017-06-06 19:45:58 -070083 for (size_t i = 0; i < p->num_subchannels; i++) {
84 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
Craig Tillera82950e2015-09-22 12:33:20 -070085 }
Craig Tiller2400bf52017-02-09 16:25:19 -080086 if (p->selected != NULL) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -070087 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
88 "picked_first_destroy");
Craig Tiller89a768e2015-10-06 09:55:59 -070089 }
Craig Tillera82950e2015-09-22 12:33:20 -070090 grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
David Garcia Quintas87d5a312017-06-06 19:45:58 -070091 if (p->pending_update_args != NULL) {
92 grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
93 gpr_free(p->pending_update_args);
94 }
Craig Tillera82950e2015-09-22 12:33:20 -070095 gpr_free(p->subchannels);
David Garcia Quintas87d5a312017-06-06 19:45:58 -070096 gpr_free(p->new_subchannels);
Craig Tillera82950e2015-09-22 12:33:20 -070097 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -070098}
99
Craig Tiller2400bf52017-02-09 16:25:19 -0800100static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
Craig Tillera82950e2015-09-22 12:33:20 -0700101 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700102 pending_pick *pp;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700103 p->shutdown = true;
Craig Tiller5795da72015-09-17 15:27:13 -0700104 pp = p->pending_picks;
105 p->pending_picks = NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700106 grpc_connectivity_state_set(
Craig Tillerd925c932016-06-06 08:38:50 -0700107 exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700108 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown");
Craig Tillerf036a642015-12-01 17:00:40 -0800109 /* cancel subscription */
Craig Tiller2400bf52017-02-09 16:25:19 -0800110 if (p->selected != NULL) {
Craig Tiller1d881fb2015-12-01 07:39:04 -0800111 grpc_connected_subchannel_notify_on_state_change(
Craig Tiller2400bf52017-02-09 16:25:19 -0800112 exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700113 } else if (p->num_subchannels > 0 && p->started_picking) {
Craig Tiller1d881fb2015-12-01 07:39:04 -0800114 grpc_subchannel_notify_on_state_change(
115 exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
116 &p->connectivity_changed);
Craig Tiller48613042015-11-29 14:45:11 -0800117 }
Craig Tillera82950e2015-09-22 12:33:20 -0700118 while (pp != NULL) {
119 pending_pick *next = pp->next;
120 *pp->target = NULL;
Craig Tiller91031da2016-12-28 15:44:25 -0800121 grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700122 gpr_free(pp);
123 pp = next;
124 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700125}
126
Craig Tiller2400bf52017-02-09 16:25:19 -0800127static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
128 grpc_connected_subchannel **target,
129 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800130 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
131 pending_pick *pp;
Craig Tiller577c9b22015-11-02 14:11:15 -0800132 pp = p->pending_picks;
133 p->pending_picks = NULL;
134 while (pp != NULL) {
135 pending_pick *next = pp->next;
136 if (pp->target == target) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800137 *target = NULL;
ncteisen4b36a3d2017-03-13 19:08:06 -0700138 grpc_closure_sched(exec_ctx, pp->on_complete,
139 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
140 "Pick Cancelled", &error, 1));
Craig Tiller577c9b22015-11-02 14:11:15 -0800141 gpr_free(pp);
142 } else {
143 pp->next = p->pending_picks;
144 p->pending_picks = pp;
145 }
146 pp = next;
147 }
Mark D. Roth5f844002016-09-08 08:20:53 -0700148 GRPC_ERROR_UNREF(error);
Craig Tiller577c9b22015-11-02 14:11:15 -0800149}
150
Craig Tiller2400bf52017-02-09 16:25:19 -0800151static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
152 uint32_t initial_metadata_flags_mask,
153 uint32_t initial_metadata_flags_eq,
154 grpc_error *error) {
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800155 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
156 pending_pick *pp;
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800157 pp = p->pending_picks;
158 p->pending_picks = NULL;
159 while (pp != NULL) {
160 pending_pick *next = pp->next;
161 if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
162 initial_metadata_flags_eq) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700163 grpc_closure_sched(exec_ctx, pp->on_complete,
164 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
165 "Pick Cancelled", &error, 1));
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800166 gpr_free(pp);
167 } else {
168 pp->next = p->pending_picks;
169 p->pending_picks = pp;
170 }
171 pp = next;
172 }
Mark D. Rothe65ff112016-09-09 13:48:38 -0700173 GRPC_ERROR_UNREF(error);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800174}
175
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700176static void start_picking_locked(grpc_exec_ctx *exec_ctx,
177 pick_first_lb_policy *p) {
178 p->started_picking = true;
179 if (p->subchannels != NULL) {
180 GPR_ASSERT(p->num_subchannels > 0);
181 p->checking_subchannel = 0;
182 p->checking_connectivity = GRPC_CHANNEL_IDLE;
183 GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
184 grpc_subchannel_notify_on_state_change(
185 exec_ctx, p->subchannels[p->checking_subchannel],
186 p->base.interested_parties, &p->checking_connectivity,
187 &p->connectivity_changed);
188 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700189}
190
Craig Tiller2400bf52017-02-09 16:25:19 -0800191static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
Craig Tillera82950e2015-09-22 12:33:20 -0700192 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillera82950e2015-09-22 12:33:20 -0700193 if (!p->started_picking) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700194 start_picking_locked(exec_ctx, p);
Craig Tillera82950e2015-09-22 12:33:20 -0700195 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700196}
197
Craig Tiller2400bf52017-02-09 16:25:19 -0800198static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
199 const grpc_lb_policy_pick_args *pick_args,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700200 grpc_connected_subchannel **target,
201 grpc_call_context_element *context, void **user_data,
Craig Tiller2400bf52017-02-09 16:25:19 -0800202 grpc_closure *on_complete) {
Craig Tillera82950e2015-09-22 12:33:20 -0700203 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tiller45724b32015-09-22 10:42:19 -0700204 pending_pick *pp;
Craig Tiller320bee02016-01-06 17:33:45 -0800205
206 /* Check atomically for a selected channel */
Craig Tiller2400bf52017-02-09 16:25:19 -0800207 if (p->selected != NULL) {
208 *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
Craig Tiller86c0f8a2015-12-01 20:05:40 -0800209 return 1;
210 }
Craig Tiller320bee02016-01-06 17:33:45 -0800211
Craig Tiller46dd7902017-02-23 09:42:16 -0800212 /* No subchannel selected yet, so try again */
Craig Tiller2400bf52017-02-09 16:25:19 -0800213 if (!p->started_picking) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700214 start_picking_locked(exec_ctx, p);
Craig Tillera82950e2015-09-22 12:33:20 -0700215 }
Craig Tiller2400bf52017-02-09 16:25:19 -0800216 pp = gpr_malloc(sizeof(*pp));
217 pp->next = p->pending_picks;
218 pp->target = target;
219 pp->initial_metadata_flags = pick_args->initial_metadata_flags;
220 pp->on_complete = on_complete;
221 p->pending_picks = pp;
222 return 0;
Craig Tiller45724b32015-09-22 10:42:19 -0700223}
224
Craig Tiller2400bf52017-02-09 16:25:19 -0800225static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
226 pick_first_lb_policy *p) {
Craig Tillerb09d84d2015-10-06 09:12:16 -0700227 size_t num_subchannels = p->num_subchannels;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700228 grpc_subchannel **subchannels = p->subchannels;
Craig Tillerb09d84d2015-10-06 09:12:16 -0700229
Craig Tillerb09d84d2015-10-06 09:12:16 -0700230 p->num_subchannels = 0;
231 p->subchannels = NULL;
Craig Tiller48613042015-11-29 14:45:11 -0800232 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
Craig Tillerb09d84d2015-10-06 09:12:16 -0700233
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700234 for (size_t i = 0; i < num_subchannels; i++) {
Craig Tillerb09d84d2015-10-06 09:12:16 -0700235 GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
236 }
Craig Tillerb09d84d2015-10-06 09:12:16 -0700237 gpr_free(subchannels);
238}
239
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700240static grpc_connectivity_state pf_check_connectivity_locked(
241 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
242 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
243 return grpc_connectivity_state_get(&p->state_tracker, error);
244}
245
246static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
247 grpc_lb_policy *pol,
248 grpc_connectivity_state *current,
249 grpc_closure *notify) {
250 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
251 grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
252 current, notify);
253}
254
255static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
256 grpc_closure *closure) {
257 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
258 if (p->selected) {
259 grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
260 } else {
261 grpc_closure_sched(exec_ctx, closure,
262 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
263 }
264}
265
266/* unsubscribe all subchannels */
267static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
268 pick_first_lb_policy *p) {
269 if (p->num_subchannels > 0) {
270 GPR_ASSERT(p->selected == NULL);
271 grpc_subchannel_notify_on_state_change(
272 exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
273 &p->connectivity_changed);
274 p->updating_subchannels = true;
275 } else if (p->selected != NULL) {
276 grpc_connected_subchannel_notify_on_state_change(
277 exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
278 p->updating_selected = true;
279 }
280}
281
282/* true upon success */
283static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
284 const grpc_lb_policy_args *args) {
285 pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
286 /* Find the number of backend addresses. We ignore balancer
287 * addresses, since we don't know how to handle them. */
288 const grpc_arg *arg =
289 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
290 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
291 if (p->subchannels == NULL) {
292 // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
293 grpc_connectivity_state_set(
294 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
295 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
296 "pf_update_missing");
297 } else {
298 // otherwise, keep using the current subchannel list (ignore this update).
299 gpr_log(GPR_ERROR,
300 "No valid LB addresses channel arg for Pick First %p update, "
301 "ignoring.",
302 (void *)p);
303 }
304 return;
305 }
306 const grpc_lb_addresses *addresses = arg->value.pointer.p;
307 size_t num_addrs = 0;
308 for (size_t i = 0; i < addresses->num_addresses; i++) {
309 if (!addresses->addresses[i].is_balancer) ++num_addrs;
310 }
311 if (num_addrs == 0) {
312 // Empty update. Unsubscribe from all current subchannels and put the
313 // channel in TRANSIENT_FAILURE.
314 grpc_connectivity_state_set(
315 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
316 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
317 "pf_update_empty");
318 stop_connectivity_watchers(exec_ctx, p);
319 return;
320 }
321 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
322 gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
323 (void *)p, (unsigned long)num_addrs);
324 }
325 grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
326 /* We remove the following keys in order for subchannel keys belonging to
327 * subchannels point to the same address to match. */
328 static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
329 GRPC_ARG_LB_ADDRESSES};
330 size_t sc_args_count = 0;
331
332 /* Create list of subchannel args for new addresses in \a args. */
333 for (size_t i = 0; i < addresses->num_addresses; i++) {
334 if (addresses->addresses[i].is_balancer) continue;
335 if (addresses->addresses[i].user_data != NULL) {
336 gpr_log(GPR_ERROR,
337 "This LB policy doesn't support user data. It will be ignored");
338 }
339 grpc_arg addr_arg =
340 grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
341 grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
342 args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
343 1);
344 gpr_free(addr_arg.value.string);
345 sc_args[sc_args_count++].args = new_args;
346 }
347
348 /* Check if p->selected is amongst them. If so, we are done. */
349 if (p->selected != NULL) {
350 GPR_ASSERT(p->selected_key != NULL);
351 for (size_t i = 0; i < sc_args_count; i++) {
352 grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
353 const bool found_selected =
354 grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
355 grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
356 if (found_selected) {
357 // The currently selected subchannel is in the update: we are done.
358 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
359 gpr_log(GPR_INFO,
360 "Pick First %p found already selected subchannel %p amongst "
361 "updates. Update done.",
362 (void *)p, (void *)p->selected);
363 }
364 for (size_t j = 0; j < sc_args_count; j++) {
365 grpc_channel_args_destroy(exec_ctx,
366 (grpc_channel_args *)sc_args[j].args);
367 }
368 gpr_free(sc_args);
369 return;
370 }
371 }
372 }
373 // We only check for already running updates here because if the previous
374 // steps were successful, the update can be considered done without any
375 // interference (ie, no callbacks were scheduled).
376 if (p->updating_selected || p->updating_subchannels) {
377 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
378 gpr_log(GPR_INFO,
379 "Update already in progress for pick first %p. Deferring update.",
380 (void *)p);
381 }
382 if (p->pending_update_args != NULL) {
383 grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
384 gpr_free(p->pending_update_args);
385 }
386 p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args));
387 p->pending_update_args->client_channel_factory =
388 args->client_channel_factory;
389 p->pending_update_args->args = grpc_channel_args_copy(args->args);
390 p->pending_update_args->combiner = args->combiner;
391 return;
392 }
393 /* Create the subchannels for the new subchannel args/addresses. */
394 grpc_subchannel **new_subchannels =
395 gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
396 size_t num_new_subchannels = 0;
397 for (size_t i = 0; i < sc_args_count; i++) {
398 grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
399 exec_ctx, args->client_channel_factory, &sc_args[i]);
400 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
401 char *address_uri =
402 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
403 gpr_log(GPR_INFO,
404 "Pick First %p created subchannel %p for address uri %s",
405 (void *)p, (void *)subchannel, address_uri);
406 gpr_free(address_uri);
407 }
408 grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
409 if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
410 }
411 gpr_free(sc_args);
412 if (num_new_subchannels == 0) {
413 gpr_free(new_subchannels);
414 // Empty update. Unsubscribe from all current subchannels and put the
415 // channel in TRANSIENT_FAILURE.
416 grpc_connectivity_state_set(
417 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
418 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
419 "pf_update_no_valid_addresses");
420 stop_connectivity_watchers(exec_ctx, p);
421 return;
422 }
423
424 /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
425 stop_connectivity_watchers(exec_ctx, p);
426
427 /* Save new subchannels. The switch over will happen in
428 * pf_connectivity_changed_locked */
429 if (p->updating_selected || p->updating_subchannels) {
430 p->num_new_subchannels = num_new_subchannels;
431 p->new_subchannels = new_subchannels;
432 } else { /* nothing is updating. Get things moving from here */
433 p->num_subchannels = num_new_subchannels;
434 p->subchannels = new_subchannels;
435 p->new_subchannels = NULL;
436 p->num_new_subchannels = 0;
437 if (p->started_picking) {
438 p->checking_subchannel = 0;
439 p->checking_connectivity = GRPC_CHANNEL_IDLE;
440 grpc_subchannel_notify_on_state_change(
441 exec_ctx, p->subchannels[p->checking_subchannel],
442 p->base.interested_parties, &p->checking_connectivity,
443 &p->connectivity_changed);
444 }
445 }
446}
447
Craig Tiller2400bf52017-02-09 16:25:19 -0800448static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
449 grpc_error *error) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700450 pick_first_lb_policy *p = arg;
Craig Tillerb5585d42015-11-17 07:18:31 -0800451 grpc_subchannel *selected_subchannel;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700452 pending_pick *pp;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700453
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700454 bool restart = false;
455 if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
456 /* Captured the unsubscription for p->selected */
457 GPR_ASSERT(p->selected != NULL);
458 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
459 "pf_update_connectivity");
460 p->updating_selected = false;
461 if (p->num_new_subchannels == 0) {
462 p->selected = NULL;
463 return;
464 }
465 restart = true;
466 }
467 if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
468 /* Captured the unsubscription for the checking subchannel */
469 GPR_ASSERT(p->selected == NULL);
470 for (size_t i = 0; i < p->num_subchannels; i++) {
471 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
472 "pf_update_connectivity");
473 }
474 gpr_free(p->subchannels);
475 p->subchannels = NULL;
476 p->num_subchannels = 0;
477 p->updating_subchannels = false;
478 if (p->num_new_subchannels == 0) return;
479 restart = true;
480 }
481 if (restart) {
482 p->selected = NULL;
483 p->selected_key = NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700484
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700485 GPR_ASSERT(p->new_subchannels != NULL);
486 GPR_ASSERT(p->num_new_subchannels > 0);
487 p->num_subchannels = p->num_new_subchannels;
488 p->subchannels = p->new_subchannels;
489 p->num_new_subchannels = 0;
490 p->new_subchannels = NULL;
491
492 if (p->started_picking) {
493 /* If we were picking, continue to do so over the new subchannels,
494 * starting from the 0th index. */
495 p->checking_subchannel = 0;
496 p->checking_connectivity = GRPC_CHANNEL_IDLE;
497 /* reuses the weak ref from start_picking_locked */
498 grpc_subchannel_notify_on_state_change(
499 exec_ctx, p->subchannels[p->checking_subchannel],
500 p->base.interested_parties, &p->checking_connectivity,
501 &p->connectivity_changed);
502 }
503 if (p->pending_update_args != NULL) {
504 const grpc_lb_policy_args *args = p->pending_update_args;
505 p->pending_update_args = NULL;
506 pf_update_locked(exec_ctx, &p->base, args);
507 }
508 return;
509 }
510 GRPC_ERROR_REF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700511 if (p->shutdown) {
Craig Tiller48613042015-11-29 14:45:11 -0800512 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
Craig Tillerae125932016-05-13 16:34:29 -0700513 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700514 return;
Craig Tiller2400bf52017-02-09 16:25:19 -0800515 } else if (p->selected != NULL) {
Craig Tillercb2609f2015-11-24 17:19:19 -0800516 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
517 /* if the selected channel goes bad, we're done */
Craig Tiller48ed92e2016-06-02 11:07:12 -0700518 p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
Craig Tillercb2609f2015-11-24 17:19:19 -0800519 }
Craig Tillera82950e2015-09-22 12:33:20 -0700520 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
Craig Tillerf707d622016-05-06 14:26:12 -0700521 p->checking_connectivity, GRPC_ERROR_REF(error),
Craig Tiller804ff712016-05-05 16:25:40 -0700522 "selected_changed");
Craig Tiller48ed92e2016-06-02 11:07:12 -0700523 if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
Craig Tillerab33b482015-11-21 08:11:04 -0800524 grpc_connected_subchannel_notify_on_state_change(
Craig Tiller2400bf52017-02-09 16:25:19 -0800525 exec_ctx, p->selected, p->base.interested_parties,
Craig Tillera6bebf42015-12-01 17:02:35 -0800526 &p->checking_connectivity, &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700527 } else {
Craig Tiller48613042015-11-29 14:45:11 -0800528 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700529 }
Craig Tillera82950e2015-09-22 12:33:20 -0700530 } else {
531 loop:
532 switch (p->checking_connectivity) {
David Garcia Quintasea6689d2016-11-08 09:46:41 -0800533 case GRPC_CHANNEL_INIT:
Jan Tattermuschb0fb2d22016-11-16 14:04:05 +0100534 GPR_UNREACHABLE_CODE(return );
Craig Tillera82950e2015-09-22 12:33:20 -0700535 case GRPC_CHANNEL_READY:
536 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
Craig Tiller804ff712016-05-05 16:25:40 -0700537 GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
538 "connecting_ready");
Craig Tillerb5585d42015-11-17 07:18:31 -0800539 selected_subchannel = p->subchannels[p->checking_subchannel];
Craig Tiller2400bf52017-02-09 16:25:19 -0800540 p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
541 grpc_subchannel_get_connected_subchannel(selected_subchannel),
542 "picked_first");
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700543
544 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
545 gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
546 }
547 p->selected_key = grpc_subchannel_get_key(selected_subchannel);
Craig Tillerb09d84d2015-10-06 09:12:16 -0700548 /* drop the pick list: we are connected now */
Craig Tiller48613042015-11-29 14:45:11 -0800549 GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
Craig Tiller2400bf52017-02-09 16:25:19 -0800550 destroy_subchannels_locked(exec_ctx, p);
Craig Tillerb09d84d2015-10-06 09:12:16 -0700551 /* update any calls that were waiting for a pick */
Craig Tillera82950e2015-09-22 12:33:20 -0700552 while ((pp = p->pending_picks)) {
553 p->pending_picks = pp->next;
Craig Tiller2400bf52017-02-09 16:25:19 -0800554 *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700555 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
556 gpr_log(GPR_INFO,
557 "Servicing pending pick with selected subchannel %p",
558 (void *)p->selected);
559 }
Craig Tiller91031da2016-12-28 15:44:25 -0800560 grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700561 gpr_free(pp);
562 }
Craig Tillerab33b482015-11-21 08:11:04 -0800563 grpc_connected_subchannel_notify_on_state_change(
Craig Tiller2400bf52017-02-09 16:25:19 -0800564 exec_ctx, p->selected, p->base.interested_parties,
Craig Tillera6bebf42015-12-01 17:02:35 -0800565 &p->checking_connectivity, &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700566 break;
567 case GRPC_CHANNEL_TRANSIENT_FAILURE:
Craig Tillera82950e2015-09-22 12:33:20 -0700568 p->checking_subchannel =
569 (p->checking_subchannel + 1) % p->num_subchannels;
Craig Tiller131b6de2016-03-31 17:05:28 -0700570 if (p->checking_subchannel == 0) {
571 /* only trigger transient failure when we've tried all alternatives */
Craig Tiller804ff712016-05-05 16:25:40 -0700572 grpc_connectivity_state_set(
573 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
Craig Tillerf707d622016-05-06 14:26:12 -0700574 GRPC_ERROR_REF(error), "connecting_transient_failure");
Craig Tiller131b6de2016-03-31 17:05:28 -0700575 }
Craig Tillerf707d622016-05-06 14:26:12 -0700576 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700577 p->checking_connectivity = grpc_subchannel_check_connectivity(
Craig Tiller804ff712016-05-05 16:25:40 -0700578 p->subchannels[p->checking_subchannel], &error);
Craig Tillera82950e2015-09-22 12:33:20 -0700579 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
580 grpc_subchannel_notify_on_state_change(
581 exec_ctx, p->subchannels[p->checking_subchannel],
Craig Tiller69b093b2016-02-25 19:04:07 -0800582 p->base.interested_parties, &p->checking_connectivity,
Craig Tiller1d881fb2015-12-01 07:39:04 -0800583 &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700584 } else {
585 goto loop;
586 }
587 break;
588 case GRPC_CHANNEL_CONNECTING:
589 case GRPC_CHANNEL_IDLE:
Craig Tiller804ff712016-05-05 16:25:40 -0700590 grpc_connectivity_state_set(
591 exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
Craig Tillerf707d622016-05-06 14:26:12 -0700592 GRPC_ERROR_REF(error), "connecting_changed");
Craig Tillera82950e2015-09-22 12:33:20 -0700593 grpc_subchannel_notify_on_state_change(
594 exec_ctx, p->subchannels[p->checking_subchannel],
Craig Tiller69b093b2016-02-25 19:04:07 -0800595 p->base.interested_parties, &p->checking_connectivity,
Craig Tiller1d881fb2015-12-01 07:39:04 -0800596 &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700597 break;
Craig Tiller48ed92e2016-06-02 11:07:12 -0700598 case GRPC_CHANNEL_SHUTDOWN:
Craig Tillera82950e2015-09-22 12:33:20 -0700599 p->num_subchannels--;
Craig Tiller86c99582015-11-25 15:22:26 -0800600 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
601 p->subchannels[p->num_subchannels]);
Craig Tillera82950e2015-09-22 12:33:20 -0700602 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
603 "pick_first");
604 if (p->num_subchannels == 0) {
Craig Tiller804ff712016-05-05 16:25:40 -0700605 grpc_connectivity_state_set(
Craig Tillerd925c932016-06-06 08:38:50 -0700606 exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700607 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
608 "Pick first exhausted channels", &error, 1),
Craig Tiller804ff712016-05-05 16:25:40 -0700609 "no_more_channels");
Craig Tillera82950e2015-09-22 12:33:20 -0700610 while ((pp = p->pending_picks)) {
611 p->pending_picks = pp->next;
612 *pp->target = NULL;
Craig Tiller91031da2016-12-28 15:44:25 -0800613 grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700614 gpr_free(pp);
615 }
Craig Tiller1d881fb2015-12-01 07:39:04 -0800616 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
617 "pick_first_connectivity");
Craig Tillera82950e2015-09-22 12:33:20 -0700618 } else {
Craig Tiller804ff712016-05-05 16:25:40 -0700619 grpc_connectivity_state_set(
620 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
Craig Tillerf707d622016-05-06 14:26:12 -0700621 GRPC_ERROR_REF(error), "subchannel_failed");
Craig Tillera82950e2015-09-22 12:33:20 -0700622 p->checking_subchannel %= p->num_subchannels;
Craig Tillerf707d622016-05-06 14:26:12 -0700623 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700624 p->checking_connectivity = grpc_subchannel_check_connectivity(
Craig Tiller804ff712016-05-05 16:25:40 -0700625 p->subchannels[p->checking_subchannel], &error);
Craig Tillera82950e2015-09-22 12:33:20 -0700626 goto loop;
627 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700628 }
Craig Tillera82950e2015-09-22 12:33:20 -0700629 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700630
Craig Tillerf707d622016-05-06 14:26:12 -0700631 GRPC_ERROR_UNREF(error);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700632}
633
634static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller2400bf52017-02-09 16:25:19 -0800635 pf_destroy,
636 pf_shutdown_locked,
637 pf_pick_locked,
638 pf_cancel_pick_locked,
639 pf_cancel_picks_locked,
640 pf_ping_one_locked,
641 pf_exit_idle_locked,
642 pf_check_connectivity_locked,
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700643 pf_notify_on_state_change_locked,
644 pf_update_locked};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700645
Craig Tillera82950e2015-09-22 12:33:20 -0700646static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700647
Craig Tillera82950e2015-09-22 12:33:20 -0700648static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700649
David Garcia Quintas67c0d042016-03-25 01:37:53 -0700650static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
651 grpc_lb_policy_factory *factory,
Craig Tillera82950e2015-09-22 12:33:20 -0700652 grpc_lb_policy_args *args) {
David Garcia Quintas86fcfcc2016-03-31 23:22:28 -0700653 GPR_ASSERT(args->client_channel_factory != NULL);
Craig Tiller6f417882017-02-16 14:09:39 -0800654 pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700655 pf_update_locked(exec_ctx, &p->base, args);
Craig Tiller2400bf52017-02-09 16:25:19 -0800656 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
657 grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
658 grpc_combiner_scheduler(args->combiner, false));
Craig Tillereb3b12e2015-06-26 14:42:49 -0700659 return &p->base;
660}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700661
662static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
Craig Tillera82950e2015-09-22 12:33:20 -0700663 pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
664 "pick_first"};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700665
666static grpc_lb_policy_factory pick_first_lb_policy_factory = {
Craig Tillera82950e2015-09-22 12:33:20 -0700667 &pick_first_factory_vtable};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700668
Craig Tillerfb433852016-03-29 08:51:07 -0700669static grpc_lb_policy_factory *pick_first_lb_factory_create() {
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700670 return &pick_first_lb_policy_factory;
671}
Craig Tillerfb433852016-03-29 08:51:07 -0700672
673/* Plugin registration */
674
675void grpc_lb_policy_pick_first_init() {
Craig Tiller3113ef42016-03-29 09:03:14 -0700676 grpc_register_lb_policy(pick_first_lb_factory_create());
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700677 grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace);
Craig Tillerfb433852016-03-29 08:51:07 -0700678}
679
/* Plugin shutdown hook: nothing to release, so this is a no-op. Takes no
 * arguments; the explicit (void) makes this definition a proper prototype. */
void grpc_lb_policy_pick_first_shutdown(void) {}