blob: b6f86255df64a86cf75cbbed624af5534b17dc8b [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tillereb3b12e2015-06-26 14:42:49 -070034#include <string.h>
35
36#include <grpc/support/alloc.h>
Mark D. Roth5bd7be02016-10-21 14:19:50 -070037
Craig Tiller9eb0fde2017-03-31 16:59:30 -070038#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
39#include "src/core/ext/filters/client_channel/subchannel.h"
David Garcia Quintas87d5a312017-06-06 19:45:58 -070040#include "src/core/ext/filters/client_channel/subchannel_index.h"
Mark D. Roth557c9902016-10-24 11:12:05 -070041#include "src/core/lib/channel/channel_args.h"
Craig Tiller2400bf52017-02-09 16:25:19 -080042#include "src/core/lib/iomgr/combiner.h"
Mark D. Roth0748f392017-01-13 09:22:44 -080043#include "src/core/lib/iomgr/sockaddr_utils.h"
Craig Tiller9533d042016-03-25 17:11:06 -070044#include "src/core/lib/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070045
David Garcia Quintas87d5a312017-06-06 19:45:58 -070046grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false);
47
Craig Tillera82950e2015-09-22 12:33:20 -070048typedef struct pending_pick {
Craig Tillereb3b12e2015-06-26 14:42:49 -070049 struct pending_pick *next;
Craig Tiller8c0d96f2016-03-11 14:27:52 -080050 uint32_t initial_metadata_flags;
Craig Tillerb5585d42015-11-17 07:18:31 -080051 grpc_connected_subchannel **target;
Craig Tiller33825112015-09-18 07:44:19 -070052 grpc_closure *on_complete;
Craig Tillereb3b12e2015-06-26 14:42:49 -070053} pending_pick;
54
Craig Tillera82950e2015-09-22 12:33:20 -070055typedef struct {
Craig Tillereb3b12e2015-06-26 14:42:49 -070056 /** base policy: must be first */
57 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070058 /** all our subchannels */
59 grpc_subchannel **subchannels;
David Garcia Quintas87d5a312017-06-06 19:45:58 -070060 grpc_subchannel **new_subchannels;
Craig Tillereb3b12e2015-06-26 14:42:49 -070061 size_t num_subchannels;
David Garcia Quintas87d5a312017-06-06 19:45:58 -070062 size_t num_new_subchannels;
Craig Tillereb3b12e2015-06-26 14:42:49 -070063
Craig Tiller33825112015-09-18 07:44:19 -070064 grpc_closure connectivity_changed;
Craig Tillereb3b12e2015-06-26 14:42:49 -070065
Craig Tiller2400bf52017-02-09 16:25:19 -080066 /** remaining members are protected by the combiner */
Craig Tiller320bee02016-01-06 17:33:45 -080067
Craig Tiller46dd7902017-02-23 09:42:16 -080068 /** the selected channel */
Craig Tiller2400bf52017-02-09 16:25:19 -080069 grpc_connected_subchannel *selected;
70
David Garcia Quintas87d5a312017-06-06 19:45:58 -070071 /** the subchannel key for \a selected, or NULL if \a selected not set */
72 const grpc_subchannel_key *selected_key;
73
Craig Tillereb3b12e2015-06-26 14:42:49 -070074 /** have we started picking? */
David Garcia Quintas87d5a312017-06-06 19:45:58 -070075 bool started_picking;
Craig Tillera14215a2015-07-17 17:21:08 -070076 /** are we shut down? */
David Garcia Quintas87d5a312017-06-06 19:45:58 -070077 bool shutdown;
78 /** are we updating the selected subchannel? */
79 bool updating_selected;
80 /** are we updating the subchannel candidates? */
81 bool updating_subchannels;
82 /** args from the latest update received while already updating, or NULL */
83 grpc_lb_policy_args *pending_update_args;
Craig Tillereb3b12e2015-06-26 14:42:49 -070084 /** which subchannel are we watching? */
85 size_t checking_subchannel;
86 /** what is the connectivity of that channel? */
87 grpc_connectivity_state checking_connectivity;
88 /** list of picks that are waiting on connectivity */
89 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070090
91 /** our connectivity state tracker */
92 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070093} pick_first_lb_policy;
94
Craig Tillerfb433852016-03-29 08:51:07 -070095static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
Craig Tillera82950e2015-09-22 12:33:20 -070096 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillera82950e2015-09-22 12:33:20 -070097 GPR_ASSERT(p->pending_picks == NULL);
David Garcia Quintas87d5a312017-06-06 19:45:58 -070098 for (size_t i = 0; i < p->num_subchannels; i++) {
99 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first_destroy");
Craig Tillera82950e2015-09-22 12:33:20 -0700100 }
Craig Tiller2400bf52017-02-09 16:25:19 -0800101 if (p->selected != NULL) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700102 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
103 "picked_first_destroy");
Craig Tiller89a768e2015-10-06 09:55:59 -0700104 }
Craig Tillera82950e2015-09-22 12:33:20 -0700105 grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700106 if (p->pending_update_args != NULL) {
107 grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
108 gpr_free(p->pending_update_args);
109 }
Craig Tillera82950e2015-09-22 12:33:20 -0700110 gpr_free(p->subchannels);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700111 gpr_free(p->new_subchannels);
Craig Tillera82950e2015-09-22 12:33:20 -0700112 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700113}
114
Craig Tiller2400bf52017-02-09 16:25:19 -0800115static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
Craig Tillera82950e2015-09-22 12:33:20 -0700116 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700117 pending_pick *pp;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700118 p->shutdown = true;
Craig Tiller5795da72015-09-17 15:27:13 -0700119 pp = p->pending_picks;
120 p->pending_picks = NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700121 grpc_connectivity_state_set(
Craig Tillerd925c932016-06-06 08:38:50 -0700122 exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700123 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown");
Craig Tillerf036a642015-12-01 17:00:40 -0800124 /* cancel subscription */
Craig Tiller2400bf52017-02-09 16:25:19 -0800125 if (p->selected != NULL) {
Craig Tiller1d881fb2015-12-01 07:39:04 -0800126 grpc_connected_subchannel_notify_on_state_change(
Craig Tiller2400bf52017-02-09 16:25:19 -0800127 exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700128 } else if (p->num_subchannels > 0 && p->started_picking) {
Craig Tiller1d881fb2015-12-01 07:39:04 -0800129 grpc_subchannel_notify_on_state_change(
130 exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
131 &p->connectivity_changed);
Craig Tiller48613042015-11-29 14:45:11 -0800132 }
Craig Tillera82950e2015-09-22 12:33:20 -0700133 while (pp != NULL) {
134 pending_pick *next = pp->next;
135 *pp->target = NULL;
Craig Tiller91031da2016-12-28 15:44:25 -0800136 grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700137 gpr_free(pp);
138 pp = next;
139 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700140}
141
Craig Tiller2400bf52017-02-09 16:25:19 -0800142static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
143 grpc_connected_subchannel **target,
144 grpc_error *error) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800145 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
146 pending_pick *pp;
Craig Tiller577c9b22015-11-02 14:11:15 -0800147 pp = p->pending_picks;
148 p->pending_picks = NULL;
149 while (pp != NULL) {
150 pending_pick *next = pp->next;
151 if (pp->target == target) {
Craig Tiller577c9b22015-11-02 14:11:15 -0800152 *target = NULL;
ncteisen4b36a3d2017-03-13 19:08:06 -0700153 grpc_closure_sched(exec_ctx, pp->on_complete,
154 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
155 "Pick Cancelled", &error, 1));
Craig Tiller577c9b22015-11-02 14:11:15 -0800156 gpr_free(pp);
157 } else {
158 pp->next = p->pending_picks;
159 p->pending_picks = pp;
160 }
161 pp = next;
162 }
Mark D. Roth5f844002016-09-08 08:20:53 -0700163 GRPC_ERROR_UNREF(error);
Craig Tiller577c9b22015-11-02 14:11:15 -0800164}
165
Craig Tiller2400bf52017-02-09 16:25:19 -0800166static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
167 uint32_t initial_metadata_flags_mask,
168 uint32_t initial_metadata_flags_eq,
169 grpc_error *error) {
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800170 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
171 pending_pick *pp;
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800172 pp = p->pending_picks;
173 p->pending_picks = NULL;
174 while (pp != NULL) {
175 pending_pick *next = pp->next;
176 if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
177 initial_metadata_flags_eq) {
ncteisen4b36a3d2017-03-13 19:08:06 -0700178 grpc_closure_sched(exec_ctx, pp->on_complete,
179 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
180 "Pick Cancelled", &error, 1));
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800181 gpr_free(pp);
182 } else {
183 pp->next = p->pending_picks;
184 p->pending_picks = pp;
185 }
186 pp = next;
187 }
Mark D. Rothe65ff112016-09-09 13:48:38 -0700188 GRPC_ERROR_UNREF(error);
Craig Tiller8c0d96f2016-03-11 14:27:52 -0800189}
190
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700191static void start_picking_locked(grpc_exec_ctx *exec_ctx,
192 pick_first_lb_policy *p) {
193 p->started_picking = true;
194 if (p->subchannels != NULL) {
195 GPR_ASSERT(p->num_subchannels > 0);
196 p->checking_subchannel = 0;
197 p->checking_connectivity = GRPC_CHANNEL_IDLE;
198 GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity");
199 grpc_subchannel_notify_on_state_change(
200 exec_ctx, p->subchannels[p->checking_subchannel],
201 p->base.interested_parties, &p->checking_connectivity,
202 &p->connectivity_changed);
203 }
Craig Tiller48cb07c2015-07-15 16:16:15 -0700204}
205
Craig Tiller2400bf52017-02-09 16:25:19 -0800206static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
Craig Tillera82950e2015-09-22 12:33:20 -0700207 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillera82950e2015-09-22 12:33:20 -0700208 if (!p->started_picking) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700209 start_picking_locked(exec_ctx, p);
Craig Tillera82950e2015-09-22 12:33:20 -0700210 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700211}
212
Craig Tiller2400bf52017-02-09 16:25:19 -0800213static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
214 const grpc_lb_policy_pick_args *pick_args,
Mark D. Roth09e458c2017-05-02 08:13:26 -0700215 grpc_connected_subchannel **target,
216 grpc_call_context_element *context, void **user_data,
Craig Tiller2400bf52017-02-09 16:25:19 -0800217 grpc_closure *on_complete) {
Craig Tillera82950e2015-09-22 12:33:20 -0700218 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tiller45724b32015-09-22 10:42:19 -0700219 pending_pick *pp;
Craig Tiller320bee02016-01-06 17:33:45 -0800220
221 /* Check atomically for a selected channel */
Craig Tiller2400bf52017-02-09 16:25:19 -0800222 if (p->selected != NULL) {
223 *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
Craig Tiller86c0f8a2015-12-01 20:05:40 -0800224 return 1;
225 }
Craig Tiller320bee02016-01-06 17:33:45 -0800226
Craig Tiller46dd7902017-02-23 09:42:16 -0800227 /* No subchannel selected yet, so try again */
Craig Tiller2400bf52017-02-09 16:25:19 -0800228 if (!p->started_picking) {
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700229 start_picking_locked(exec_ctx, p);
Craig Tillera82950e2015-09-22 12:33:20 -0700230 }
Craig Tiller2400bf52017-02-09 16:25:19 -0800231 pp = gpr_malloc(sizeof(*pp));
232 pp->next = p->pending_picks;
233 pp->target = target;
234 pp->initial_metadata_flags = pick_args->initial_metadata_flags;
235 pp->on_complete = on_complete;
236 p->pending_picks = pp;
237 return 0;
Craig Tiller45724b32015-09-22 10:42:19 -0700238}
239
Craig Tiller2400bf52017-02-09 16:25:19 -0800240static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx,
241 pick_first_lb_policy *p) {
Craig Tillerb09d84d2015-10-06 09:12:16 -0700242 size_t num_subchannels = p->num_subchannels;
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700243 grpc_subchannel **subchannels = p->subchannels;
Craig Tillerb09d84d2015-10-06 09:12:16 -0700244
Craig Tillerb09d84d2015-10-06 09:12:16 -0700245 p->num_subchannels = 0;
246 p->subchannels = NULL;
Craig Tiller48613042015-11-29 14:45:11 -0800247 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels");
Craig Tillerb09d84d2015-10-06 09:12:16 -0700248
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700249 for (size_t i = 0; i < num_subchannels; i++) {
Craig Tillerb09d84d2015-10-06 09:12:16 -0700250 GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
251 }
Craig Tillerb09d84d2015-10-06 09:12:16 -0700252 gpr_free(subchannels);
253}
254
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700255static grpc_connectivity_state pf_check_connectivity_locked(
256 grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) {
257 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
258 return grpc_connectivity_state_get(&p->state_tracker, error);
259}
260
261static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
262 grpc_lb_policy *pol,
263 grpc_connectivity_state *current,
264 grpc_closure *notify) {
265 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
266 grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
267 current, notify);
268}
269
270static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
271 grpc_closure *closure) {
272 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
273 if (p->selected) {
274 grpc_connected_subchannel_ping(exec_ctx, p->selected, closure);
275 } else {
276 grpc_closure_sched(exec_ctx, closure,
277 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
278 }
279}
280
281/* unsubscribe all subchannels */
282static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
283 pick_first_lb_policy *p) {
284 if (p->num_subchannels > 0) {
285 GPR_ASSERT(p->selected == NULL);
286 grpc_subchannel_notify_on_state_change(
287 exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL,
288 &p->connectivity_changed);
289 p->updating_subchannels = true;
290 } else if (p->selected != NULL) {
291 grpc_connected_subchannel_notify_on_state_change(
292 exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed);
293 p->updating_selected = true;
294 }
295}
296
297/* true upon success */
298static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
299 const grpc_lb_policy_args *args) {
300 pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
301 /* Find the number of backend addresses. We ignore balancer
302 * addresses, since we don't know how to handle them. */
303 const grpc_arg *arg =
304 grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
305 if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
306 if (p->subchannels == NULL) {
307 // If we don't have a current subchannel list, go into TRANSIENT FAILURE.
308 grpc_connectivity_state_set(
309 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
310 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
311 "pf_update_missing");
312 } else {
313 // otherwise, keep using the current subchannel list (ignore this update).
314 gpr_log(GPR_ERROR,
315 "No valid LB addresses channel arg for Pick First %p update, "
316 "ignoring.",
317 (void *)p);
318 }
319 return;
320 }
321 const grpc_lb_addresses *addresses = arg->value.pointer.p;
322 size_t num_addrs = 0;
323 for (size_t i = 0; i < addresses->num_addresses; i++) {
324 if (!addresses->addresses[i].is_balancer) ++num_addrs;
325 }
326 if (num_addrs == 0) {
327 // Empty update. Unsubscribe from all current subchannels and put the
328 // channel in TRANSIENT_FAILURE.
329 grpc_connectivity_state_set(
330 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
331 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
332 "pf_update_empty");
333 stop_connectivity_watchers(exec_ctx, p);
334 return;
335 }
336 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
337 gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
338 (void *)p, (unsigned long)num_addrs);
339 }
340 grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
341 /* We remove the following keys in order for subchannel keys belonging to
342 * subchannels point to the same address to match. */
343 static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
344 GRPC_ARG_LB_ADDRESSES};
345 size_t sc_args_count = 0;
346
347 /* Create list of subchannel args for new addresses in \a args. */
348 for (size_t i = 0; i < addresses->num_addresses; i++) {
349 if (addresses->addresses[i].is_balancer) continue;
350 if (addresses->addresses[i].user_data != NULL) {
351 gpr_log(GPR_ERROR,
352 "This LB policy doesn't support user data. It will be ignored");
353 }
354 grpc_arg addr_arg =
355 grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
356 grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(
357 args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg,
358 1);
359 gpr_free(addr_arg.value.string);
360 sc_args[sc_args_count++].args = new_args;
361 }
362
363 /* Check if p->selected is amongst them. If so, we are done. */
364 if (p->selected != NULL) {
365 GPR_ASSERT(p->selected_key != NULL);
366 for (size_t i = 0; i < sc_args_count; i++) {
367 grpc_subchannel_key *ith_sc_key = grpc_subchannel_key_create(&sc_args[i]);
368 const bool found_selected =
369 grpc_subchannel_key_compare(p->selected_key, ith_sc_key) == 0;
370 grpc_subchannel_key_destroy(exec_ctx, ith_sc_key);
371 if (found_selected) {
372 // The currently selected subchannel is in the update: we are done.
373 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
374 gpr_log(GPR_INFO,
375 "Pick First %p found already selected subchannel %p amongst "
376 "updates. Update done.",
377 (void *)p, (void *)p->selected);
378 }
379 for (size_t j = 0; j < sc_args_count; j++) {
380 grpc_channel_args_destroy(exec_ctx,
381 (grpc_channel_args *)sc_args[j].args);
382 }
383 gpr_free(sc_args);
384 return;
385 }
386 }
387 }
388 // We only check for already running updates here because if the previous
389 // steps were successful, the update can be considered done without any
390 // interference (ie, no callbacks were scheduled).
391 if (p->updating_selected || p->updating_subchannels) {
392 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
393 gpr_log(GPR_INFO,
394 "Update already in progress for pick first %p. Deferring update.",
395 (void *)p);
396 }
397 if (p->pending_update_args != NULL) {
398 grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args);
399 gpr_free(p->pending_update_args);
400 }
401 p->pending_update_args = gpr_zalloc(sizeof(*p->pending_update_args));
402 p->pending_update_args->client_channel_factory =
403 args->client_channel_factory;
404 p->pending_update_args->args = grpc_channel_args_copy(args->args);
405 p->pending_update_args->combiner = args->combiner;
406 return;
407 }
408 /* Create the subchannels for the new subchannel args/addresses. */
409 grpc_subchannel **new_subchannels =
410 gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
411 size_t num_new_subchannels = 0;
412 for (size_t i = 0; i < sc_args_count; i++) {
413 grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
414 exec_ctx, args->client_channel_factory, &sc_args[i]);
415 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
416 char *address_uri =
417 grpc_sockaddr_to_uri(&addresses->addresses[i].address);
418 gpr_log(GPR_INFO,
419 "Pick First %p created subchannel %p for address uri %s",
420 (void *)p, (void *)subchannel, address_uri);
421 gpr_free(address_uri);
422 }
423 grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)sc_args[i].args);
424 if (subchannel != NULL) new_subchannels[num_new_subchannels++] = subchannel;
425 }
426 gpr_free(sc_args);
427 if (num_new_subchannels == 0) {
428 gpr_free(new_subchannels);
429 // Empty update. Unsubscribe from all current subchannels and put the
430 // channel in TRANSIENT_FAILURE.
431 grpc_connectivity_state_set(
432 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
433 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No valid addresses in update"),
434 "pf_update_no_valid_addresses");
435 stop_connectivity_watchers(exec_ctx, p);
436 return;
437 }
438
439 /* Destroy the current subchannels. Repurpose pf_shutdown/destroy. */
440 stop_connectivity_watchers(exec_ctx, p);
441
442 /* Save new subchannels. The switch over will happen in
443 * pf_connectivity_changed_locked */
444 if (p->updating_selected || p->updating_subchannels) {
445 p->num_new_subchannels = num_new_subchannels;
446 p->new_subchannels = new_subchannels;
447 } else { /* nothing is updating. Get things moving from here */
448 p->num_subchannels = num_new_subchannels;
449 p->subchannels = new_subchannels;
450 p->new_subchannels = NULL;
451 p->num_new_subchannels = 0;
452 if (p->started_picking) {
453 p->checking_subchannel = 0;
454 p->checking_connectivity = GRPC_CHANNEL_IDLE;
455 grpc_subchannel_notify_on_state_change(
456 exec_ctx, p->subchannels[p->checking_subchannel],
457 p->base.interested_parties, &p->checking_connectivity,
458 &p->connectivity_changed);
459 }
460 }
461}
462
Craig Tiller2400bf52017-02-09 16:25:19 -0800463static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg,
464 grpc_error *error) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700465 pick_first_lb_policy *p = arg;
Craig Tillerb5585d42015-11-17 07:18:31 -0800466 grpc_subchannel *selected_subchannel;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700467 pending_pick *pp;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700468
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700469 bool restart = false;
470 if (p->updating_selected && error == GRPC_ERROR_CANCELLED) {
471 /* Captured the unsubscription for p->selected */
472 GPR_ASSERT(p->selected != NULL);
473 GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected,
474 "pf_update_connectivity");
475 p->updating_selected = false;
476 if (p->num_new_subchannels == 0) {
477 p->selected = NULL;
478 return;
479 }
480 restart = true;
481 }
482 if (p->updating_subchannels && error == GRPC_ERROR_CANCELLED) {
483 /* Captured the unsubscription for the checking subchannel */
484 GPR_ASSERT(p->selected == NULL);
485 for (size_t i = 0; i < p->num_subchannels; i++) {
486 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i],
487 "pf_update_connectivity");
488 }
489 gpr_free(p->subchannels);
490 p->subchannels = NULL;
491 p->num_subchannels = 0;
492 p->updating_subchannels = false;
493 if (p->num_new_subchannels == 0) return;
494 restart = true;
495 }
496 if (restart) {
497 p->selected = NULL;
498 p->selected_key = NULL;
Craig Tiller804ff712016-05-05 16:25:40 -0700499
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700500 GPR_ASSERT(p->new_subchannels != NULL);
501 GPR_ASSERT(p->num_new_subchannels > 0);
502 p->num_subchannels = p->num_new_subchannels;
503 p->subchannels = p->new_subchannels;
504 p->num_new_subchannels = 0;
505 p->new_subchannels = NULL;
506
507 if (p->started_picking) {
508 /* If we were picking, continue to do so over the new subchannels,
509 * starting from the 0th index. */
510 p->checking_subchannel = 0;
511 p->checking_connectivity = GRPC_CHANNEL_IDLE;
512 /* reuses the weak ref from start_picking_locked */
513 grpc_subchannel_notify_on_state_change(
514 exec_ctx, p->subchannels[p->checking_subchannel],
515 p->base.interested_parties, &p->checking_connectivity,
516 &p->connectivity_changed);
517 }
518 if (p->pending_update_args != NULL) {
519 const grpc_lb_policy_args *args = p->pending_update_args;
520 p->pending_update_args = NULL;
521 pf_update_locked(exec_ctx, &p->base, args);
522 }
523 return;
524 }
525 GRPC_ERROR_REF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700526 if (p->shutdown) {
Craig Tiller48613042015-11-29 14:45:11 -0800527 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
Craig Tillerae125932016-05-13 16:34:29 -0700528 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700529 return;
Craig Tiller2400bf52017-02-09 16:25:19 -0800530 } else if (p->selected != NULL) {
Craig Tillercb2609f2015-11-24 17:19:19 -0800531 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
532 /* if the selected channel goes bad, we're done */
Craig Tiller48ed92e2016-06-02 11:07:12 -0700533 p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
Craig Tillercb2609f2015-11-24 17:19:19 -0800534 }
Craig Tillera82950e2015-09-22 12:33:20 -0700535 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
Craig Tillerf707d622016-05-06 14:26:12 -0700536 p->checking_connectivity, GRPC_ERROR_REF(error),
Craig Tiller804ff712016-05-05 16:25:40 -0700537 "selected_changed");
Craig Tiller48ed92e2016-06-02 11:07:12 -0700538 if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
Craig Tillerab33b482015-11-21 08:11:04 -0800539 grpc_connected_subchannel_notify_on_state_change(
Craig Tiller2400bf52017-02-09 16:25:19 -0800540 exec_ctx, p->selected, p->base.interested_parties,
Craig Tillera6bebf42015-12-01 17:02:35 -0800541 &p->checking_connectivity, &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700542 } else {
Craig Tiller48613042015-11-29 14:45:11 -0800543 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700544 }
Craig Tillera82950e2015-09-22 12:33:20 -0700545 } else {
546 loop:
547 switch (p->checking_connectivity) {
David Garcia Quintasea6689d2016-11-08 09:46:41 -0800548 case GRPC_CHANNEL_INIT:
Jan Tattermuschb0fb2d22016-11-16 14:04:05 +0100549 GPR_UNREACHABLE_CODE(return );
Craig Tillera82950e2015-09-22 12:33:20 -0700550 case GRPC_CHANNEL_READY:
551 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
Craig Tiller804ff712016-05-05 16:25:40 -0700552 GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
553 "connecting_ready");
Craig Tillerb5585d42015-11-17 07:18:31 -0800554 selected_subchannel = p->subchannels[p->checking_subchannel];
Craig Tiller2400bf52017-02-09 16:25:19 -0800555 p->selected = GRPC_CONNECTED_SUBCHANNEL_REF(
556 grpc_subchannel_get_connected_subchannel(selected_subchannel),
557 "picked_first");
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700558
559 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
560 gpr_log(GPR_INFO, "Selected subchannel %p", (void *)p->selected);
561 }
562 p->selected_key = grpc_subchannel_get_key(selected_subchannel);
Craig Tillerb09d84d2015-10-06 09:12:16 -0700563 /* drop the pick list: we are connected now */
Craig Tiller48613042015-11-29 14:45:11 -0800564 GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
Craig Tiller2400bf52017-02-09 16:25:19 -0800565 destroy_subchannels_locked(exec_ctx, p);
Craig Tillerb09d84d2015-10-06 09:12:16 -0700566 /* update any calls that were waiting for a pick */
Craig Tillera82950e2015-09-22 12:33:20 -0700567 while ((pp = p->pending_picks)) {
568 p->pending_picks = pp->next;
Craig Tiller2400bf52017-02-09 16:25:19 -0800569 *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked");
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700570 if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
571 gpr_log(GPR_INFO,
572 "Servicing pending pick with selected subchannel %p",
573 (void *)p->selected);
574 }
Craig Tiller91031da2016-12-28 15:44:25 -0800575 grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700576 gpr_free(pp);
577 }
Craig Tillerab33b482015-11-21 08:11:04 -0800578 grpc_connected_subchannel_notify_on_state_change(
Craig Tiller2400bf52017-02-09 16:25:19 -0800579 exec_ctx, p->selected, p->base.interested_parties,
Craig Tillera6bebf42015-12-01 17:02:35 -0800580 &p->checking_connectivity, &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700581 break;
582 case GRPC_CHANNEL_TRANSIENT_FAILURE:
Craig Tillera82950e2015-09-22 12:33:20 -0700583 p->checking_subchannel =
584 (p->checking_subchannel + 1) % p->num_subchannels;
Craig Tiller131b6de2016-03-31 17:05:28 -0700585 if (p->checking_subchannel == 0) {
586 /* only trigger transient failure when we've tried all alternatives */
Craig Tiller804ff712016-05-05 16:25:40 -0700587 grpc_connectivity_state_set(
588 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
Craig Tillerf707d622016-05-06 14:26:12 -0700589 GRPC_ERROR_REF(error), "connecting_transient_failure");
Craig Tiller131b6de2016-03-31 17:05:28 -0700590 }
Craig Tillerf707d622016-05-06 14:26:12 -0700591 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700592 p->checking_connectivity = grpc_subchannel_check_connectivity(
Craig Tiller804ff712016-05-05 16:25:40 -0700593 p->subchannels[p->checking_subchannel], &error);
Craig Tillera82950e2015-09-22 12:33:20 -0700594 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
595 grpc_subchannel_notify_on_state_change(
596 exec_ctx, p->subchannels[p->checking_subchannel],
Craig Tiller69b093b2016-02-25 19:04:07 -0800597 p->base.interested_parties, &p->checking_connectivity,
Craig Tiller1d881fb2015-12-01 07:39:04 -0800598 &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700599 } else {
600 goto loop;
601 }
602 break;
603 case GRPC_CHANNEL_CONNECTING:
604 case GRPC_CHANNEL_IDLE:
Craig Tiller804ff712016-05-05 16:25:40 -0700605 grpc_connectivity_state_set(
606 exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
Craig Tillerf707d622016-05-06 14:26:12 -0700607 GRPC_ERROR_REF(error), "connecting_changed");
Craig Tillera82950e2015-09-22 12:33:20 -0700608 grpc_subchannel_notify_on_state_change(
609 exec_ctx, p->subchannels[p->checking_subchannel],
Craig Tiller69b093b2016-02-25 19:04:07 -0800610 p->base.interested_parties, &p->checking_connectivity,
Craig Tiller1d881fb2015-12-01 07:39:04 -0800611 &p->connectivity_changed);
Craig Tillera82950e2015-09-22 12:33:20 -0700612 break;
Craig Tiller48ed92e2016-06-02 11:07:12 -0700613 case GRPC_CHANNEL_SHUTDOWN:
Craig Tillera82950e2015-09-22 12:33:20 -0700614 p->num_subchannels--;
Craig Tiller86c99582015-11-25 15:22:26 -0800615 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
616 p->subchannels[p->num_subchannels]);
Craig Tillera82950e2015-09-22 12:33:20 -0700617 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
618 "pick_first");
619 if (p->num_subchannels == 0) {
Craig Tiller804ff712016-05-05 16:25:40 -0700620 grpc_connectivity_state_set(
Craig Tillerd925c932016-06-06 08:38:50 -0700621 exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
ncteisen4b36a3d2017-03-13 19:08:06 -0700622 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
623 "Pick first exhausted channels", &error, 1),
Craig Tiller804ff712016-05-05 16:25:40 -0700624 "no_more_channels");
Craig Tillera82950e2015-09-22 12:33:20 -0700625 while ((pp = p->pending_picks)) {
626 p->pending_picks = pp->next;
627 *pp->target = NULL;
Craig Tiller91031da2016-12-28 15:44:25 -0800628 grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700629 gpr_free(pp);
630 }
Craig Tiller1d881fb2015-12-01 07:39:04 -0800631 GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
632 "pick_first_connectivity");
Craig Tillera82950e2015-09-22 12:33:20 -0700633 } else {
Craig Tiller804ff712016-05-05 16:25:40 -0700634 grpc_connectivity_state_set(
635 exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
Craig Tillerf707d622016-05-06 14:26:12 -0700636 GRPC_ERROR_REF(error), "subchannel_failed");
Craig Tillera82950e2015-09-22 12:33:20 -0700637 p->checking_subchannel %= p->num_subchannels;
Craig Tillerf707d622016-05-06 14:26:12 -0700638 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700639 p->checking_connectivity = grpc_subchannel_check_connectivity(
Craig Tiller804ff712016-05-05 16:25:40 -0700640 p->subchannels[p->checking_subchannel], &error);
Craig Tillera82950e2015-09-22 12:33:20 -0700641 goto loop;
642 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700643 }
Craig Tillera82950e2015-09-22 12:33:20 -0700644 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700645
Craig Tillerf707d622016-05-06 14:26:12 -0700646 GRPC_ERROR_UNREF(error);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700647}
648
649static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller2400bf52017-02-09 16:25:19 -0800650 pf_destroy,
651 pf_shutdown_locked,
652 pf_pick_locked,
653 pf_cancel_pick_locked,
654 pf_cancel_picks_locked,
655 pf_ping_one_locked,
656 pf_exit_idle_locked,
657 pf_check_connectivity_locked,
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700658 pf_notify_on_state_change_locked,
659 pf_update_locked};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700660
Craig Tillera82950e2015-09-22 12:33:20 -0700661static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700662
Craig Tillera82950e2015-09-22 12:33:20 -0700663static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700664
David Garcia Quintas67c0d042016-03-25 01:37:53 -0700665static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
666 grpc_lb_policy_factory *factory,
Craig Tillera82950e2015-09-22 12:33:20 -0700667 grpc_lb_policy_args *args) {
David Garcia Quintas86fcfcc2016-03-31 23:22:28 -0700668 GPR_ASSERT(args->client_channel_factory != NULL);
Craig Tiller6f417882017-02-16 14:09:39 -0800669 pick_first_lb_policy *p = gpr_zalloc(sizeof(*p));
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700670 pf_update_locked(exec_ctx, &p->base, args);
Craig Tiller2400bf52017-02-09 16:25:19 -0800671 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner);
672 grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p,
673 grpc_combiner_scheduler(args->combiner, false));
Craig Tillereb3b12e2015-06-26 14:42:49 -0700674 return &p->base;
675}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700676
677static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
Craig Tillera82950e2015-09-22 12:33:20 -0700678 pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
679 "pick_first"};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700680
681static grpc_lb_policy_factory pick_first_lb_policy_factory = {
Craig Tillera82950e2015-09-22 12:33:20 -0700682 &pick_first_factory_vtable};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700683
Craig Tillerfb433852016-03-29 08:51:07 -0700684static grpc_lb_policy_factory *pick_first_lb_factory_create() {
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700685 return &pick_first_lb_policy_factory;
686}
Craig Tillerfb433852016-03-29 08:51:07 -0700687
688/* Plugin registration */
689
690void grpc_lb_policy_pick_first_init() {
Craig Tiller3113ef42016-03-29 09:03:14 -0700691 grpc_register_lb_policy(pick_first_lb_factory_create());
David Garcia Quintas87d5a312017-06-06 19:45:58 -0700692 grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace);
Craig Tillerfb433852016-03-29 08:51:07 -0700693}
694
695void grpc_lb_policy_pick_first_shutdown() {}