blob: 8409aab14e22e4d0d3264558a63fbc756258a730 [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070039#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070040
41typedef struct pending_pick {
42 struct pending_pick *next;
43 grpc_pollset *pollset;
44 grpc_subchannel **target;
45 grpc_iomgr_closure *on_complete;
46} pending_pick;
47
48typedef struct {
49 /** base policy: must be first */
50 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070051 /** all our subchannels */
52 grpc_subchannel **subchannels;
53 size_t num_subchannels;
54
55 grpc_iomgr_closure connectivity_changed;
56
57 /** mutex protecting remaining members */
58 gpr_mu mu;
59 /** the selected channel
60 TODO(ctiller): this should be atomically set so we don't
61 need to take a mutex in the common case */
62 grpc_subchannel *selected;
63 /** have we started picking? */
64 int started_picking;
65 /** which subchannel are we watching? */
66 size_t checking_subchannel;
67 /** what is the connectivity of that channel? */
68 grpc_connectivity_state checking_connectivity;
69 /** list of picks that are waiting on connectivity */
70 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070071
72 /** our connectivity state tracker */
73 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070074} pick_first_lb_policy;
75
Craig Tiller1ada6ad2015-07-16 16:19:14 -070076static void del_interested_parties_locked(pick_first_lb_policy *p) {
77 pending_pick *pp;
78 for (pp = p->pending_picks; pp; pp = pp->next) {
79 grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
80 pp->pollset);
81 }
82}
83
84static void add_interested_parties_locked(pick_first_lb_policy *p) {
85 pending_pick *pp;
86 for (pp = p->pending_picks; pp; pp = pp->next) {
87 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
88 pp->pollset);
89 }
90}
91
Craig Tillerd7b68e72015-06-28 11:41:09 -070092void pf_destroy(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -070093 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -070094 size_t i;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070095 del_interested_parties_locked(p);
Craig Tillerd7b68e72015-06-28 11:41:09 -070096 for (i = 0; i < p->num_subchannels; i++) {
Craig Tillerc3967532015-06-29 14:59:38 -070097 GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -070098 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -070099 grpc_connectivity_state_destroy(&p->state_tracker);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700100 gpr_free(p->subchannels);
101 gpr_mu_destroy(&p->mu);
102 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700103}
104
105void pf_shutdown(grpc_lb_policy *pol) {
Craig Tillerb3671532015-07-01 10:37:40 -0700106 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700107 pending_pick *pp;
108 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700109 del_interested_parties_locked(p);
Craig Tillerd2cc4592015-07-01 07:50:47 -0700110 while ((pp = p->pending_picks)) {
111 p->pending_picks = pp->next;
112 *pp->target = NULL;
113 grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
114 gpr_free(pp);
115 }
116 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700117}
118
Craig Tiller48cb07c2015-07-15 16:16:15 -0700119static void start_picking(pick_first_lb_policy *p) {
120 p->started_picking = 1;
121 p->checking_subchannel = 0;
122 p->checking_connectivity = GRPC_CHANNEL_IDLE;
123 GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
124 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
125 &p->checking_connectivity,
126 &p->connectivity_changed);
127}
128
129void pf_exit_idle(grpc_lb_policy *pol) {
130 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
131 gpr_mu_lock(&p->mu);
132 if (!p->started_picking) {
133 start_picking(p);
134 }
135 gpr_mu_unlock(&p->mu);
136}
137
Craig Tillereb3b12e2015-06-26 14:42:49 -0700138void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
139 grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
140 grpc_iomgr_closure *on_complete) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700141 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700142 pending_pick *pp;
143 gpr_mu_lock(&p->mu);
144 if (p->selected) {
145 gpr_mu_unlock(&p->mu);
146 *target = p->selected;
147 on_complete->cb(on_complete->cb_arg, 1);
148 } else {
149 if (!p->started_picking) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700150 start_picking(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700151 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700152 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
153 pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700154 pp = gpr_malloc(sizeof(*pp));
155 pp->next = p->pending_picks;
156 pp->pollset = pollset;
157 pp->target = target;
158 pp->on_complete = on_complete;
159 p->pending_picks = pp;
160 gpr_mu_unlock(&p->mu);
161 }
162}
163
Craig Tillereb3b12e2015-06-26 14:42:49 -0700164static void pf_connectivity_changed(void *arg, int iomgr_success) {
165 pick_first_lb_policy *p = arg;
166 pending_pick *pp;
167 int unref = 0;
168
169 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700170
171 if (p->selected != NULL) {
172 grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity);
173 if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
174 grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed);
175 } else {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700176 unref = 1;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700177 }
178 } else {
179loop:
180 switch (p->checking_connectivity) {
181 case GRPC_CHANNEL_READY:
182 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY);
183 p->selected = p->subchannels[p->checking_subchannel];
Craig Tiller87cc0842015-06-30 08:15:55 -0700184 while ((pp = p->pending_picks)) {
185 p->pending_picks = pp->next;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700186 *pp->target = p->selected;
187 grpc_subchannel_del_interested_party(p->selected, pp->pollset);
Craig Tiller87cc0842015-06-30 08:15:55 -0700188 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
189 gpr_free(pp);
190 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700191 grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed);
192 break;
193 case GRPC_CHANNEL_TRANSIENT_FAILURE:
194 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE);
195 del_interested_parties_locked(p);
196 p->checking_subchannel =
197 (p->checking_subchannel + 1) % p->num_subchannels;
Craig Tiller87cc0842015-06-30 08:15:55 -0700198 p->checking_connectivity = grpc_subchannel_check_connectivity(
199 p->subchannels[p->checking_subchannel]);
Craig Tiller87cc0842015-06-30 08:15:55 -0700200 add_interested_parties_locked(p);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700201 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
202 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
203 } else {
204 goto loop;
205 }
206 break;
207 case GRPC_CHANNEL_CONNECTING:
208 case GRPC_CHANNEL_IDLE:
209 grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity);
210 grpc_subchannel_notify_on_state_change(
211 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
212 &p->connectivity_changed);
213 break;
214 case GRPC_CHANNEL_FATAL_FAILURE:
215 del_interested_parties_locked(p);
216 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
217 p->subchannels[p->num_subchannels - 1]);
218 p->num_subchannels--;
219 GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
220 if (p->num_subchannels == 0) {
221 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
222 while ((pp = p->pending_picks)) {
223 p->pending_picks = pp->next;
224 *pp->target = NULL;
225 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
226 gpr_free(pp);
227 }
228 unref = 1;
229 } else {
230 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE);
231 p->checking_subchannel %= p->num_subchannels;
232 p->checking_connectivity = grpc_subchannel_check_connectivity(
233 p->subchannels[p->checking_subchannel]);
234 add_interested_parties_locked(p);
235 goto loop;
236 }
237 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700238 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700239
Craig Tillereb3b12e2015-06-26 14:42:49 -0700240 gpr_mu_unlock(&p->mu);
241
242 if (unref) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700243 GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700244 }
245}
246
Craig Tillerc7b5f762015-06-27 11:48:42 -0700247static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700248 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700249 size_t i;
250 size_t n;
251 grpc_subchannel **subchannels;
252
253 gpr_mu_lock(&p->mu);
254 n = p->num_subchannels;
255 subchannels = gpr_malloc(n * sizeof(*subchannels));
256 for (i = 0; i < n; i++) {
257 subchannels[i] = p->subchannels[i];
Craig Tillerc3967532015-06-29 14:59:38 -0700258 GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700259 }
260 gpr_mu_unlock(&p->mu);
261
262 for (i = 0; i < n; i++) {
263 grpc_subchannel_process_transport_op(subchannels[i], op);
Craig Tillerc3967532015-06-29 14:59:38 -0700264 GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700265 }
266 gpr_free(subchannels);
267}
268
269static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700270 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700271 grpc_connectivity_state st;
272 gpr_mu_lock(&p->mu);
273 st = grpc_connectivity_state_check(&p->state_tracker);
274 gpr_mu_unlock(&p->mu);
275 return st;
276}
277
Craig Tiller4ab82d22015-06-29 09:40:33 -0700278static void pf_notify_on_state_change(grpc_lb_policy *pol,
279 grpc_connectivity_state *current,
280 grpc_iomgr_closure *notify) {
281 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700282 gpr_mu_lock(&p->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700283 grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
284 notify);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700285 gpr_mu_unlock(&p->mu);
286}
287
Craig Tillereb3b12e2015-06-26 14:42:49 -0700288static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700289 pf_destroy,
290 pf_shutdown,
291 pf_pick,
292 pf_exit_idle,
293 pf_broadcast,
294 pf_check_connectivity,
295 pf_notify_on_state_change};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700296
297grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
298 size_t num_subchannels) {
299 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
300 GPR_ASSERT(num_subchannels);
301 memset(p, 0, sizeof(*p));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700302 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700303 p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
304 p->num_subchannels = num_subchannels;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700305 grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700306 memcpy(p->subchannels, subchannels,
307 sizeof(grpc_subchannel *) * num_subchannels);
308 grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
309 gpr_mu_init(&p->mu);
310 return &p->base;
311}