blob: 3d57e3136a41ed2467dfa10447300c5cfc9a3d2c [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070039#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070040
41typedef struct pending_pick {
42 struct pending_pick *next;
43 grpc_pollset *pollset;
44 grpc_subchannel **target;
45 grpc_iomgr_closure *on_complete;
46} pending_pick;
47
48typedef struct {
49 /** base policy: must be first */
50 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070051 /** all our subchannels */
52 grpc_subchannel **subchannels;
53 size_t num_subchannels;
54
55 grpc_iomgr_closure connectivity_changed;
56
57 /** mutex protecting remaining members */
58 gpr_mu mu;
59 /** the selected channel
60 TODO(ctiller): this should be atomically set so we don't
61 need to take a mutex in the common case */
62 grpc_subchannel *selected;
63 /** have we started picking? */
64 int started_picking;
65 /** which subchannel are we watching? */
66 size_t checking_subchannel;
67 /** what is the connectivity of that channel? */
68 grpc_connectivity_state checking_connectivity;
69 /** list of picks that are waiting on connectivity */
70 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070071
72 /** our connectivity state tracker */
73 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070074} pick_first_lb_policy;
75
Craig Tillerd7b68e72015-06-28 11:41:09 -070076void pf_destroy(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -070077 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -070078 size_t i;
79 for (i = 0; i < p->num_subchannels; i++) {
Craig Tillerc3967532015-06-29 14:59:38 -070080 GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
Craig Tillereb3b12e2015-06-26 14:42:49 -070081 }
Craig Tillerd7b68e72015-06-28 11:41:09 -070082 gpr_free(p->subchannels);
83 gpr_mu_destroy(&p->mu);
84 gpr_free(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -070085}
86
87void pf_shutdown(grpc_lb_policy *pol) {
Craig Tillerb3671532015-07-01 10:37:40 -070088 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -070089 pending_pick *pp;
90 gpr_mu_lock(&p->mu);
91 while ((pp = p->pending_picks)) {
92 p->pending_picks = pp->next;
93 *pp->target = NULL;
94 grpc_iomgr_add_delayed_callback(pp->on_complete, 0);
95 gpr_free(pp);
96 }
97 gpr_mu_unlock(&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -070098}
99
100void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
101 grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
102 grpc_iomgr_closure *on_complete) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700103 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700104 pending_pick *pp;
105 gpr_mu_lock(&p->mu);
106 if (p->selected) {
107 gpr_mu_unlock(&p->mu);
108 *target = p->selected;
109 on_complete->cb(on_complete->cb_arg, 1);
110 } else {
111 if (!p->started_picking) {
112 p->started_picking = 1;
113 p->checking_subchannel = 0;
114 p->checking_connectivity = GRPC_CHANNEL_IDLE;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700115 GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
Craig Tiller4ab82d22015-06-29 09:40:33 -0700116 grpc_subchannel_notify_on_state_change(
117 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
118 &p->connectivity_changed);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700119 }
Craig Tiller4ab82d22015-06-29 09:40:33 -0700120 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
121 pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700122 pp = gpr_malloc(sizeof(*pp));
123 pp->next = p->pending_picks;
124 pp->pollset = pollset;
125 pp->target = target;
126 pp->on_complete = on_complete;
127 p->pending_picks = pp;
128 gpr_mu_unlock(&p->mu);
129 }
130}
131
132static void del_interested_parties_locked(pick_first_lb_policy *p) {
133 pending_pick *pp;
134 for (pp = p->pending_picks; pp; pp = pp->next) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700135 grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
136 pp->pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700137 }
138}
139
140static void add_interested_parties_locked(pick_first_lb_policy *p) {
141 pending_pick *pp;
142 for (pp = p->pending_picks; pp; pp = pp->next) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700143 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
144 pp->pollset);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700145 }
146}
147
148static void pf_connectivity_changed(void *arg, int iomgr_success) {
149 pick_first_lb_policy *p = arg;
150 pending_pick *pp;
151 int unref = 0;
152
153 gpr_mu_lock(&p->mu);
154loop:
155 switch (p->checking_connectivity) {
156 case GRPC_CHANNEL_READY:
Craig Tillerff54c922015-06-26 16:57:20 -0700157 p->selected = p->subchannels[p->checking_subchannel];
Craig Tiller4ab82d22015-06-29 09:40:33 -0700158 GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) ==
159 GRPC_CHANNEL_READY);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700160 while ((pp = p->pending_picks)) {
161 p->pending_picks = pp->next;
162 *pp->target = p->selected;
163 grpc_subchannel_del_interested_party(p->selected, pp->pollset);
164 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
165 gpr_free(pp);
166 }
167 unref = 1;
168 break;
169 case GRPC_CHANNEL_TRANSIENT_FAILURE:
170 del_interested_parties_locked(p);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700171 p->checking_subchannel =
172 (p->checking_subchannel + 1) % p->num_subchannels;
173 p->checking_connectivity = grpc_subchannel_check_connectivity(
174 p->subchannels[p->checking_subchannel]);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700175 add_interested_parties_locked(p);
176 goto loop;
177 case GRPC_CHANNEL_CONNECTING:
178 case GRPC_CHANNEL_IDLE:
Craig Tiller4ab82d22015-06-29 09:40:33 -0700179 grpc_subchannel_notify_on_state_change(
180 p->subchannels[p->checking_subchannel], &p->checking_connectivity,
181 &p->connectivity_changed);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700182 break;
183 case GRPC_CHANNEL_FATAL_FAILURE:
184 del_interested_parties_locked(p);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700185 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
186 p->subchannels[p->num_subchannels - 1]);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700187 p->num_subchannels--;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700188 if (p->num_subchannels == 0) {
Craig Tiller87cc0842015-06-30 08:15:55 -0700189 while ((pp = p->pending_picks)) {
190 p->pending_picks = pp->next;
191 *pp->target = NULL;
192 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
193 gpr_free(pp);
194 }
Craig Tiller740aac12015-07-01 10:03:50 -0700195 unref = 1;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700196 } else {
Craig Tiller87cc0842015-06-30 08:15:55 -0700197 p->checking_subchannel %= p->num_subchannels;
198 p->checking_connectivity = grpc_subchannel_check_connectivity(
199 p->subchannels[p->checking_subchannel]);
200 GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
201 add_interested_parties_locked(p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700202 goto loop;
203 }
204 }
205 gpr_mu_unlock(&p->mu);
206
207 if (unref) {
Craig Tillerd7b68e72015-06-28 11:41:09 -0700208 GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
Craig Tillereb3b12e2015-06-26 14:42:49 -0700209 }
210}
211
Craig Tillerc7b5f762015-06-27 11:48:42 -0700212static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700213 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700214 size_t i;
215 size_t n;
216 grpc_subchannel **subchannels;
217
218 gpr_mu_lock(&p->mu);
219 n = p->num_subchannels;
220 subchannels = gpr_malloc(n * sizeof(*subchannels));
221 for (i = 0; i < n; i++) {
222 subchannels[i] = p->subchannels[i];
Craig Tillerc3967532015-06-29 14:59:38 -0700223 GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700224 }
225 gpr_mu_unlock(&p->mu);
226
227 for (i = 0; i < n; i++) {
228 grpc_subchannel_process_transport_op(subchannels[i], op);
Craig Tillerc3967532015-06-29 14:59:38 -0700229 GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
Craig Tillerc7b5f762015-06-27 11:48:42 -0700230 }
231 gpr_free(subchannels);
232}
233
234static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700235 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700236 grpc_connectivity_state st;
237 gpr_mu_lock(&p->mu);
238 st = grpc_connectivity_state_check(&p->state_tracker);
239 gpr_mu_unlock(&p->mu);
240 return st;
241}
242
Craig Tiller4ab82d22015-06-29 09:40:33 -0700243static void pf_notify_on_state_change(grpc_lb_policy *pol,
244 grpc_connectivity_state *current,
245 grpc_iomgr_closure *notify) {
246 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700247 gpr_mu_lock(&p->mu);
Craig Tiller4ab82d22015-06-29 09:40:33 -0700248 grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
249 notify);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700250 gpr_mu_unlock(&p->mu);
251}
252
Craig Tillereb3b12e2015-06-26 14:42:49 -0700253static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller4ab82d22015-06-29 09:40:33 -0700254 pf_destroy, pf_shutdown, pf_pick,
255 pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700256
257grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
258 size_t num_subchannels) {
259 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
260 GPR_ASSERT(num_subchannels);
261 memset(p, 0, sizeof(*p));
Craig Tillerd7b68e72015-06-28 11:41:09 -0700262 grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700263 p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
264 p->num_subchannels = num_subchannels;
265 memcpy(p->subchannels, subchannels,
266 sizeof(grpc_subchannel *) * num_subchannels);
267 grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
268 gpr_mu_init(&p->mu);
269 return &p->base;
270}