/*
 *
 * Copyright 2016, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include <string.h>

#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>

#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"

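/* Overview: the grpclb policy opens a streaming call against an external load
 * balancer, receives a serverlist of backends and delegates the actual picking
 * of connections to an internal round_robin policy built over that list. */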
int grpc_lb_glb_trace = 0;

typedef struct wrapped_rr_closure_arg {
  grpc_closure *wrapped_closure;
  grpc_lb_policy *rr_policy;
} wrapped_rr_closure_arg;

/* The \a on_complete closure passed as part of the pick requires keeping a
 * reference to its associated round robin instance. We wrap this closure in
 * order to unref the round robin instance upon its invocation */
static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
                               bool success) {
  wrapped_rr_closure_arg *wc = arg;

  if (wc->rr_policy != NULL) {
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "Unreffing RR %p", wc->rr_policy);
    }
    GRPC_LB_POLICY_UNREF(exec_ctx, wc->rr_policy, "wrapped_rr_closure");
  }

  if (wc->wrapped_closure != NULL) {
    grpc_exec_ctx_enqueue(exec_ctx, wc->wrapped_closure, success, NULL);
  }
  gpr_free(wc);
}

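/* A pick or ping issued before the RR policy exists is parked in one of the
 * following lists (see add_pending_pick/add_pending_ping below) and replayed
 * against the RR policy once rr_handover() creates it. */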
typedef struct pending_pick {
  struct pending_pick *next;
  grpc_polling_entity *pollent;
  grpc_metadata_batch *initial_metadata;
  uint32_t initial_metadata_flags;
  grpc_connected_subchannel **target;
  grpc_closure *wrapped_on_complete;
  wrapped_rr_closure_arg *wrapped_on_complete_arg;
} pending_pick;

typedef struct pending_ping {
  struct pending_ping *next;
  grpc_closure *wrapped_notify;
  wrapped_rr_closure_arg *wrapped_notify_arg;
} pending_ping;

typedef struct glb_lb_policy glb_lb_policy;

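/* State for the streaming call against the LB server. The closures below are
 * invoked in sequence as the call progresses: initial metadata sent ->
 * initial metadata received -> request sent -> response(s) received. The
 * close_sent and srv_status_rcvd closures handle call teardown. */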
#define MAX_LBCD_OPS_LEN 6
typedef struct lb_client_data {
  gpr_mu mu;
  grpc_closure md_sent;
  grpc_closure md_rcvd;
  grpc_closure req_sent;
  grpc_closure res_rcvd;
  grpc_closure close_sent;
  grpc_closure srv_status_rcvd;

  grpc_call *c;
  gpr_timespec deadline;

  grpc_metadata_array initial_metadata_recv;
  grpc_metadata_array trailing_metadata_recv;

  grpc_byte_buffer *request_payload;
  grpc_byte_buffer *response_payload;

  grpc_status_code status;
  char *status_details;
  size_t status_details_capacity;

  glb_lb_policy *p;
} lb_client_data;

/* Keeps track of and reacts to changes in connectivity of the RR instance */
typedef struct rr_connectivity_data {
  grpc_closure on_change;
  grpc_connectivity_state state;
  glb_lb_policy *p;
} rr_connectivity_data;

struct glb_lb_policy {
  /** base policy: must be first */
  grpc_lb_policy base;

  /** mutex protecting remaining members */
  gpr_mu mu;

  grpc_client_channel_factory *cc_factory;

  /** for communicating with the LB server */
  grpc_channel *lb_server_channel;

  /** the RR policy to use for the backend servers returned by the LB server */
  grpc_lb_policy *rr_policy;

  bool started_picking;

  /** our connectivity state tracker */
  grpc_connectivity_state_tracker state_tracker;

  grpc_grpclb_serverlist *serverlist;

  /** list of picks that are waiting on connectivity */
  pending_pick *pending_picks;

  /** list of pings that are waiting on connectivity */
  pending_ping *pending_pings;

  /** data associated with the communication with the LB server */
  lb_client_data *lbcd;

  /** for tracking the RR connectivity */
  rr_connectivity_data *rr_connectivity;
};

static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p);

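/* Invoked whenever the connectivity state of the wrapped RR policy changes.
 * A shutdown of the RR policy while a serverlist is available triggers the
 * creation of a replacement RR policy (see rr_handover). */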
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                    bool iomgr_success) {
  rr_connectivity_data *rrcd = arg;
  if (!iomgr_success) {
    gpr_free(rrcd);
    return;
  }
  glb_lb_policy *p = rrcd->p;
  const grpc_connectivity_state new_state = p->rr_connectivity->state;
  if (new_state == GRPC_CHANNEL_SHUTDOWN && p->serverlist != NULL) {
    /* an RR policy is shutting down but there's a serverlist available ->
     * perform a handover */
    rr_handover(exec_ctx, p);
  } else {
    grpc_connectivity_state_set(exec_ctx, &p->state_tracker, new_state,
                                "rr_connectivity_changed");
    /* resubscribe */
    grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy,
                                          &p->rr_connectivity->state,
                                          &p->rr_connectivity->on_change);
  }
}

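/* Queues a pick (resp. ping) until an RR policy becomes available. The
 * original on_complete/notify closure is wrapped so that the RR policy picked
 * up at handover time is unreffed once the closure runs. */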
static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent,
                             grpc_metadata_batch *initial_metadata,
                             uint32_t initial_metadata_flags,
                             grpc_connected_subchannel **target,
                             grpc_closure *on_complete) {
  pending_pick *pp = gpr_malloc(sizeof(*pp));
  memset(pp, 0, sizeof(pending_pick));
  pp->wrapped_on_complete_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
  memset(pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg));
  pp->next = *root;
  pp->pollent = pollent;
  pp->target = target;
  pp->initial_metadata = initial_metadata;
  pp->initial_metadata_flags = initial_metadata_flags;
  pp->wrapped_on_complete =
      grpc_closure_create(wrapped_rr_closure, pp->wrapped_on_complete_arg);
  pp->wrapped_on_complete_arg->wrapped_closure = on_complete;
  *root = pp;
}

static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
  pending_ping *pping = gpr_malloc(sizeof(*pping));
  memset(pping, 0, sizeof(pending_ping));
  pping->wrapped_notify_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
  memset(pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg));
  pping->next = *root;
  pping->wrapped_notify =
      grpc_closure_create(wrapped_rr_closure, pping->wrapped_notify_arg);
  pping->wrapped_notify_arg->wrapped_closure = notify;
  *root = pping;
}

static void lb_client_data_destroy(lb_client_data *lbcd);

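/* Each of the following callbacks schedules the next batch on the LB call,
 * driving the streaming exchange with the balancer one step at a time. */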
static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
  lb_client_data *lbcd = arg;
  GPR_ASSERT(lbcd->c);
  grpc_call_error error;
  grpc_op ops[1];
  memset(ops, 0, sizeof(ops));
  grpc_op *op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata = &lbcd->initial_metadata_recv;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  error = grpc_call_start_batch_and_execute(exec_ctx, lbcd->c, ops,
                                            (size_t)(op - ops), &lbcd->md_rcvd);
  GPR_ASSERT(GRPC_CALL_OK == error);
}

static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
  lb_client_data *lbcd = arg;
  GPR_ASSERT(lbcd->c);
  grpc_call_error error;
  grpc_op ops[1];
  memset(ops, 0, sizeof(ops));
  grpc_op *op = ops;

  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message = lbcd->request_payload;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  error = grpc_call_start_batch_and_execute(
      exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->req_sent);
  GPR_ASSERT(GRPC_CALL_OK == error);
}

static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
  lb_client_data *lbcd = arg;
  grpc_call_error error;

  grpc_op ops[1];
  memset(ops, 0, sizeof(ops));
  grpc_op *op = ops;

  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message = &lbcd->response_payload;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  error = grpc_call_start_batch_and_execute(
      exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->res_rcvd);
  GPR_ASSERT(GRPC_CALL_OK == error);
}

static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
  /* Parse the serverlist out of lbcd->response_payload and, if valid, use it
   * to create or replace the RR policy over the returned backends. */
  lb_client_data *lbcd = arg;
  grpc_op ops[2];
  memset(ops, 0, sizeof(ops));
  grpc_op *op = ops;
  if (lbcd->response_payload) {
    grpc_byte_buffer_reader bbr;
    grpc_byte_buffer_reader_init(&bbr, lbcd->response_payload);
    gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
    grpc_byte_buffer_destroy(lbcd->response_payload);
    grpc_grpclb_serverlist *serverlist =
        grpc_grpclb_response_parse_serverlist(response_slice);
    if (serverlist) {
      gpr_slice_unref(response_slice);
      if (grpc_lb_glb_trace) {
        gpr_log(GPR_INFO, "Serverlist with %zu servers received",
                serverlist->num_servers);
      }
      /* update serverlist */
      if (serverlist->num_servers > 0) {
        if (grpc_grpclb_serverlist_equals(lbcd->p->serverlist, serverlist)) {
          gpr_log(GPR_INFO,
                  "Incoming server list identical to current, ignoring.");
        } else {
          if (lbcd->p->serverlist != NULL) {
            grpc_grpclb_destroy_serverlist(lbcd->p->serverlist);
          }
          lbcd->p->serverlist = serverlist;
        }
      }
      if (lbcd->p->rr_policy == NULL) {
        /* initial "handover", in this case from a null RR policy, meaning
         * it'll just create the first RR policy instance */
        rr_handover(exec_ctx, lbcd->p);
      } else {
        /* unref the RR policy, eventually leading to its substitution with a
         * new one constructed from the received serverlist (see
         * rr_connectivity_changed) */
        GRPC_LB_POLICY_UNREF(exec_ctx, lbcd->p->rr_policy,
                             "serverlist_received");
      }

      /* listen for a potential serverlist update */
      op->op = GRPC_OP_RECV_MESSAGE;
      op->data.recv_message = &lbcd->response_payload;
      op->flags = 0;
      op->reserved = NULL;
      op++;
      const grpc_call_error error = grpc_call_start_batch_and_execute(
          exec_ctx, lbcd->c, ops, (size_t)(op - ops),
          &lbcd->res_rcvd); /* loop */
      GPR_ASSERT(GRPC_CALL_OK == error);
      return;
    } else {
      gpr_log(GPR_ERROR, "Invalid LB response received: '%s'",
              gpr_dump_slice(response_slice, GPR_DUMP_ASCII));
      gpr_slice_unref(response_slice);

      /* Disconnect from server returning invalid response. */
      op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
      op->flags = 0;
      op->reserved = NULL;
      op++;
      grpc_call_error error = grpc_call_start_batch_and_execute(
          exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->close_sent);
      GPR_ASSERT(GRPC_CALL_OK == error);
    }
  }
  /* empty payload: call cancelled by server. Cleanups happen in
   * srv_status_rcvd_cb */
}

static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
  if (grpc_lb_glb_trace) {
    gpr_log(GPR_INFO,
            "Close from LB client sent. Waiting for server status now");
  }
}

static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg,
                               bool success) {
  lb_client_data *lbcd = arg;
  glb_lb_policy *p = lbcd->p;
  if (grpc_lb_glb_trace) {
    gpr_log(GPR_INFO,
            "Status from LB server received. Status = %d, Details = '%s', "
            "Capacity = %zu",
            lbcd->status, lbcd->status_details, lbcd->status_details_capacity);
  }

  grpc_call_destroy(lbcd->c);
  grpc_channel_destroy(lbcd->p->lb_server_channel);
  lbcd->p->lb_server_channel = NULL;
  lb_client_data_destroy(lbcd);
  p->lbcd = NULL;
}

static lb_client_data *lb_client_data_create(glb_lb_policy *p) {
  lb_client_data *lbcd = gpr_malloc(sizeof(lb_client_data));
  memset(lbcd, 0, sizeof(lb_client_data));

  gpr_mu_init(&lbcd->mu);
  grpc_closure_init(&lbcd->md_sent, md_sent_cb, lbcd);
  grpc_closure_init(&lbcd->md_rcvd, md_recv_cb, lbcd);
  grpc_closure_init(&lbcd->req_sent, req_sent_cb, lbcd);
  grpc_closure_init(&lbcd->res_rcvd, res_rcvd_cb, lbcd);
  grpc_closure_init(&lbcd->close_sent, close_sent_cb, lbcd);
  grpc_closure_init(&lbcd->srv_status_rcvd, srv_status_rcvd_cb, lbcd);

  /* TODO(dgq): get the deadline from the client/user instead of fabricating
   * one here. Make it a policy arg? */
  lbcd->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                                gpr_time_from_seconds(3, GPR_TIMESPAN));

  lbcd->c = grpc_channel_create_pollset_set_call(
      p->lb_server_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
      p->base.interested_parties, "/BalanceLoad",
      NULL, /* FIXME(dgq): which "host" value to use? */
      lbcd->deadline, NULL);

  grpc_metadata_array_init(&lbcd->initial_metadata_recv);
  grpc_metadata_array_init(&lbcd->trailing_metadata_recv);

  grpc_grpclb_request *request = grpc_grpclb_request_create(
      "load.balanced.service.name"); /* FIXME(dgq): get the name of the load
                                        balanced service from above. */
  gpr_slice request_payload_slice = grpc_grpclb_request_encode(request);
  lbcd->request_payload =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  gpr_slice_unref(request_payload_slice);
  grpc_grpclb_request_destroy(request);

  lbcd->status_details = NULL;
  lbcd->status_details_capacity = 0;
  lbcd->p = p;
  return lbcd;
}

static void lb_client_data_destroy(lb_client_data *lbcd) {
  grpc_metadata_array_destroy(&lbcd->initial_metadata_recv);
  grpc_metadata_array_destroy(&lbcd->trailing_metadata_recv);

  grpc_byte_buffer_destroy(lbcd->request_payload);

  gpr_free(lbcd->status_details);
  gpr_mu_destroy(&lbcd->mu);
  gpr_free(lbcd);
}

static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  GPR_ASSERT(p->pending_picks == NULL);
  GPR_ASSERT(p->pending_pings == NULL);
  grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
  if (p->serverlist != NULL) {
    grpc_grpclb_destroy_serverlist(p->serverlist);
  }
  gpr_mu_destroy(&p->mu);
  gpr_free(p);
}

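/* Flushes any pending picks (setting their target to NULL) and pending pings,
 * unsubscribes from RR connectivity updates and transitions the policy to
 * SHUTDOWN. */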
static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);

  pending_pick *pp = p->pending_picks;
  p->pending_picks = NULL;
  pending_ping *pping = p->pending_pings;
  p->pending_pings = NULL;
  gpr_mu_unlock(&p->mu);

  while (pp != NULL) {
    pending_pick *next = pp->next;
    *pp->target = NULL;
    grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, true, NULL);
    gpr_free(pp);
    pp = next;
  }

  while (pping != NULL) {
    pending_ping *next = pping->next;
    grpc_exec_ctx_enqueue(exec_ctx, pping->wrapped_notify, true, NULL);
    gpr_free(pping);
    pping = next;
  }

  if (p->rr_policy) {
    /* unsubscribe */
    grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, NULL,
                                          &p->rr_connectivity->on_change);
    GRPC_LB_POLICY_UNREF(exec_ctx, p->rr_policy, "glb_shutdown");
  }

  grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                              GRPC_CHANNEL_SHUTDOWN, "glb_shutdown");
}

static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                            grpc_connected_subchannel **target) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);
  pending_pick *pp = p->pending_picks;
  p->pending_picks = NULL;
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if (pp->target == target) {
      grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
                                               p->base.interested_parties);
      *target = NULL;
      grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, false, NULL);
      gpr_free(pp);
    } else {
      pp->next = p->pending_picks;
      p->pending_picks = pp;
    }
    pp = next;
  }
  gpr_mu_unlock(&p->mu);
}

static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                             uint32_t initial_metadata_flags_mask,
                             uint32_t initial_metadata_flags_eq) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);
  if (p->lbcd != NULL) {
    /* cancel the call to the load balancer service, if any */
    grpc_call_cancel(p->lbcd->c, NULL);
  }
  pending_pick *pp = p->pending_picks;
  p->pending_picks = NULL;
  while (pp != NULL) {
    pending_pick *next = pp->next;
    if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
        initial_metadata_flags_eq) {
      grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
                                               p->base.interested_parties);
      grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, false, NULL);
      gpr_free(pp);
    } else {
      pp->next = p->pending_picks;
      p->pending_picks = pp;
    }
    pp = next;
  }
  gpr_mu_unlock(&p->mu);
}

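/* Starts the streaming call against the LB server: sends the initial metadata
 * for the /BalanceLoad call and registers srv_status_rcvd to be notified of
 * the final call status. */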
static void query_for_backends(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
  GPR_ASSERT(p->lb_server_channel != NULL);

  p->lbcd = lb_client_data_create(p);
  grpc_call_error error;
  grpc_op ops[1];
  memset(ops, 0, sizeof(ops));
  grpc_op *op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  error = grpc_call_start_batch_and_execute(
      exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->md_sent);
  GPR_ASSERT(GRPC_CALL_OK == error);

  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata =
      &p->lbcd->trailing_metadata_recv;
  op->data.recv_status_on_client.status = &p->lbcd->status;
  op->data.recv_status_on_client.status_details = &p->lbcd->status_details;
  op->data.recv_status_on_client.status_details_capacity =
      &p->lbcd->status_details_capacity;
  op->flags = 0;
  op->reserved = NULL;
  op++;
  error = grpc_call_start_batch_and_execute(
      exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->srv_status_rcvd);
  GPR_ASSERT(GRPC_CALL_OK == error);
}

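/* Builds a round_robin policy whose addresses are the backends contained in
 * \a serverlist, reusing this policy's client channel factory. */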
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
                                 const grpc_grpclb_serverlist *serverlist,
                                 glb_lb_policy *p) {
  /* TODO(dgq): support mixed ip version */
  GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
  char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers);
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address,
                       serverlist->servers[i]->port);
  }

  size_t uri_path_len;
  char *concat_ipports = gpr_strjoin_sep(
      (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len);

  grpc_lb_policy_args args;
  args.client_channel_factory = p->cc_factory;
  args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
  args.addresses->naddrs = serverlist->num_servers;
  args.addresses->addrs =
      gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs);
  size_t out_addrs_idx = 0;
  for (size_t i = 0; i < serverlist->num_servers; ++i) {
    grpc_uri uri;
    struct sockaddr_storage sa;
    size_t sa_len;
    uri.path = host_ports[i];
    if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */
      memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len);
      args.addresses->addrs[out_addrs_idx].len = sa_len;
      ++out_addrs_idx;
    } else {
      gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.",
              host_ports[i]);
    }
  }

  grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);

  gpr_free(concat_ipports);
  for (size_t i = 0; i < serverlist->num_servers; i++) {
    gpr_free(host_ports[i]);
  }
  gpr_free(host_ports);

  gpr_free(args.addresses->addrs);
  gpr_free(args.addresses);

  return rr;
}

static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
  p->started_picking = true;
  query_for_backends(exec_ctx, p);
}

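/* Replaces the current RR policy with one built from the latest serverlist,
 * subscribes to its connectivity changes and replays any pending picks and
 * pings against it. */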
static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) {
  p->rr_policy = create_rr(exec_ctx, p->serverlist, p);
  if (grpc_lb_glb_trace) {
    gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")",
            (intptr_t)p->rr_policy);
  }
  GPR_ASSERT(p->rr_policy != NULL);
  p->rr_connectivity->state =
      grpc_lb_policy_check_connectivity(exec_ctx, p->rr_policy);
  grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy,
                                        &p->rr_connectivity->state,
                                        &p->rr_connectivity->on_change);
  grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
                              p->rr_connectivity->state, "rr_handover");
  grpc_lb_policy_exit_idle(exec_ctx, p->rr_policy);

  /* flush pending ops */
  pending_pick *pp;
  while ((pp = p->pending_picks)) {
    p->pending_picks = pp->next;
    GRPC_LB_POLICY_REF(p->rr_policy, "rr_handover_pending_pick");
    pp->wrapped_on_complete_arg->rr_policy = p->rr_policy;
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR,
              (intptr_t)p->rr_policy);
    }
    grpc_lb_policy_pick(exec_ctx, p->rr_policy, pp->pollent,
                        pp->initial_metadata, pp->initial_metadata_flags,
                        pp->target, pp->wrapped_on_complete);
    gpr_free(pp);
  }

  pending_ping *pping;
  while ((pping = p->pending_pings)) {
    p->pending_pings = pping->next;
    GRPC_LB_POLICY_REF(p->rr_policy, "rr_handover_pending_ping");
    pping->wrapped_notify_arg->rr_policy = p->rr_policy;
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR,
              (intptr_t)p->rr_policy);
    }
    grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, pping->wrapped_notify);
    gpr_free(pping);
  }
}

static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);
  if (!p->started_picking) {
    start_picking(exec_ctx, p);
  }
  gpr_mu_unlock(&p->mu);
}

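/* Delegates the pick to the RR policy if one exists; otherwise queues it as a
 * pending pick and kicks off the query for backends if needed. */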
static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                    grpc_polling_entity *pollent,
                    grpc_metadata_batch *initial_metadata,
                    uint32_t initial_metadata_flags,
                    grpc_connected_subchannel **target,
                    grpc_closure *on_complete) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);
  int r;

  if (p->rr_policy != NULL) {
    if (grpc_lb_glb_trace) {
      gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR,
              (intptr_t)p->rr_policy);
    }
    GRPC_LB_POLICY_REF(p->rr_policy, "rr_pick");
    wrapped_rr_closure_arg *warg = gpr_malloc(sizeof(wrapped_rr_closure_arg));
    warg->rr_policy = p->rr_policy;
    warg->wrapped_closure = on_complete;
    grpc_closure *wrapped_on_complete =
        grpc_closure_create(wrapped_rr_closure, warg);
    r = grpc_lb_policy_pick(exec_ctx, p->rr_policy, pollent, initial_metadata,
                            initial_metadata_flags, target,
                            wrapped_on_complete);
    if (r != 0) {
      /* the call to grpc_lb_policy_pick has been synchronous. Invoke a
       * neutered wrapped closure */
      warg->wrapped_closure = NULL;
      grpc_exec_ctx_enqueue(exec_ctx, wrapped_on_complete, false, NULL);
    }
  } else {
    grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
                                           p->base.interested_parties);
    add_pending_pick(&p->pending_picks, pollent, initial_metadata,
                     initial_metadata_flags, target, on_complete);

    if (!p->started_picking) {
      start_picking(exec_ctx, p);
    }
    r = 0;
  }
  gpr_mu_unlock(&p->mu);
  return r;
}

static grpc_connectivity_state glb_check_connectivity(grpc_exec_ctx *exec_ctx,
                                                       grpc_lb_policy *pol) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  grpc_connectivity_state st;
  gpr_mu_lock(&p->mu);
  st = grpc_connectivity_state_check(&p->state_tracker);
  gpr_mu_unlock(&p->mu);
  return st;
}

static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
                         grpc_closure *closure) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);
  if (p->rr_policy) {
    grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, closure);
  } else {
    add_pending_ping(&p->pending_pings, closure);
    if (!p->started_picking) {
      start_picking(exec_ctx, p);
    }
  }
  gpr_mu_unlock(&p->mu);
}

static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx,
                                       grpc_lb_policy *pol,
                                       grpc_connectivity_state *current,
                                       grpc_closure *notify) {
  glb_lb_policy *p = (glb_lb_policy *)pol;
  gpr_mu_lock(&p->mu);
  grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
                                                 current, notify);
  gpr_mu_unlock(&p->mu);
}

static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
    glb_destroy,     glb_shutdown,           glb_pick,
    glb_cancel_pick, glb_cancel_picks,       glb_ping_one,
    glb_exit_idle,   glb_check_connectivity, glb_notify_on_state_change};

static void glb_factory_ref(grpc_lb_policy_factory *factory) {}

static void glb_factory_unref(grpc_lb_policy_factory *factory) {}

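/* Creates the grpclb policy. The addresses in \a args are assumed to be LB
 * server addresses (as claimed by the resolver); a client channel over them
 * is created to carry the load balancing call. */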
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                                  grpc_lb_policy_factory *factory,
                                  grpc_lb_policy_args *args) {
  glb_lb_policy *p = gpr_malloc(sizeof(*p));
  memset(p, 0, sizeof(*p));

  /* all input addresses in args->addresses come from a resolver that claims
   * they are LB services.
   *
   * Create a client channel over them to communicate with a LB service */
  p->cc_factory = args->client_channel_factory;
  GPR_ASSERT(p->cc_factory != NULL);
  if (args->addresses->naddrs == 0) {
    gpr_free(p);
    return NULL;
  }

  /* construct a target from the args->addresses, in the form
   * ipvX://ip1:port1,ip2:port2,...
   * TODO(dgq): support mixed ip version */
  char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs);
  addr_strs[0] =
      grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]);
  for (size_t i = 1; i < args->addresses->naddrs; i++) {
    GPR_ASSERT(grpc_sockaddr_to_string(
                   &addr_strs[i],
                   (const struct sockaddr *)&args->addresses->addrs[i],
                   true) == 0);
  }
  size_t uri_path_len;
  char *target_uri_str = gpr_strjoin_sep(
      (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len);

  /* will pick using pick_first */
  p->lb_server_channel = grpc_client_channel_factory_create_channel(
      exec_ctx, p->cc_factory, target_uri_str,
      GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);

  gpr_free(target_uri_str);
  for (size_t i = 0; i < args->addresses->naddrs; i++) {
    gpr_free(addr_strs[i]);
  }
  gpr_free(addr_strs);

  if (p->lb_server_channel == NULL) {
    gpr_free(p);
    return NULL;
  }

  rr_connectivity_data *rr_connectivity =
      gpr_malloc(sizeof(rr_connectivity_data));
  memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
  grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed,
                    rr_connectivity);
  rr_connectivity->p = p;
  p->rr_connectivity = rr_connectivity;

  grpc_lb_policy_init(&p->base, &glb_lb_policy_vtable);
  gpr_mu_init(&p->mu);
  grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "grpclb");
  return &p->base;
}

static const grpc_lb_policy_factory_vtable glb_factory_vtable = {
    glb_factory_ref, glb_factory_unref, glb_create, "grpclb"};

static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable};

grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
  return &glb_lb_policy_factory;
}

/* Plugin registration */

void grpc_lb_policy_grpclb_init() {
  grpc_register_lb_policy(grpc_glb_lb_factory_create());
  grpc_register_tracer("glb", &grpc_lb_glb_trace);
}

void grpc_lb_policy_grpclb_shutdown() {}