blob: 1a0f9d17905e5086d962a9cd357b647e61854f14 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070039#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070040
41typedef struct pending_pick {
42 struct pending_pick *next;
43 grpc_pollset *pollset;
44 grpc_subchannel **target;
45 grpc_iomgr_closure *on_complete;
46} pending_pick;
47
48typedef struct {
49 /** base policy: must be first */
50 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070051 /** all our subchannels */
52 grpc_subchannel **subchannels;
53 size_t num_subchannels;
54
55 grpc_iomgr_closure connectivity_changed;
56
57 /** mutex protecting remaining members */
58 gpr_mu mu;
59 /** the selected channel
60 TODO(ctiller): this should be atomically set so we don't
61 need to take a mutex in the common case */
62 grpc_subchannel *selected;
63 /** have we started picking? */
64 int started_picking;
Craig Tillera14215a2015-07-17 17:21:08 -070065 /** are we shut down? */
66 int shutdown;
Craig Tillereb3b12e2015-06-26 14:42:49 -070067 /** which subchannel are we watching? */
68 size_t checking_subchannel;
69 /** what is the connectivity of that channel? */
70 grpc_connectivity_state checking_connectivity;
71 /** list of picks that are waiting on connectivity */
72 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070073
74 /** our connectivity state tracker */
75 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070076} pick_first_lb_policy;
77
Craig Tiller1ada6ad2015-07-16 16:19:14 -070078static void del_interested_parties_locked(pick_first_lb_policy *p) {
79 pending_pick *pp;
80 for (pp = p->pending_picks; pp; pp = pp->next) {
81 grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
82 pp->pollset);
83 }
84}
85
86static void add_interested_parties_locked(pick_first_lb_policy *p) {
87 pending_pick *pp;
88 for (pp = p->pending_picks; pp; pp = pp->next) {
89 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
90 pp->pollset);
91 }
92}
93
Craig Tillerd7b68e72015-06-28 11:41:09 -070094void pf_destroy(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -070095 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -070096 size_t i;
Craig Tiller1ada6ad2015-07-16 16:19:14 -070097 del_interested_parties_locked(p);
Craig Tillerd7b68e72015-06-28 11:41:09 -070098 for (i = 0; i < p->num_subchannels; i++) {
Craig Tillerc3967532015-06-29 14:59:38 -070099 GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700100 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700101 grpc_connectivity_state_destroy(&p->state_tracker);
Craig Tillerd7b68e72015-06-28 11:41:09 -0700102 gpr_free(p->subchannels);
103 gpr_mu_destroy(&p->mu);
104 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700105}
106
107void pf_shutdown(grpc_lb_policy *pol) {
Craig Tillerb3671532015-07-01 10:37:40 -0700108 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700109 pending_pick *pp;
110 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700111 del_interested_parties_locked(p);
Craig Tillera14215a2015-07-17 17:21:08 -0700112 p->shutdown = 1;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700113 while ((pp = p->pending_picks)) {
114 p->pending_picks = pp->next;
115 *pp->target = NULL;
116 grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
117 gpr_free(pp);
118 }
Craig Tillera14215a2015-07-17 17:21:08 -0700119 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
Craig Tillerd2cc4592015-07-01 07:50:47 -0700120 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700121}
122
Craig Tiller48cb07c2015-07-15 16:16:15 -0700123static void start_picking(pick_first_lb_policy *p) {
124 p->started_picking = 1;
125 p->checking_subchannel = 0;
126 p->checking_connectivity = GRPC_CHANNEL_IDLE;
127 GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
128 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
129 &p->checking_connectivity,
130 &p->connectivity_changed);
131}
132
133void pf_exit_idle(grpc_lb_policy *pol) {
134 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
135 gpr_mu_lock(&p->mu);
136 if (!p->started_picking) {
137 start_picking(p);
138 }
139 gpr_mu_unlock(&p->mu);
140}
141
Craig Tillereb3b12e2015-06-26 14:42:49 -0700142void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
143 grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
144 grpc_iomgr_closure *on_complete) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700145 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700146 pending_pick *pp;
147 gpr_mu_lock(&p->mu);
148 if (p->selected) {
149 gpr_mu_unlock(&p->mu);
150 *target = p->selected;
151 on_complete->cb(on_complete->cb_arg, 1);
152 } else {
153 if (!p->started_picking) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700154 start_picking(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700155 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700156 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
157 pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700158 pp = gpr_malloc(sizeof(*pp));
159 pp->next = p->pending_picks;
160 pp->pollset = pollset;
161 pp->target = target;
162 pp->on_complete = on_complete;
163 p->pending_picks = pp;
164 gpr_mu_unlock(&p->mu);
165 }
166}
167
Craig Tillereb3b12e2015-06-26 14:42:49 -0700168static void pf_connectivity_changed(void *arg, int iomgr_success) {
169 pick_first_lb_policy *p = arg;
170 pending_pick *pp;
171 int unref = 0;
172
173 gpr_mu_lock(&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700174
Craig Tillera14215a2015-07-17 17:21:08 -0700175 if (p->shutdown) {
176 unref = 1;
177 } else if (p->selected != NULL) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700178 grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity);
179 if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
180 grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed);
181 } else {
Craig Tillereb3b12e2015-06-26 14:42:49 -0700182 unref = 1;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700183 }
184 } else {
185loop:
186 switch (p->checking_connectivity) {
187 case GRPC_CHANNEL_READY:
188 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY);
189 p->selected = p->subchannels[p->checking_subchannel];
Craig Tiller87cc0842015-06-30 08:15:55 -0700190 while ((pp = p->pending_picks)) {
191 p->pending_picks = pp->next;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700192 *pp->target = p->selected;
193 grpc_subchannel_del_interested_party(p->selected, pp->pollset);
Craig Tiller87cc0842015-06-30 08:15:55 -0700194 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
195 gpr_free(pp);
196 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700197 grpc_subchannel_notify_on_state_change(p->selected, &p->checking_connectivity, &p->connectivity_changed);
198 break;
199 case GRPC_CHANNEL_TRANSIENT_FAILURE:
200 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE);
201 del_interested_parties_locked(p);
202 p->checking_subchannel =
203 (p->checking_subchannel + 1) % p->num_subchannels;
Craig Tiller87cc0842015-06-30 08:15:55 -0700204 p->checking_connectivity = grpc_subchannel_check_connectivity(
205 p->subchannels[p->checking_subchannel]);
Craig Tiller87cc0842015-06-30 08:15:55 -0700206 add_interested_parties_locked(p);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700207 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
208 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
209 } else {
210 goto loop;
211 }
212 break;
213 case GRPC_CHANNEL_CONNECTING:
214 case GRPC_CHANNEL_IDLE:
215 grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity);
216 grpc_subchannel_notify_on_state_change(
217 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
218 &p->connectivity_changed);
219 break;
220 case GRPC_CHANNEL_FATAL_FAILURE:
221 del_interested_parties_locked(p);
222 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
223 p->subchannels[p->num_subchannels - 1]);
224 p->num_subchannels--;
225 GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
226 if (p->num_subchannels == 0) {
227 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
228 while ((pp = p->pending_picks)) {
229 p->pending_picks = pp->next;
230 *pp->target = NULL;
231 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
232 gpr_free(pp);
233 }
234 unref = 1;
235 } else {
236 grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE);
237 p->checking_subchannel %= p->num_subchannels;
238 p->checking_connectivity = grpc_subchannel_check_connectivity(
239 p->subchannels[p->checking_subchannel]);
240 add_interested_parties_locked(p);
241 goto loop;
242 }
243 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700244 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700245
Craig Tillereb3b12e2015-06-26 14:42:49 -0700246 gpr_mu_unlock(&p->mu);
247
248 if (unref) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700249 GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700250 }
251}
252
Craig Tillerc7b5f762015-06-27 11:48:42 -0700253static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700254 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700255 size_t i;
256 size_t n;
257 grpc_subchannel **subchannels;
258
259 gpr_mu_lock(&p->mu);
260 n = p->num_subchannels;
261 subchannels = gpr_malloc(n * sizeof(*subchannels));
262 for (i = 0; i < n; i++) {
263 subchannels[i] = p->subchannels[i];
Craig Tillerc3967532015-06-29 14:59:38 -0700264 GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700265 }
266 gpr_mu_unlock(&p->mu);
267
268 for (i = 0; i < n; i++) {
269 grpc_subchannel_process_transport_op(subchannels[i], op);
Craig Tillerc3967532015-06-29 14:59:38 -0700270 GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700271 }
272 gpr_free(subchannels);
273}
274
275static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700276 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700277 grpc_connectivity_state st;
278 gpr_mu_lock(&p->mu);
279 st = grpc_connectivity_state_check(&p->state_tracker);
280 gpr_mu_unlock(&p->mu);
281 return st;
282}
283
Craig Tiller4ab82d22015-06-29 09:40:33 -0700284static void pf_notify_on_state_change(grpc_lb_policy *pol,
285 grpc_connectivity_state *current,
286 grpc_iomgr_closure *notify) {
287 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700288 gpr_mu_lock(&p->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700289 grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
290 notify);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700291 gpr_mu_unlock(&p->mu);
292}
293
Craig Tillereb3b12e2015-06-26 14:42:49 -0700294static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700295 pf_destroy,
296 pf_shutdown,
297 pf_pick,
298 pf_exit_idle,
299 pf_broadcast,
300 pf_check_connectivity,
301 pf_notify_on_state_change};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700302
303grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
304 size_t num_subchannels) {
305 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
306 GPR_ASSERT(num_subchannels);
307 memset(p, 0, sizeof(*p));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700308 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700309 p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
310 p->num_subchannels = num_subchannels;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700311 grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700312 memcpy(p->subchannels, subchannels,
313 sizeof(grpc_subchannel *) * num_subchannels);
314 grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
315 gpr_mu_init(&p->mu);
316 return &p->base;
317}