blob: 83a25a9a72c42f2c13c0714a9045f88a84bcf0c0 [file] [log] [blame]
Craig Tiller3bc8ebd2015-06-24 15:41:15 -07001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/client_config/lb_policies/pick_first.h"
Craig Tillereb3b12e2015-06-26 14:42:49 -070035
36#include <string.h>
37
38#include <grpc/support/alloc.h>
39
40typedef struct pending_pick {
41 struct pending_pick *next;
42 grpc_pollset *pollset;
43 grpc_subchannel **target;
44 grpc_iomgr_closure *on_complete;
45} pending_pick;
46
47typedef struct {
48 /** base policy: must be first */
49 grpc_lb_policy base;
50 /** ref count */
51 gpr_refcount refs;
52 /** all our subchannels */
53 grpc_subchannel **subchannels;
54 size_t num_subchannels;
55
56 grpc_iomgr_closure connectivity_changed;
57
58 /** mutex protecting remaining members */
59 gpr_mu mu;
60 /** the selected channel
61 TODO(ctiller): this should be atomically set so we don't
62 need to take a mutex in the common case */
63 grpc_subchannel *selected;
64 /** have we started picking? */
65 int started_picking;
66 /** which subchannel are we watching? */
67 size_t checking_subchannel;
68 /** what is the connectivity of that channel? */
69 grpc_connectivity_state checking_connectivity;
70 /** list of picks that are waiting on connectivity */
71 pending_pick *pending_picks;
72} pick_first_lb_policy;
73
74void pf_ref(grpc_lb_policy *pol) {
75 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
76 gpr_ref(&p->refs);
77}
78
79void pf_unref(grpc_lb_policy *pol) {
80 pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
81 if (gpr_unref(&p->refs)) {
82 gpr_free(p->subchannels);
83 gpr_mu_destroy(&p->mu);
84 gpr_free(p);
85 }
86}
87
88void pf_shutdown(grpc_lb_policy *pol) {
89 /* pick_first_lb_policy *p = (pick_first_lb_policy*)pol; */
90 abort();
91}
92
93void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
94 grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
95 grpc_iomgr_closure *on_complete) {
96 pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
97 pending_pick *pp;
98 gpr_mu_lock(&p->mu);
99 if (p->selected) {
100 gpr_mu_unlock(&p->mu);
101 *target = p->selected;
102 on_complete->cb(on_complete->cb_arg, 1);
103 } else {
104 if (!p->started_picking) {
105 p->started_picking = 1;
106 p->checking_subchannel = 0;
107 p->checking_connectivity = GRPC_CHANNEL_IDLE;
108 pf_ref(pol);
109 grpc_subchannel_notify_on_state_change(p->subchannels[0], &p->checking_connectivity, &p->connectivity_changed);
110 }
111 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset);
112 pp = gpr_malloc(sizeof(*pp));
113 pp->next = p->pending_picks;
114 pp->pollset = pollset;
115 pp->target = target;
116 pp->on_complete = on_complete;
117 p->pending_picks = pp;
118 gpr_mu_unlock(&p->mu);
119 }
120}
121
122static void del_interested_parties_locked(pick_first_lb_policy *p) {
123 pending_pick *pp;
124 for (pp = p->pending_picks; pp; pp = pp->next) {
125 grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], pp->pollset);
126 }
127}
128
129static void add_interested_parties_locked(pick_first_lb_policy *p) {
130 pending_pick *pp;
131 for (pp = p->pending_picks; pp; pp = pp->next) {
132 grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pp->pollset);
133 }
134}
135
136static void pf_connectivity_changed(void *arg, int iomgr_success) {
137 pick_first_lb_policy *p = arg;
138 pending_pick *pp;
139 int unref = 0;
140
141 gpr_mu_lock(&p->mu);
142loop:
143 switch (p->checking_connectivity) {
144 case GRPC_CHANNEL_READY:
145 p->selected = p->subchannels[p->checking_connectivity];
146 while ((pp = p->pending_picks)) {
147 p->pending_picks = pp->next;
148 *pp->target = p->selected;
149 grpc_subchannel_del_interested_party(p->selected, pp->pollset);
150 grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
151 gpr_free(pp);
152 }
153 unref = 1;
154 break;
155 case GRPC_CHANNEL_TRANSIENT_FAILURE:
156 del_interested_parties_locked(p);
157 p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels;
158 p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
159 add_interested_parties_locked(p);
160 goto loop;
161 case GRPC_CHANNEL_CONNECTING:
162 case GRPC_CHANNEL_IDLE:
163 grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
164 break;
165 case GRPC_CHANNEL_FATAL_FAILURE:
166 del_interested_parties_locked(p);
167 GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]);
168 p->checking_subchannel %= p->num_subchannels;
169 p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
170 p->num_subchannels--;
171 grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
172 add_interested_parties_locked(p);
173 if (p->num_subchannels == 0) {
174 abort();
175 } else {
176 goto loop;
177 }
178 }
179 gpr_mu_unlock(&p->mu);
180
181 if (unref) {
182 pf_unref(&p->base);
183 }
184}
185
186static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
187 pf_ref, pf_unref, pf_shutdown, pf_pick};
188
189grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
190 size_t num_subchannels) {
191 pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
192 GPR_ASSERT(num_subchannels);
193 memset(p, 0, sizeof(*p));
194 p->base.vtable = &pick_first_lb_policy_vtable;
195 gpr_ref_init(&p->refs, 1);
196 p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
197 p->num_subchannels = num_subchannels;
198 memcpy(p->subchannels, subchannels,
199 sizeof(grpc_subchannel *) * num_subchannels);
200 grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
201 gpr_mu_init(&p->mu);
202 return &p->base;
203}