blob: 18c8c0412f75085d1135d4e37f73f404227a2f57 [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas5c4543d2015-09-03 15:49:56 -070034#include "src/core/client_config/lb_policy_factory.h"
Craig Tiller3bc8ebd2015-06-24 15:41:15 -070035#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070036
37#include <string.h>
38
39#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070040#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070041
Craig Tillera82950e2015-09-22 12:33:20 -070042typedef struct pending_pick {
Craig Tillereb3b12e2015-06-26 14:42:49 -070043 struct pending_pick *next;
44 grpc_pollset *pollset;
45 grpc_subchannel **target;
Craig Tiller33825112015-09-18 07:44:19 -070046 grpc_closure *on_complete;
Craig Tillereb3b12e2015-06-26 14:42:49 -070047} pending_pick;
48
Craig Tillera82950e2015-09-22 12:33:20 -070049typedef struct {
Craig Tillereb3b12e2015-06-26 14:42:49 -070050 /** base policy: must be first */
51 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070052 /** all our subchannels */
53 grpc_subchannel **subchannels;
54 size_t num_subchannels;
55
Craig Tiller33825112015-09-18 07:44:19 -070056 grpc_closure connectivity_changed;
Craig Tillereb3b12e2015-06-26 14:42:49 -070057
58 /** mutex protecting remaining members */
59 gpr_mu mu;
60 /** the selected channel
61 TODO(ctiller): this should be atomically set so we don't
62 need to take a mutex in the common case */
63 grpc_subchannel *selected;
64 /** have we started picking? */
65 int started_picking;
Craig Tillera14215a2015-07-17 17:21:08 -070066 /** are we shut down? */
67 int shutdown;
Craig Tillereb3b12e2015-06-26 14:42:49 -070068 /** which subchannel are we watching? */
69 size_t checking_subchannel;
70 /** what is the connectivity of that channel? */
71 grpc_connectivity_state checking_connectivity;
72 /** list of picks that are waiting on connectivity */
73 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070074
75 /** our connectivity state tracker */
76 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070077} pick_first_lb_policy;
78
Craig Tillera82950e2015-09-22 12:33:20 -070079static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
80 pick_first_lb_policy *p) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -070081 pending_pick *pp;
Craig Tillera82950e2015-09-22 12:33:20 -070082 for (pp = p->pending_picks; pp; pp = pp->next) {
83 grpc_subchannel_del_interested_party(
84 exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
85 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -070086}
87
Craig Tillera82950e2015-09-22 12:33:20 -070088static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
89 pick_first_lb_policy *p) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -070090 pending_pick *pp;
Craig Tillera82950e2015-09-22 12:33:20 -070091 for (pp = p->pending_picks; pp; pp = pp->next) {
92 grpc_subchannel_add_interested_party(
93 exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
94 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -070095}
96
Craig Tillera82950e2015-09-22 12:33:20 -070097void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
98 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -070099 size_t i;
Craig Tillera82950e2015-09-22 12:33:20 -0700100 GPR_ASSERT(p->pending_picks == NULL);
101 for (i = 0; i < p->num_subchannels; i++) {
102 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
103 }
104 grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
105 gpr_free(p->subchannels);
106 gpr_mu_destroy(&p->mu);
107 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700108}
109
Craig Tillera82950e2015-09-22 12:33:20 -0700110void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
111 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700112 pending_pick *pp;
Craig Tillera82950e2015-09-22 12:33:20 -0700113 gpr_mu_lock(&p->mu);
114 del_interested_parties_locked(exec_ctx, p);
Craig Tillera14215a2015-07-17 17:21:08 -0700115 p->shutdown = 1;
Craig Tiller5795da72015-09-17 15:27:13 -0700116 pp = p->pending_picks;
117 p->pending_picks = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700118 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
119 GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
120 gpr_mu_unlock(&p->mu);
121 while (pp != NULL) {
122 pending_pick *next = pp->next;
123 *pp->target = NULL;
124 grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
125 gpr_free(pp);
126 pp = next;
127 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700128}
129
Craig Tillera82950e2015-09-22 12:33:20 -0700130static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700131 p->started_picking = 1;
132 p->checking_subchannel = 0;
133 p->checking_connectivity = GRPC_CHANNEL_IDLE;
Craig Tillera82950e2015-09-22 12:33:20 -0700134 GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
135 grpc_subchannel_notify_on_state_change(
136 exec_ctx, p->subchannels[p->checking_subchannel],
137 &p->checking_connectivity, &p->connectivity_changed);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700138}
139
Craig Tillera82950e2015-09-22 12:33:20 -0700140void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
141 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
142 gpr_mu_lock(&p->mu);
143 if (!p->started_picking) {
144 start_picking(exec_ctx, p);
145 }
146 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700147}
148
Craig Tillera82950e2015-09-22 12:33:20 -0700149void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
150 grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
151 grpc_subchannel **target, grpc_closure *on_complete) {
152 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tiller45724b32015-09-22 10:42:19 -0700153 pending_pick *pp;
Craig Tillera82950e2015-09-22 12:33:20 -0700154 gpr_mu_lock(&p->mu);
155 if (p->selected) {
156 gpr_mu_unlock(&p->mu);
157 *target = p->selected;
158 grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1);
159 } else {
160 if (!p->started_picking) {
161 start_picking(exec_ctx, p);
Craig Tiller45724b32015-09-22 10:42:19 -0700162 }
Craig Tillera82950e2015-09-22 12:33:20 -0700163 grpc_subchannel_add_interested_party(
164 exec_ctx, p->subchannels[p->checking_subchannel], pollset);
165 pp = gpr_malloc(sizeof(*pp));
166 pp->next = p->pending_picks;
167 pp->pollset = pollset;
168 pp->target = target;
169 pp->on_complete = on_complete;
170 p->pending_picks = pp;
171 gpr_mu_unlock(&p->mu);
172 }
Craig Tiller45724b32015-09-22 10:42:19 -0700173}
174
Craig Tillerb09d84d2015-10-06 09:12:16 -0700175static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
176 int iomgr_success) {
177 pick_first_lb_policy *p = arg;
178 size_t i;
179 grpc_transport_op op;
180 size_t num_subchannels = p->num_subchannels;
181 grpc_subchannel **subchannels;
182 grpc_subchannel *exclude_subchannel;
183
184 gpr_mu_lock(&p->mu);
185 subchannels = p->subchannels;
186 exclude_subchannel = p->selected;
187 p->num_subchannels = 0;
188 p->subchannels = NULL;
189 gpr_mu_unlock(&p->mu);
190 GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
191
192 for (i = 0; i < num_subchannels; i++) {
193 if (subchannels[i] == exclude_subchannel) {
194 exclude_subchannel = NULL;
195 continue;
196 }
197 memset(&op, 0, sizeof(op));
198 op.disconnect = 1;
199 grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
200 GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
201 }
202
203 gpr_free(subchannels);
204}
205
Craig Tillera82950e2015-09-22 12:33:20 -0700206static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
207 int iomgr_success) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700208 pick_first_lb_policy *p = arg;
209 pending_pick *pp;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700210
Craig Tillera82950e2015-09-22 12:33:20 -0700211 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700212
Craig Tillera82950e2015-09-22 12:33:20 -0700213 if (p->shutdown) {
214 gpr_mu_unlock(&p->mu);
215 GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
216 return;
217 } else if (p->selected != NULL) {
218 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
219 p->checking_connectivity, "selected_changed");
220 if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
221 grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
222 &p->checking_connectivity,
223 &p->connectivity_changed);
224 } else {
225 GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700226 }
Craig Tillera82950e2015-09-22 12:33:20 -0700227 } else {
228 loop:
229 switch (p->checking_connectivity) {
230 case GRPC_CHANNEL_READY:
231 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
232 GRPC_CHANNEL_READY, "connecting_ready");
233 p->selected = p->subchannels[p->checking_subchannel];
Craig Tillerb09d84d2015-10-06 09:12:16 -0700234 /* drop the pick list: we are connected now */
235 GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
236 grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1);
237 /* update any calls that were waiting for a pick */
Craig Tillera82950e2015-09-22 12:33:20 -0700238 while ((pp = p->pending_picks)) {
239 p->pending_picks = pp->next;
240 *pp->target = p->selected;
241 grpc_subchannel_del_interested_party(exec_ctx, p->selected,
242 pp->pollset);
243 grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
244 gpr_free(pp);
245 }
246 grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
247 &p->checking_connectivity,
248 &p->connectivity_changed);
249 break;
250 case GRPC_CHANNEL_TRANSIENT_FAILURE:
251 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
252 GRPC_CHANNEL_TRANSIENT_FAILURE,
253 "connecting_transient_failure");
254 del_interested_parties_locked(exec_ctx, p);
255 p->checking_subchannel =
256 (p->checking_subchannel + 1) % p->num_subchannels;
257 p->checking_connectivity = grpc_subchannel_check_connectivity(
258 p->subchannels[p->checking_subchannel]);
259 add_interested_parties_locked(exec_ctx, p);
260 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
261 grpc_subchannel_notify_on_state_change(
262 exec_ctx, p->subchannels[p->checking_subchannel],
263 &p->checking_connectivity, &p->connectivity_changed);
264 } else {
265 goto loop;
266 }
267 break;
268 case GRPC_CHANNEL_CONNECTING:
269 case GRPC_CHANNEL_IDLE:
270 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
271 GRPC_CHANNEL_CONNECTING,
272 "connecting_changed");
273 grpc_subchannel_notify_on_state_change(
274 exec_ctx, p->subchannels[p->checking_subchannel],
275 &p->checking_connectivity, &p->connectivity_changed);
276 break;
277 case GRPC_CHANNEL_FATAL_FAILURE:
278 del_interested_parties_locked(exec_ctx, p);
279 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
280 p->subchannels[p->num_subchannels - 1]);
281 p->num_subchannels--;
282 GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
283 "pick_first");
284 if (p->num_subchannels == 0) {
285 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
286 GRPC_CHANNEL_FATAL_FAILURE,
287 "no_more_channels");
288 while ((pp = p->pending_picks)) {
289 p->pending_picks = pp->next;
290 *pp->target = NULL;
291 grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
292 gpr_free(pp);
293 }
294 GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
295 } else {
296 grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
297 GRPC_CHANNEL_TRANSIENT_FAILURE,
298 "subchannel_failed");
299 p->checking_subchannel %= p->num_subchannels;
300 p->checking_connectivity = grpc_subchannel_check_connectivity(
301 p->subchannels[p->checking_subchannel]);
302 add_interested_parties_locked(exec_ctx, p);
303 goto loop;
304 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700305 }
Craig Tillera82950e2015-09-22 12:33:20 -0700306 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700307
Craig Tillera82950e2015-09-22 12:33:20 -0700308 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700309}
310
Craig Tillera82950e2015-09-22 12:33:20 -0700311static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
312 grpc_transport_op *op) {
313 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700314 size_t i;
315 size_t n;
316 grpc_subchannel **subchannels;
317
Craig Tillera82950e2015-09-22 12:33:20 -0700318 gpr_mu_lock(&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700319 n = p->num_subchannels;
Craig Tillera82950e2015-09-22 12:33:20 -0700320 subchannels = gpr_malloc(n * sizeof(*subchannels));
321 for (i = 0; i < n; i++) {
322 subchannels[i] = p->subchannels[i];
323 GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
324 }
325 gpr_mu_unlock(&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700326
Craig Tillera82950e2015-09-22 12:33:20 -0700327 for (i = 0; i < n; i++) {
328 grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
329 GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
330 }
331 gpr_free(subchannels);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700332}
333
Craig Tillera82950e2015-09-22 12:33:20 -0700334static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
335 grpc_lb_policy *pol) {
336 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700337 grpc_connectivity_state st;
Craig Tillera82950e2015-09-22 12:33:20 -0700338 gpr_mu_lock(&p->mu);
339 st = grpc_connectivity_state_check(&p->state_tracker);
340 gpr_mu_unlock(&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700341 return st;
342}
343
Craig Tillera82950e2015-09-22 12:33:20 -0700344void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
345 grpc_connectivity_state *current,
346 grpc_closure *notify) {
347 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
348 gpr_mu_lock(&p->mu);
349 grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
350 current, notify);
351 gpr_mu_unlock(&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700352}
353
Craig Tillereb3b12e2015-06-26 14:42:49 -0700354static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700355 pf_destroy, pf_shutdown, pf_pick, pf_exit_idle, pf_broadcast,
356 pf_check_connectivity, pf_notify_on_state_change};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700357
Craig Tillera82950e2015-09-22 12:33:20 -0700358static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700359
Craig Tillera82950e2015-09-22 12:33:20 -0700360static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700361
Craig Tillera82950e2015-09-22 12:33:20 -0700362static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
363 grpc_lb_policy_args *args) {
364 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
365 GPR_ASSERT(args->num_subchannels > 0);
366 memset(p, 0, sizeof(*p));
367 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
368 p->subchannels =
369 gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
David Garcia Quintasc7705c72015-09-09 17:21:11 -0700370 p->num_subchannels = args->num_subchannels;
Craig Tillera82950e2015-09-22 12:33:20 -0700371 grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
372 "pick_first");
373 memcpy(p->subchannels, args->subchannels,
374 sizeof(grpc_subchannel *) * args->num_subchannels);
375 grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
376 gpr_mu_init(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700377 return &p->base;
378}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700379
380static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
Craig Tillera82950e2015-09-22 12:33:20 -0700381 pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
382 "pick_first"};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700383
384static grpc_lb_policy_factory pick_first_lb_policy_factory = {
Craig Tillera82950e2015-09-22 12:33:20 -0700385 &pick_first_factory_vtable};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700386
Craig Tillera82950e2015-09-22 12:33:20 -0700387grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() {
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700388 return &pick_first_lb_policy_factory;
389}