blob: 1c5d058d97f62604f1f87509856349f93a4024a9 [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintas5c4543d2015-09-03 15:49:56 -070034#include "src/core/client_config/lb_policy_factory.h"
Craig Tiller3bc8ebd2015-06-24 15:41:15 -070035#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070036
37#include <string.h>
38
39#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070040#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070041
42typedef struct pending_pick {
43 struct pending_pick *next;
44 grpc_pollset *pollset;
45 grpc_subchannel **target;
46 grpc_iomgr_closure *on_complete;
47} pending_pick;
48
49typedef struct {
50 /** base policy: must be first */
51 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070052 /** all our subchannels */
53 grpc_subchannel **subchannels;
54 size_t num_subchannels;
55
56 grpc_iomgr_closure connectivity_changed;
57
58 /** mutex protecting remaining members */
59 gpr_mu mu;
60 /** the selected channel
61 TODO(ctiller): this should be atomically set so we don't
62 need to take a mutex in the common case */
63 grpc_subchannel *selected;
64 /** have we started picking? */
65 int started_picking;
Craig Tillera14215a2015-07-17 17:21:08 -070066 /** are we shut down? */
67 int shutdown;
Craig Tillereb3b12e2015-06-26 14:42:49 -070068 /** which subchannel are we watching? */
69 size_t checking_subchannel;
70 /** what is the connectivity of that channel? */
71 grpc_connectivity_state checking_connectivity;
72 /** list of picks that are waiting on connectivity */
73 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070074
75 /** our connectivity state tracker */
76 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070077} pick_first_lb_policy;
78
Craig Tiller1ada6ad2015-07-16 16:19:14 -070079static void del_interested_parties_locked(pick_first_lb_policy *p) {
80 pending_pick *pp;
81 for (pp = p->pending_picks; pp; pp = pp->next) {
82 grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
83 pp->pollset);
84 }
85}
86
87static void add_interested_parties_locked(pick_first_lb_policy *p) {
88 pending_pick *pp;
89 for (pp = p->pending_picks; pp; pp = pp->next) {
90 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
91 pp->pollset);
92 }
93}
94
Craig Tillerd7b68e72015-06-28 11:41:09 -070095void pf_destroy(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -070096 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -070097 size_t i;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070098 del_interested_parties_locked(p);
Craig Tillerd7b68e72015-06-28 11:41:09 -070099 for (i = 0; i < p->num_subchannels; i++) {
Craig Tillerc3967532015-06-29 14:59:38 -0700100 GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700101 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700102 grpc_connectivity_state_destroy(&p->state_tracker);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700103 gpr_free(p->subchannels);
104 gpr_mu_destroy(&p->mu);
105 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700106}
107
108void pf_shutdown(grpc_lb_policy *pol) {
Craig Tillerb3671532015-07-01 10:37:40 -0700109 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700110 pending_pick *pp;
111 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700112 del_interested_parties_locked(p);
Craig Tillera14215a2015-07-17 17:21:08 -0700113 p->shutdown = 1;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700114 while ((pp = p->pending_picks)) {
115 p->pending_picks = pp->next;
116 *pp->target = NULL;
117 grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
118 gpr_free(pp);
119 }
Craig Tiller03dc6552015-07-17 23:12:34 -0700120 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
121 "shutdown");
Craig Tillerd2cc4592015-07-01 07:50:47 -0700122 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700123}
124
Craig Tiller48cb07c2015-07-15 16:16:15 -0700125static void start_picking(pick_first_lb_policy *p) {
126 p->started_picking = 1;
127 p->checking_subchannel = 0;
128 p->checking_connectivity = GRPC_CHANNEL_IDLE;
129 GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
130 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
131 &p->checking_connectivity,
132 &p->connectivity_changed);
133}
134
135void pf_exit_idle(grpc_lb_policy *pol) {
136 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
137 gpr_mu_lock(&p->mu);
138 if (!p->started_picking) {
139 start_picking(p);
140 }
141 gpr_mu_unlock(&p->mu);
142}
143
Craig Tillereb3b12e2015-06-26 14:42:49 -0700144void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
145 grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
146 grpc_iomgr_closure *on_complete) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700147 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700148 pending_pick *pp;
149 gpr_mu_lock(&p->mu);
150 if (p->selected) {
151 gpr_mu_unlock(&p->mu);
152 *target = p->selected;
153 on_complete->cb(on_complete->cb_arg, 1);
154 } else {
155 if (!p->started_picking) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700156 start_picking(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700157 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700158 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
159 pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700160 pp = gpr_malloc(sizeof(*pp));
161 pp->next = p->pending_picks;
162 pp->pollset = pollset;
163 pp->target = target;
164 pp->on_complete = on_complete;
165 p->pending_picks = pp;
166 gpr_mu_unlock(&p->mu);
167 }
168}
169
Craig Tillereb3b12e2015-06-26 14:42:49 -0700170static void pf_connectivity_changed(void *arg, int iomgr_success) {
171 pick_first_lb_policy *p = arg;
172 pending_pick *pp;
173 int unref = 0;
174
175 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700176
Craig Tillera14215a2015-07-17 17:21:08 -0700177 if (p->shutdown) {
178 unref = 1;
179 } else if (p->selected != NULL) {
Craig Tiller03dc6552015-07-17 23:12:34 -0700180 grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
181 "selected_changed");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700182 if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
Craig Tiller03dc6552015-07-17 23:12:34 -0700183 grpc_subchannel_notify_on_state_change(
184 p->selected, &p->checking_connectivity, &p->connectivity_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700185 } else {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700186 unref = 1;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700187 }
188 } else {
Craig Tiller03dc6552015-07-17 23:12:34 -0700189 loop:
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700190 switch (p->checking_connectivity) {
191 case GRPC_CHANNEL_READY:
Craig Tiller03dc6552015-07-17 23:12:34 -0700192 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
193 "connecting_ready");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700194 p->selected = p->subchannels[p->checking_subchannel];
Craig Tiller87cc0842015-06-30 08:15:55 -0700195 while ((pp = p->pending_picks)) {
196 p->pending_picks = pp->next;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700197 *pp->target = p->selected;
198 grpc_subchannel_del_interested_party(p->selected, pp->pollset);
Craig Tiller87cc0842015-06-30 08:15:55 -0700199 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
200 gpr_free(pp);
201 }
Craig Tiller03dc6552015-07-17 23:12:34 -0700202 grpc_subchannel_notify_on_state_change(
203 p->selected, &p->checking_connectivity, &p->connectivity_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700204 break;
205 case GRPC_CHANNEL_TRANSIENT_FAILURE:
Craig Tiller03dc6552015-07-17 23:12:34 -0700206 grpc_connectivity_state_set(&p->state_tracker,
207 GRPC_CHANNEL_TRANSIENT_FAILURE,
208 "connecting_transient_failure");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700209 del_interested_parties_locked(p);
210 p->checking_subchannel =
211 (p->checking_subchannel + 1) % p->num_subchannels;
Craig Tiller87cc0842015-06-30 08:15:55 -0700212 p->checking_connectivity = grpc_subchannel_check_connectivity(
213 p->subchannels[p->checking_subchannel]);
Craig Tiller87cc0842015-06-30 08:15:55 -0700214 add_interested_parties_locked(p);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700215 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
Craig Tiller03dc6552015-07-17 23:12:34 -0700216 grpc_subchannel_notify_on_state_change(
217 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
218 &p->connectivity_changed);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700219 } else {
220 goto loop;
221 }
222 break;
223 case GRPC_CHANNEL_CONNECTING:
224 case GRPC_CHANNEL_IDLE:
Craig Tiller03dc6552015-07-17 23:12:34 -0700225 grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
226 "connecting_changed");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700227 grpc_subchannel_notify_on_state_change(
228 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
229 &p->connectivity_changed);
230 break;
231 case GRPC_CHANNEL_FATAL_FAILURE:
232 del_interested_parties_locked(p);
233 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
234 p->subchannels[p->num_subchannels - 1]);
235 p->num_subchannels--;
236 GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
237 if (p->num_subchannels == 0) {
Craig Tiller03dc6552015-07-17 23:12:34 -0700238 grpc_connectivity_state_set(&p->state_tracker,
239 GRPC_CHANNEL_FATAL_FAILURE,
240 "no_more_channels");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700241 while ((pp = p->pending_picks)) {
242 p->pending_picks = pp->next;
243 *pp->target = NULL;
244 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
245 gpr_free(pp);
246 }
247 unref = 1;
248 } else {
Craig Tiller03dc6552015-07-17 23:12:34 -0700249 grpc_connectivity_state_set(&p->state_tracker,
250 GRPC_CHANNEL_TRANSIENT_FAILURE,
251 "subchannel_failed");
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700252 p->checking_subchannel %= p->num_subchannels;
253 p->checking_connectivity = grpc_subchannel_check_connectivity(
254 p->subchannels[p->checking_subchannel]);
255 add_interested_parties_locked(p);
256 goto loop;
257 }
258 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700259 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700260
Craig Tillereb3b12e2015-06-26 14:42:49 -0700261 gpr_mu_unlock(&p->mu);
262
263 if (unref) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700264 GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700265 }
266}
267
Craig Tillerc7b5f762015-06-27 11:48:42 -0700268static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700269 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700270 size_t i;
271 size_t n;
272 grpc_subchannel **subchannels;
273
274 gpr_mu_lock(&p->mu);
275 n = p->num_subchannels;
276 subchannels = gpr_malloc(n * sizeof(*subchannels));
277 for (i = 0; i < n; i++) {
278 subchannels[i] = p->subchannels[i];
Craig Tillerc3967532015-06-29 14:59:38 -0700279 GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700280 }
281 gpr_mu_unlock(&p->mu);
282
283 for (i = 0; i < n; i++) {
284 grpc_subchannel_process_transport_op(subchannels[i], op);
Craig Tillerc3967532015-06-29 14:59:38 -0700285 GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700286 }
287 gpr_free(subchannels);
288}
289
290static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700291 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700292 grpc_connectivity_state st;
293 gpr_mu_lock(&p->mu);
294 st = grpc_connectivity_state_check(&p->state_tracker);
295 gpr_mu_unlock(&p->mu);
296 return st;
297}
298
Craig Tiller4ab82d22015-06-29 09:40:33 -0700299static void pf_notify_on_state_change(grpc_lb_policy *pol,
300 grpc_connectivity_state *current,
301 grpc_iomgr_closure *notify) {
302 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700303 gpr_mu_lock(&p->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700304 grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
305 notify);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700306 gpr_mu_unlock(&p->mu);
307}
308
Craig Tillereb3b12e2015-06-26 14:42:49 -0700309static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700310 pf_destroy,
311 pf_shutdown,
312 pf_pick,
313 pf_exit_idle,
314 pf_broadcast,
315 pf_check_connectivity,
316 pf_notify_on_state_change};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700317
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700318static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
319
320static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
321
322static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
323 grpc_subchannel **subchannels,
324 size_t num_subchannels) {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700325 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
326 GPR_ASSERT(num_subchannels);
327 memset(p, 0, sizeof(*p));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700328 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700329 p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
330 p->num_subchannels = num_subchannels;
Craig Tiller03dc6552015-07-17 23:12:34 -0700331 grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
332 "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700333 memcpy(p->subchannels, subchannels,
334 sizeof(grpc_subchannel *) * num_subchannels);
335 grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
336 gpr_mu_init(&p->mu);
337 return &p->base;
338}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700339
340static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
341 pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
342 "pick_first"};
343
344static grpc_lb_policy_factory pick_first_lb_policy_factory = {
345 &pick_first_factory_vtable};
346
347grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() {
348 return &pick_first_lb_policy_factory;
349}