blob: b8991dfaa7fc60f4a4b535851ffba241d9f0b231 [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
David Garcia Quintas5c4543d2015-09-03 15:49:56 -070034#include "src/core/client_config/lb_policy_factory.h"
Craig Tiller3bc8ebd2015-06-24 15:41:15 -070035#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070036
37#include <string.h>
38
39#include <grpc/support/alloc.h>
Craig Tiller08a1cf82015-06-29 09:37:52 -070040#include "src/core/transport/connectivity_state.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070041
Craig Tiller45724b32015-09-22 10:42:19 -070042typedef struct pending_pick
43{
Craig Tillereb3b12e2015-06-26 14:42:49 -070044 struct pending_pick *next;
45 grpc_pollset *pollset;
46 grpc_subchannel **target;
Craig Tiller33825112015-09-18 07:44:19 -070047 grpc_closure *on_complete;
Craig Tillereb3b12e2015-06-26 14:42:49 -070048} pending_pick;
49
Craig Tiller45724b32015-09-22 10:42:19 -070050typedef struct
51{
Craig Tillereb3b12e2015-06-26 14:42:49 -070052 /** base policy: must be first */
53 grpc_lb_policy base;
Craig Tillereb3b12e2015-06-26 14:42:49 -070054 /** all our subchannels */
55 grpc_subchannel **subchannels;
56 size_t num_subchannels;
57
Craig Tiller33825112015-09-18 07:44:19 -070058 grpc_closure connectivity_changed;
Craig Tillereb3b12e2015-06-26 14:42:49 -070059
60 /** mutex protecting remaining members */
61 gpr_mu mu;
62 /** the selected channel
63 TODO(ctiller): this should be atomically set so we don't
64 need to take a mutex in the common case */
65 grpc_subchannel *selected;
66 /** have we started picking? */
67 int started_picking;
Craig Tillera14215a2015-07-17 17:21:08 -070068 /** are we shut down? */
69 int shutdown;
Craig Tillereb3b12e2015-06-26 14:42:49 -070070 /** which subchannel are we watching? */
71 size_t checking_subchannel;
72 /** what is the connectivity of that channel? */
73 grpc_connectivity_state checking_connectivity;
74 /** list of picks that are waiting on connectivity */
75 pending_pick *pending_picks;
Craig Tillerc7b5f762015-06-27 11:48:42 -070076
77 /** our connectivity state tracker */
78 grpc_connectivity_state_tracker state_tracker;
Craig Tillereb3b12e2015-06-26 14:42:49 -070079} pick_first_lb_policy;
80
Craig Tiller45724b32015-09-22 10:42:19 -070081static void
82del_interested_parties_locked (pick_first_lb_policy * p, grpc_closure_list * closure_list)
83{
Craig Tiller1ada6ad2015-07-16 16:19:14 -070084 pending_pick *pp;
Craig Tiller45724b32015-09-22 10:42:19 -070085 for (pp = p->pending_picks; pp; pp = pp->next)
86 {
87 grpc_subchannel_del_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list);
88 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -070089}
90
Craig Tiller45724b32015-09-22 10:42:19 -070091static void
92add_interested_parties_locked (pick_first_lb_policy * p, grpc_closure_list * closure_list)
93{
Craig Tiller1ada6ad2015-07-16 16:19:14 -070094 pending_pick *pp;
Craig Tiller45724b32015-09-22 10:42:19 -070095 for (pp = p->pending_picks; pp; pp = pp->next)
96 {
97 grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pp->pollset, closure_list);
98 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -070099}
100
Craig Tiller45724b32015-09-22 10:42:19 -0700101void
102pf_destroy (grpc_lb_policy * pol, grpc_closure_list * closure_list)
103{
104 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
Craig Tillerd7b68e72015-06-28 11:41:09 -0700105 size_t i;
Craig Tiller45724b32015-09-22 10:42:19 -0700106 GPR_ASSERT (p->pending_picks == NULL);
107 for (i = 0; i < p->num_subchannels; i++)
108 {
109 GRPC_SUBCHANNEL_UNREF (p->subchannels[i], "pick_first", closure_list);
110 }
111 grpc_connectivity_state_destroy (&p->state_tracker, closure_list);
112 gpr_free (p->subchannels);
113 gpr_mu_destroy (&p->mu);
114 gpr_free (p);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700115}
116
Craig Tiller45724b32015-09-22 10:42:19 -0700117void
118pf_shutdown (grpc_lb_policy * pol, grpc_closure_list * closure_list)
119{
120 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
Craig Tillerd2cc4592015-07-01 07:50:47 -0700121 pending_pick *pp;
Craig Tiller45724b32015-09-22 10:42:19 -0700122 gpr_mu_lock (&p->mu);
123 del_interested_parties_locked (p, closure_list);
Craig Tillera14215a2015-07-17 17:21:08 -0700124 p->shutdown = 1;
Craig Tiller5795da72015-09-17 15:27:13 -0700125 pp = p->pending_picks;
126 p->pending_picks = NULL;
Craig Tiller45724b32015-09-22 10:42:19 -0700127 grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "shutdown", closure_list);
128 gpr_mu_unlock (&p->mu);
129 while (pp != NULL)
130 {
131 pending_pick *next = pp->next;
132 *pp->target = NULL;
133 grpc_closure_list_add (closure_list, pp->on_complete, 1);
134 gpr_free (pp);
135 pp = next;
136 }
Craig Tillereb3b12e2015-06-26 14:42:49 -0700137}
138
Craig Tiller45724b32015-09-22 10:42:19 -0700139static void
140start_picking (pick_first_lb_policy * p, grpc_closure_list * closure_list)
141{
Craig Tiller48cb07c2015-07-15 16:16:15 -0700142 p->started_picking = 1;
143 p->checking_subchannel = 0;
144 p->checking_connectivity = GRPC_CHANNEL_IDLE;
Craig Tiller45724b32015-09-22 10:42:19 -0700145 GRPC_LB_POLICY_REF (&p->base, "pick_first_connectivity");
146 grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list);
Craig Tiller48cb07c2015-07-15 16:16:15 -0700147}
148
Craig Tiller45724b32015-09-22 10:42:19 -0700149void
150pf_exit_idle (grpc_lb_policy * pol, grpc_closure_list * closure_list)
151{
152 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
153 gpr_mu_lock (&p->mu);
154 if (!p->started_picking)
155 {
156 start_picking (p, closure_list);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700157 }
Craig Tiller45724b32015-09-22 10:42:19 -0700158 gpr_mu_unlock (&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700159}
160
Craig Tiller45724b32015-09-22 10:42:19 -0700161void
162pf_pick (grpc_lb_policy * pol, grpc_pollset * pollset, grpc_metadata_batch * initial_metadata, grpc_subchannel ** target, grpc_closure * on_complete, grpc_closure_list * closure_list)
163{
164 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
165 pending_pick *pp;
166 gpr_mu_lock (&p->mu);
167 if (p->selected)
168 {
169 gpr_mu_unlock (&p->mu);
170 *target = p->selected;
171 grpc_closure_list_add (closure_list, on_complete, 1);
172 }
173 else
174 {
175 if (!p->started_picking)
176 {
177 start_picking (p, closure_list);
178 }
179 grpc_subchannel_add_interested_party (p->subchannels[p->checking_subchannel], pollset, closure_list);
180 pp = gpr_malloc (sizeof (*pp));
181 pp->next = p->pending_picks;
182 pp->pollset = pollset;
183 pp->target = target;
184 pp->on_complete = on_complete;
185 p->pending_picks = pp;
186 gpr_mu_unlock (&p->mu);
187 }
188}
189
190static void
191pf_connectivity_changed (void *arg, int iomgr_success, grpc_closure_list * closure_list)
192{
Craig Tillereb3b12e2015-06-26 14:42:49 -0700193 pick_first_lb_policy *p = arg;
194 pending_pick *pp;
Craig Tillereb3b12e2015-06-26 14:42:49 -0700195
Craig Tiller45724b32015-09-22 10:42:19 -0700196 gpr_mu_lock (&p->mu);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700197
Craig Tiller45724b32015-09-22 10:42:19 -0700198 if (p->shutdown)
199 {
200 gpr_mu_unlock (&p->mu);
201 GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list);
202 return;
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700203 }
Craig Tiller45724b32015-09-22 10:42:19 -0700204 else if (p->selected != NULL)
205 {
206 grpc_connectivity_state_set (&p->state_tracker, p->checking_connectivity, "selected_changed", closure_list);
207 if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE)
208 {
209 grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list);
210 }
211 else
212 {
213 GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list);
214 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700215 }
Craig Tiller45724b32015-09-22 10:42:19 -0700216 else
217 {
218 loop:
219 switch (p->checking_connectivity)
220 {
221 case GRPC_CHANNEL_READY:
222 grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready", closure_list);
223 p->selected = p->subchannels[p->checking_subchannel];
224 while ((pp = p->pending_picks))
225 {
226 p->pending_picks = pp->next;
227 *pp->target = p->selected;
228 grpc_subchannel_del_interested_party (p->selected, pp->pollset, closure_list);
229 grpc_closure_list_add (closure_list, pp->on_complete, 1);
230 gpr_free (pp);
231 }
232 grpc_subchannel_notify_on_state_change (p->selected, &p->checking_connectivity, &p->connectivity_changed, closure_list);
233 break;
234 case GRPC_CHANNEL_TRANSIENT_FAILURE:
235 grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure", closure_list);
236 del_interested_parties_locked (p, closure_list);
237 p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels;
238 p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]);
239 add_interested_parties_locked (p, closure_list);
240 if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE)
241 {
242 grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list);
243 }
244 else
245 {
246 goto loop;
247 }
248 break;
249 case GRPC_CHANNEL_CONNECTING:
250 case GRPC_CHANNEL_IDLE:
251 grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_CONNECTING, "connecting_changed", closure_list);
252 grpc_subchannel_notify_on_state_change (p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed, closure_list);
253 break;
254 case GRPC_CHANNEL_FATAL_FAILURE:
255 del_interested_parties_locked (p, closure_list);
256 GPR_SWAP (grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]);
257 p->num_subchannels--;
258 GRPC_SUBCHANNEL_UNREF (p->subchannels[p->num_subchannels], "pick_first", closure_list);
259 if (p->num_subchannels == 0)
260 {
261 grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "no_more_channels", closure_list);
262 while ((pp = p->pending_picks))
263 {
264 p->pending_picks = pp->next;
265 *pp->target = NULL;
266 grpc_closure_list_add (closure_list, pp->on_complete, 1);
267 gpr_free (pp);
268 }
269 GRPC_LB_POLICY_UNREF (&p->base, "pick_first_connectivity", closure_list);
270 }
271 else
272 {
273 grpc_connectivity_state_set (&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "subchannel_failed", closure_list);
274 p->checking_subchannel %= p->num_subchannels;
275 p->checking_connectivity = grpc_subchannel_check_connectivity (p->subchannels[p->checking_subchannel]);
276 add_interested_parties_locked (p, closure_list);
277 goto loop;
278 }
279 }
280 }
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700281
Craig Tiller45724b32015-09-22 10:42:19 -0700282 gpr_mu_unlock (&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700283}
284
Craig Tiller45724b32015-09-22 10:42:19 -0700285static void
286pf_broadcast (grpc_lb_policy * pol, grpc_transport_op * op, grpc_closure_list * closure_list)
287{
288 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700289 size_t i;
290 size_t n;
291 grpc_subchannel **subchannels;
292
Craig Tiller45724b32015-09-22 10:42:19 -0700293 gpr_mu_lock (&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700294 n = p->num_subchannels;
Craig Tiller45724b32015-09-22 10:42:19 -0700295 subchannels = gpr_malloc (n * sizeof (*subchannels));
296 for (i = 0; i < n; i++)
297 {
298 subchannels[i] = p->subchannels[i];
299 GRPC_SUBCHANNEL_REF (subchannels[i], "pf_broadcast");
300 }
301 gpr_mu_unlock (&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700302
Craig Tiller45724b32015-09-22 10:42:19 -0700303 for (i = 0; i < n; i++)
304 {
305 grpc_subchannel_process_transport_op (subchannels[i], op, closure_list);
306 GRPC_SUBCHANNEL_UNREF (subchannels[i], "pf_broadcast", closure_list);
307 }
308 gpr_free (subchannels);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700309}
310
Craig Tiller45724b32015-09-22 10:42:19 -0700311static grpc_connectivity_state
312pf_check_connectivity (grpc_lb_policy * pol, grpc_closure_list * closure_list)
313{
314 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
Craig Tillerc7b5f762015-06-27 11:48:42 -0700315 grpc_connectivity_state st;
Craig Tiller45724b32015-09-22 10:42:19 -0700316 gpr_mu_lock (&p->mu);
317 st = grpc_connectivity_state_check (&p->state_tracker);
318 gpr_mu_unlock (&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700319 return st;
320}
321
Craig Tiller45724b32015-09-22 10:42:19 -0700322void
323pf_notify_on_state_change (grpc_lb_policy * pol, grpc_connectivity_state * current, grpc_closure * notify, grpc_closure_list * closure_list)
324{
325 pick_first_lb_policy *p = (pick_first_lb_policy *) pol;
326 gpr_mu_lock (&p->mu);
327 grpc_connectivity_state_notify_on_state_change (&p->state_tracker, current, notify, closure_list);
328 gpr_mu_unlock (&p->mu);
Craig Tillerc7b5f762015-06-27 11:48:42 -0700329}
330
Craig Tillereb3b12e2015-06-26 14:42:49 -0700331static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
Craig Tiller45724b32015-09-22 10:42:19 -0700332 pf_destroy,
333 pf_shutdown,
334 pf_pick,
335 pf_exit_idle,
336 pf_broadcast,
337 pf_check_connectivity,
338 pf_notify_on_state_change
339};
Craig Tillereb3b12e2015-06-26 14:42:49 -0700340
Craig Tiller45724b32015-09-22 10:42:19 -0700341static void
342pick_first_factory_ref (grpc_lb_policy_factory * factory)
343{
344}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700345
Craig Tiller45724b32015-09-22 10:42:19 -0700346static void
347pick_first_factory_unref (grpc_lb_policy_factory * factory)
348{
349}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700350
Craig Tiller45724b32015-09-22 10:42:19 -0700351static grpc_lb_policy *
352create_pick_first (grpc_lb_policy_factory * factory, grpc_lb_policy_args * args)
353{
354 pick_first_lb_policy *p = gpr_malloc (sizeof (*p));
355 GPR_ASSERT (args->num_subchannels > 0);
356 memset (p, 0, sizeof (*p));
357 grpc_lb_policy_init (&p->base, &pick_first_lb_policy_vtable);
358 p->subchannels = gpr_malloc (sizeof (grpc_subchannel *) * args->num_subchannels);
David Garcia Quintasc7705c72015-09-09 17:21:11 -0700359 p->num_subchannels = args->num_subchannels;
Craig Tiller45724b32015-09-22 10:42:19 -0700360 grpc_connectivity_state_init (&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first");
361 memcpy (p->subchannels, args->subchannels, sizeof (grpc_subchannel *) * args->num_subchannels);
362 grpc_closure_init (&p->connectivity_changed, pf_connectivity_changed, p);
363 gpr_mu_init (&p->mu);
Craig Tillereb3b12e2015-06-26 14:42:49 -0700364 return &p->base;
365}
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700366
367static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
Craig Tiller45724b32015-09-22 10:42:19 -0700368 pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
369 "pick_first"
370};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700371
372static grpc_lb_policy_factory pick_first_lb_policy_factory = {
Craig Tiller45724b32015-09-22 10:42:19 -0700373 &pick_first_factory_vtable
374};
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700375
Craig Tiller45724b32015-09-22 10:42:19 -0700376grpc_lb_policy_factory *
377grpc_pick_first_lb_factory_create ()
378{
David Garcia Quintas5c4543d2015-09-03 15:49:56 -0700379 return &pick_first_lb_policy_factory;
380}