blob: 4f4c7eb64cdd0be7269c9aaba75559b38dbb9d33 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
34#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070039#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070040
41typedef struct pending_pick {
42 struct pending_pick *next;
43 grpc_pollset *pollset;
44 grpc_subchannel **target;
45 grpc_iomgr_closure *on_complete;
46} pending_pick;
47
48typedef struct {
49 /** base policy: must be first */
50 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070051 /** all our subchannels */
52 grpc_subchannel **subchannels;
53 size_t num_subchannels;
54
55 grpc_iomgr_closure connectivity_changed;
56
57 /** mutex protecting remaining members */
58 gpr_mu mu;
59 /** the selected channel
60 TODO(ctiller): this should be atomically set so we don't
61 need to take a mutex in the common case */
62 grpc_subchannel *selected;
63 /** have we started picking? */
64 int started_picking;
65 /** which subchannel are we watching? */
66 size_t checking_subchannel;
67 /** what is the connectivity of that channel? */
68 grpc_connectivity_state checking_connectivity;
69 /** list of picks that are waiting on connectivity */
70 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070071
72 /** our connectivity state tracker */
73 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070074} pick_first_lb_policy;
75
Craig Tillerd7b68e72015-06-28 11:41:09 -070076void pf_destroy(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -070077 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -070078 size_t i;
79 for (i = 0; i < p->num_subchannels; i++) {
Craig Tillerc3967532015-06-29 14:59:38 -070080 GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -070081 }
Craig Tillerd7b68e72015-06-28 11:41:09 -070082 gpr_free(p->subchannels);
83 gpr_mu_destroy(&p->mu);
84 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -070085}
86
87void pf_shutdown(grpc_lb_policy *pol) {
Craig Tillerb3671532015-07-01 10:37:40 -070088 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -070089 pending_pick *pp;
90 gpr_mu_lock(&p->mu);
91 while ((pp = p->pending_picks)) {
92 p->pending_picks = pp->next;
93 *pp->target = NULL;
94 grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
95 gpr_free(pp);
96 }
97 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -070098}
99
Craig Tiller48cb07c2015-07-15 16:16:15 -0700100static void start_picking(pick_first_lb_policy *p) {
101 p->started_picking = 1;
102 p->checking_subchannel = 0;
103 p->checking_connectivity = GRPC_CHANNEL_IDLE;
104 GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
105 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
106 &p->checking_connectivity,
107 &p->connectivity_changed);
108}
109
110void pf_exit_idle(grpc_lb_policy *pol) {
111 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
112 gpr_mu_lock(&p->mu);
113 if (!p->started_picking) {
114 start_picking(p);
115 }
116 gpr_mu_unlock(&p->mu);
117}
118
Craig Tillereb3b12e2015-06-26 14:42:49 -0700119void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
120 grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
121 grpc_iomgr_closure *on_complete) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700122 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700123 pending_pick *pp;
124 gpr_mu_lock(&p->mu);
125 if (p->selected) {
126 gpr_mu_unlock(&p->mu);
127 *target = p->selected;
128 on_complete->cb(on_complete->cb_arg, 1);
129 } else {
130 if (!p->started_picking) {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700131 start_picking(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700132 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700133 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
134 pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700135 pp = gpr_malloc(sizeof(*pp));
136 pp->next = p->pending_picks;
137 pp->pollset = pollset;
138 pp->target = target;
139 pp->on_complete = on_complete;
140 p->pending_picks = pp;
141 gpr_mu_unlock(&p->mu);
142 }
143}
144
145static void del_interested_parties_locked(pick_first_lb_policy *p) {
146 pending_pick *pp;
147 for (pp = p->pending_picks; pp; pp = pp->next) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700148 grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
149 pp->pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700150 }
151}
152
153static void add_interested_parties_locked(pick_first_lb_policy *p) {
154 pending_pick *pp;
155 for (pp = p->pending_picks; pp; pp = pp->next) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700156 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
157 pp->pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700158 }
159}
160
161static void pf_connectivity_changed(void *arg, int iomgr_success) {
162 pick_first_lb_policy *p = arg;
163 pending_pick *pp;
164 int unref = 0;
165
166 gpr_mu_lock(&p->mu);
167loop:
168 switch (p->checking_connectivity) {
169 case GRPC_CHANNEL_READY:
Craig Tillerff54c922015-06-26 16:57:20 -0700170 p->selected = p->subchannels[p->checking_subchannel];
Craig Tillereb3b12e2015-06-26 14:42:49 -0700171 while ((pp = p->pending_picks)) {
172 p->pending_picks = pp->next;
173 *pp->target = p->selected;
174 grpc_subchannel_del_interested_party(p->selected, pp->pollset);
175 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
176 gpr_free(pp);
177 }
178 unref = 1;
179 break;
180 case GRPC_CHANNEL_TRANSIENT_FAILURE:
181 del_interested_parties_locked(p);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700182 p->checking_subchannel =
183 (p->checking_subchannel + 1) % p->num_subchannels;
184 p->checking_connectivity = grpc_subchannel_check_connectivity(
185 p->subchannels[p->checking_subchannel]);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700186 add_interested_parties_locked(p);
187 goto loop;
188 case GRPC_CHANNEL_CONNECTING:
189 case GRPC_CHANNEL_IDLE:
Craig Tiller4ab82d22015-06-29 09:40:33 -0700190 grpc_subchannel_notify_on_state_change(
191 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
192 &p->connectivity_changed);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700193 break;
194 case GRPC_CHANNEL_FATAL_FAILURE:
195 del_interested_parties_locked(p);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700196 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
197 p->subchannels[p->num_subchannels - 1]);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700198 p->num_subchannels--;
Craig Tillera82fef12015-07-06 08:02:41 -0700199 GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700200 if (p->num_subchannels == 0) {
Craig Tiller87cc0842015-06-30 08:15:55 -0700201 while ((pp = p->pending_picks)) {
202 p->pending_picks = pp->next;
203 *pp->target = NULL;
204 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
205 gpr_free(pp);
206 }
Craig Tiller740aac12015-07-01 10:03:50 -0700207 unref = 1;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700208 } else {
Craig Tiller87cc0842015-06-30 08:15:55 -0700209 p->checking_subchannel %= p->num_subchannels;
210 p->checking_connectivity = grpc_subchannel_check_connectivity(
211 p->subchannels[p->checking_subchannel]);
Craig Tiller87cc0842015-06-30 08:15:55 -0700212 add_interested_parties_locked(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700213 goto loop;
214 }
215 }
216 gpr_mu_unlock(&p->mu);
217
218 if (unref) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700219 GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700220 }
221}
222
Craig Tillerc7b5f762015-06-27 11:48:42 -0700223static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700224 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700225 size_t i;
226 size_t n;
227 grpc_subchannel **subchannels;
228
229 gpr_mu_lock(&p->mu);
230 n = p->num_subchannels;
231 subchannels = gpr_malloc(n * sizeof(*subchannels));
232 for (i = 0; i < n; i++) {
233 subchannels[i] = p->subchannels[i];
Craig Tillerc3967532015-06-29 14:59:38 -0700234 GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700235 }
236 gpr_mu_unlock(&p->mu);
237
238 for (i = 0; i < n; i++) {
239 grpc_subchannel_process_transport_op(subchannels[i], op);
Craig Tillerc3967532015-06-29 14:59:38 -0700240 GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700241 }
242 gpr_free(subchannels);
243}
244
245static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700246 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700247 grpc_connectivity_state st;
248 gpr_mu_lock(&p->mu);
249 st = grpc_connectivity_state_check(&p->state_tracker);
250 gpr_mu_unlock(&p->mu);
251 return st;
252}
253
Craig Tiller4ab82d22015-06-29 09:40:33 -0700254static void pf_notify_on_state_change(grpc_lb_policy *pol,
255 grpc_connectivity_state *current,
256 grpc_iomgr_closure *notify) {
257 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700258 gpr_mu_lock(&p->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700259 grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
260 notify);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700261 gpr_mu_unlock(&p->mu);
262}
263
Craig Tillereb3b12e2015-06-26 14:42:49 -0700264static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller48cb07c2015-07-15 16:16:15 -0700265 pf_destroy,
266 pf_shutdown,
267 pf_pick,
268 pf_exit_idle,
269 pf_broadcast,
270 pf_check_connectivity,
271 pf_notify_on_state_change};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700272
273grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
274 size_t num_subchannels) {
275 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
276 GPR_ASSERT(num_subchannels);
277 memset(p, 0, sizeof(*p));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700278 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700279 p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
280 p->num_subchannels = num_subchannels;
281 memcpy(p->subchannels, subchannels,
282 sizeof(grpc_subchannel *) * num_subchannels);
283 grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
284 gpr_mu_init(&p->mu);
285 return &p->base;
286}