blob: 1fc0d8ebc0ec82846884c00b3a715869e7bf8f51 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintasaaba1312016-06-22 18:10:37 -070034#include <cstdarg>
35#include <cstring>
36#include <string>
David Garcia Quintas78fbb0a2016-06-28 14:02:41 -070037#include <cstdint>
38#include <cinttypes>
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070039
David Garcia Quintas55145c02016-06-21 14:51:54 -070040extern "C" {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070041#include <grpc/grpc.h>
42#include <grpc/support/alloc.h>
43#include <grpc/support/host_port.h>
44#include <grpc/support/log.h>
45#include <grpc/support/string_util.h>
46#include <grpc/support/sync.h>
47#include <grpc/support/thd.h>
48#include <grpc/support/time.h>
49
50#include "src/core/ext/client_config/client_channel.h"
51#include "src/core/lib/channel/channel_stack.h"
52#include "src/core/lib/support/string.h"
53#include "src/core/lib/support/tmpfile.h"
54#include "src/core/lib/surface/channel.h"
55#include "src/core/lib/surface/server.h"
56#include "test/core/end2end/cq_verifier.h"
57#include "test/core/util/port.h"
58#include "test/core/util/test_config.h"
David Garcia Quintas55145c02016-06-21 14:51:54 -070059}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070060
David Garcia Quintasaaba1312016-06-22 18:10:37 -070061#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
62
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070063#define NUM_BACKENDS 4
64
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070065// TODO(dgq): Other scenarios in need of testing:
David Garcia Quintasea11d162016-07-14 17:27:28 -070066// - Send an empty serverlist update and verify that the client request blocks
67// until a new serverlist with actual contents is available.
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070068// - Send identical serverlist update
69// - Test reception of invalid serverlist
70// - Test pinging
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070071// - Test against a non-LB server. That server should return UNIMPLEMENTED and
72// the call should fail.
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070073
David Garcia Quintasaaba1312016-06-22 18:10:37 -070074namespace grpc {
75namespace {
76
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070077typedef struct client_fixture {
78 grpc_channel *client;
79 char *server_uri;
80 grpc_completion_queue *cq;
81} client_fixture;
82
83typedef struct server_fixture {
84 grpc_server *server;
85 grpc_call *server_call;
86 grpc_completion_queue *cq;
87 char *servers_hostport;
88 int port;
89 gpr_thd_id tid;
90 int num_calls_serviced;
91} server_fixture;
92
93typedef struct test_fixture {
94 server_fixture lb_server;
95 server_fixture lb_backends[NUM_BACKENDS];
96 client_fixture client;
97 int lb_server_update_delay_ms;
98} test_fixture;
99
100static gpr_timespec n_seconds_time(int n) {
101 return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
102}
103
104static void *tag(intptr_t t) { return (void *)t; }
105
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700106static gpr_slice build_response_payload_slice(
107 const char *host, int *ports, size_t nports,
108 int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
David Garcia Quintasf9f856b2016-06-22 18:25:53 -0700109 // server_list {
110 // servers {
111 // ip_address: "127.0.0.1"
112 // port: ...
113 // load_balance_token: "token..."
114 // }
115 // ...
116 // }
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700117 grpc::lb::v1::LoadBalanceResponse response;
118 auto *serverlist = response.mutable_server_list();
119
120 if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) {
121 auto *expiration_interval = serverlist->mutable_expiration_interval();
122 if (expiration_interval_secs > 0) {
123 expiration_interval->set_seconds(expiration_interval_secs);
124 }
125 if (expiration_interval_nanos > 0) {
126 expiration_interval->set_nanos(expiration_interval_nanos);
127 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700128 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700129 for (size_t i = 0; i < nports; i++) {
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700130 auto *server = serverlist->add_servers();
131 server->set_ip_address(host);
132 server->set_port(ports[i]);
David Garcia Quintasc534b0d2016-06-28 11:48:05 -0700133 // The following long long int cast is meant to work around the
134 // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
135 // have a version for int but does have one for long long int.
136 server->set_load_balance_token("token" +
137 std::to_string((long long int)ports[i]));
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700138 }
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700139
140 gpr_log(GPR_INFO, "generating response: %s",
141 response.ShortDebugString().c_str());
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700142
143 const gpr_slice response_slice =
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700144 gpr_slice_from_copied_string(response.SerializeAsString().c_str());
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700145 return response_slice;
146}
147
148static void drain_cq(grpc_completion_queue *cq) {
149 grpc_event ev;
150 do {
151 ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL);
152 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
153}
154
155static void sleep_ms(int delay_ms) {
156 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
157 gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
158}
159
160static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
161 int update_delay_ms) {
162 grpc_call *s;
163 cq_verifier *cqv = cq_verifier_create(sf->cq);
164 grpc_op ops[6];
165 grpc_op *op;
166 grpc_metadata_array request_metadata_recv;
167 grpc_call_details call_details;
168 grpc_call_error error;
169 int was_cancelled = 2;
170 grpc_byte_buffer *request_payload_recv;
171 grpc_byte_buffer *response_payload;
172
173 memset(ops, 0, sizeof(ops));
174 grpc_metadata_array_init(&request_metadata_recv);
175 grpc_call_details_init(&call_details);
176
177 error = grpc_server_request_call(sf->server, &s, &call_details,
178 &request_metadata_recv, sf->cq, sf->cq,
179 tag(200));
180 GPR_ASSERT(GRPC_CALL_OK == error);
181 gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport);
182 cq_expect_completion(cqv, tag(200), 1);
183 cq_verify(cqv);
184 gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
185
186 op = ops;
187 op->op = GRPC_OP_SEND_INITIAL_METADATA;
188 op->data.send_initial_metadata.count = 0;
189 op->flags = 0;
190 op->reserved = NULL;
191 op++;
192 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
193 op->data.recv_close_on_server.cancelled = &was_cancelled;
194 op->flags = 0;
195 op->reserved = NULL;
196 op++;
197 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
198 GPR_ASSERT(GRPC_CALL_OK == error);
199 gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
200
David Garcia Quintas390673a2016-06-28 10:38:19 -0700201 // receive request for backends
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700202 op = ops;
203 op->op = GRPC_OP_RECV_MESSAGE;
204 op->data.recv_message = &request_payload_recv;
205 op->flags = 0;
206 op->reserved = NULL;
207 op++;
208 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
209 GPR_ASSERT(GRPC_CALL_OK == error);
210 cq_expect_completion(cqv, tag(202), 1);
211 cq_verify(cqv);
212 gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
213 // TODO(dgq): validate request.
214 grpc_byte_buffer_destroy(request_payload_recv);
215 gpr_slice response_payload_slice;
216 for (int i = 0; i < 2; i++) {
217 if (i == 0) {
218 // First half of the ports.
219 response_payload_slice =
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700220 build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700221 } else {
222 // Second half of the ports.
223 sleep_ms(update_delay_ms);
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700224 response_payload_slice =
225 build_response_payload_slice("127.0.0.1", ports + (nports / 2),
226 (nports + 1) / 2 /* ceil */, -1, -1);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700227 }
228
229 response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
230 op = ops;
231 op->op = GRPC_OP_SEND_MESSAGE;
232 op->data.send_message = response_payload;
233 op->flags = 0;
234 op->reserved = NULL;
235 op++;
236 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL);
237 GPR_ASSERT(GRPC_CALL_OK == error);
238 cq_expect_completion(cqv, tag(203), 1);
239 cq_verify(cqv);
240 gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d",
241 sf->servers_hostport, i);
242
243 grpc_byte_buffer_destroy(response_payload);
244 gpr_slice_unref(response_payload_slice);
245 }
246 gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport);
247
248 op = ops;
249 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
250 op->data.send_status_from_server.trailing_metadata_count = 0;
251 op->data.send_status_from_server.status = GRPC_STATUS_OK;
252 op->data.send_status_from_server.status_details = "xyz";
253 op->flags = 0;
254 op->reserved = NULL;
255 op++;
256 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL);
257 GPR_ASSERT(GRPC_CALL_OK == error);
258
259 cq_expect_completion(cqv, tag(201), 1);
260 cq_expect_completion(cqv, tag(204), 1);
261 cq_verify(cqv);
262 gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out",
263 sf->servers_hostport);
264
265 grpc_call_destroy(s);
266
267 cq_verifier_destroy(cqv);
268
269 grpc_metadata_array_destroy(&request_metadata_recv);
270 grpc_call_details_destroy(&call_details);
271}
272
273static void start_backend_server(server_fixture *sf) {
274 grpc_call *s;
275 cq_verifier *cqv;
276 grpc_op ops[6];
277 grpc_op *op;
278 grpc_metadata_array request_metadata_recv;
279 grpc_call_details call_details;
280 grpc_call_error error;
281 int was_cancelled;
282 grpc_byte_buffer *request_payload_recv;
283 grpc_byte_buffer *response_payload;
284 grpc_event ev;
285
286 while (true) {
287 memset(ops, 0, sizeof(ops));
288 cqv = cq_verifier_create(sf->cq);
289 was_cancelled = 2;
290 grpc_metadata_array_init(&request_metadata_recv);
291 grpc_call_details_init(&call_details);
292
293 error = grpc_server_request_call(sf->server, &s, &call_details,
294 &request_metadata_recv, sf->cq, sf->cq,
295 tag(100));
296 GPR_ASSERT(GRPC_CALL_OK == error);
297 gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport);
298 ev = grpc_completion_queue_next(sf->cq, n_seconds_time(60), NULL);
299 if (!ev.success) {
300 gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport);
301 cq_verifier_destroy(cqv);
302 grpc_metadata_array_destroy(&request_metadata_recv);
303 grpc_call_details_destroy(&call_details);
304 return;
305 }
306 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
307 gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
308
309 op = ops;
310 op->op = GRPC_OP_SEND_INITIAL_METADATA;
311 op->data.send_initial_metadata.count = 0;
312 op->flags = 0;
313 op->reserved = NULL;
314 op++;
315 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
316 op->data.recv_close_on_server.cancelled = &was_cancelled;
317 op->flags = 0;
318 op->reserved = NULL;
319 op++;
320 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL);
321 GPR_ASSERT(GRPC_CALL_OK == error);
322 gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
323
324 bool exit = false;
325 gpr_slice response_payload_slice =
326 gpr_slice_from_copied_string("hello you");
327 while (!exit) {
328 op = ops;
329 op->op = GRPC_OP_RECV_MESSAGE;
330 op->data.recv_message = &request_payload_recv;
331 op->flags = 0;
332 op->reserved = NULL;
333 op++;
334 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
335 GPR_ASSERT(GRPC_CALL_OK == error);
336 ev = grpc_completion_queue_next(sf->cq, n_seconds_time(3), NULL);
337 if (ev.type == GRPC_OP_COMPLETE && ev.success) {
338 GPR_ASSERT(ev.tag = tag(102));
339 if (request_payload_recv == NULL) {
340 exit = true;
341 gpr_log(GPR_INFO,
342 "Server[%s] recv \"close\" from client, exiting. Call #%d",
343 sf->servers_hostport, sf->num_calls_serviced);
344 }
345 } else {
346 gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
347 sf->servers_hostport, sf->num_calls_serviced);
348 exit = true;
349 }
350 gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d",
351 sf->servers_hostport, sf->num_calls_serviced);
352
353 if (!exit) {
354 response_payload =
355 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
356 op = ops;
357 op->op = GRPC_OP_SEND_MESSAGE;
358 op->data.send_message = response_payload;
359 op->flags = 0;
360 op->reserved = NULL;
361 op++;
362 error =
363 grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
364 GPR_ASSERT(GRPC_CALL_OK == error);
365 ev = grpc_completion_queue_next(sf->cq, n_seconds_time(3), NULL);
366 if (ev.type == GRPC_OP_COMPLETE && ev.success) {
367 GPR_ASSERT(ev.tag = tag(103));
368 } else {
369 gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
370 sf->servers_hostport, sf->num_calls_serviced);
371 exit = true;
372 }
373 gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d",
374 sf->servers_hostport, sf->num_calls_serviced);
375 grpc_byte_buffer_destroy(response_payload);
376 }
377
378 grpc_byte_buffer_destroy(request_payload_recv);
379 }
380 ++sf->num_calls_serviced;
381
382 gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport);
383 gpr_slice_unref(response_payload_slice);
384
385 op = ops;
386 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
387 op->data.send_status_from_server.trailing_metadata_count = 0;
388 op->data.send_status_from_server.status = GRPC_STATUS_OK;
389 op->data.send_status_from_server.status_details = "Backend server out a-ok";
390 op->flags = 0;
391 op->reserved = NULL;
392 op++;
393 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL);
394 GPR_ASSERT(GRPC_CALL_OK == error);
395
396 cq_expect_completion(cqv, tag(101), 1);
397 cq_expect_completion(cqv, tag(104), 1);
398 cq_verify(cqv);
399 gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls",
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700400 sf->servers_hostport, sf->num_calls_serviced);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700401
402 grpc_call_destroy(s);
403 cq_verifier_destroy(cqv);
404 grpc_metadata_array_destroy(&request_metadata_recv);
405 grpc_call_details_destroy(&call_details);
406 }
407}
408
409static void perform_request(client_fixture *cf) {
410 grpc_call *c;
411 cq_verifier *cqv = cq_verifier_create(cf->cq);
412 grpc_op ops[6];
413 grpc_op *op;
414 grpc_metadata_array initial_metadata_recv;
415 grpc_metadata_array trailing_metadata_recv;
416 grpc_status_code status;
417 grpc_call_error error;
418 char *details = NULL;
419 size_t details_capacity = 0;
420 grpc_byte_buffer *request_payload;
421 grpc_byte_buffer *response_payload_recv;
422 int i;
423
424 memset(ops, 0, sizeof(ops));
425 gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
426
427 c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS,
428 cf->cq, "/foo", "foo.test.google.fr:1234",
429 n_seconds_time(1000), NULL);
430 gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c);
431 GPR_ASSERT(c);
432 char *peer;
433
434 grpc_metadata_array_init(&initial_metadata_recv);
435 grpc_metadata_array_init(&trailing_metadata_recv);
436
437 op = ops;
438 op->op = GRPC_OP_SEND_INITIAL_METADATA;
439 op->data.send_initial_metadata.count = 0;
440 op->flags = 0;
441 op->reserved = NULL;
442 op++;
443 op->op = GRPC_OP_RECV_INITIAL_METADATA;
444 op->data.recv_initial_metadata = &initial_metadata_recv;
445 op->flags = 0;
446 op->reserved = NULL;
447 op++;
448 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
449 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
450 op->data.recv_status_on_client.status = &status;
451 op->data.recv_status_on_client.status_details = &details;
452 op->data.recv_status_on_client.status_details_capacity = &details_capacity;
453 op->flags = 0;
454 op->reserved = NULL;
455 op++;
456 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
457 GPR_ASSERT(GRPC_CALL_OK == error);
458
459 for (i = 0; i < 4; i++) {
460 request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
461
462 op = ops;
463 op->op = GRPC_OP_SEND_MESSAGE;
464 op->data.send_message = request_payload;
465 op->flags = 0;
466 op->reserved = NULL;
467 op++;
468 op->op = GRPC_OP_RECV_MESSAGE;
469 op->data.recv_message = &response_payload_recv;
470 op->flags = 0;
471 op->reserved = NULL;
472 op++;
473 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
474 GPR_ASSERT(GRPC_CALL_OK == error);
475
476 peer = grpc_call_get_peer(c);
477 cq_expect_completion(cqv, tag(2), 1);
478 cq_verify(cqv);
479 gpr_free(peer);
480
481 grpc_byte_buffer_destroy(request_payload);
482 grpc_byte_buffer_destroy(response_payload_recv);
483 }
484
485 gpr_slice_unref(request_payload_slice);
486
487 op = ops;
488 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
489 op->flags = 0;
490 op->reserved = NULL;
491 op++;
492 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
493 GPR_ASSERT(GRPC_CALL_OK == error);
494
495 cq_expect_completion(cqv, tag(1), 1);
496 cq_expect_completion(cqv, tag(3), 1);
497 cq_verify(cqv);
498 peer = grpc_call_get_peer(c);
499 gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
500 gpr_free(peer);
501
502 grpc_call_destroy(c);
503
504 cq_verify_empty_timeout(cqv, 1);
505 cq_verifier_destroy(cqv);
506
507 grpc_metadata_array_destroy(&initial_metadata_recv);
508 grpc_metadata_array_destroy(&trailing_metadata_recv);
509 gpr_free(details);
510}
511
512static void setup_client(const char *server_hostport, client_fixture *cf) {
513 cf->cq = grpc_completion_queue_create(NULL);
514 cf->server_uri = gpr_strdup(server_hostport);
515 cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL);
516}
517
518static void teardown_client(client_fixture *cf) {
519 grpc_completion_queue_shutdown(cf->cq);
520 drain_cq(cf->cq);
521 grpc_completion_queue_destroy(cf->cq);
522 cf->cq = NULL;
523 grpc_channel_destroy(cf->client);
524 cf->client = NULL;
525 gpr_free(cf->server_uri);
526}
527
528static void setup_server(const char *host, server_fixture *sf) {
529 int assigned_port;
530
531 sf->cq = grpc_completion_queue_create(NULL);
David Garcia Quintas55145c02016-06-21 14:51:54 -0700532 const char *colon_idx = strchr(host, ':');
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700533 if (colon_idx) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700534 const char *port_str = colon_idx + 1;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700535 sf->port = atoi(port_str);
536 sf->servers_hostport = gpr_strdup(host);
537 } else {
538 sf->port = grpc_pick_unused_port_or_die();
539 gpr_join_host_port(&sf->servers_hostport, host, sf->port);
540 }
541
542 sf->server = grpc_server_create(NULL, NULL);
543 grpc_server_register_completion_queue(sf->server, sf->cq, NULL);
544 GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port(
545 sf->server, sf->servers_hostport)) > 0);
546 GPR_ASSERT(sf->port == assigned_port);
547 grpc_server_start(sf->server);
548}
549
550static void teardown_server(server_fixture *sf) {
551 if (!sf->server) return;
552
553 gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
554 grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
555 GPR_ASSERT(grpc_completion_queue_pluck(
556 sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
557 .type == GRPC_OP_COMPLETE);
558 grpc_server_destroy(sf->server);
559 gpr_thd_join(sf->tid);
560
561 sf->server = NULL;
562 grpc_completion_queue_shutdown(sf->cq);
563 drain_cq(sf->cq);
564 grpc_completion_queue_destroy(sf->cq);
565
566 gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport);
567 gpr_free(sf->servers_hostport);
568}
569
570static void fork_backend_server(void *arg) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700571 server_fixture *sf = static_cast<server_fixture *>(arg);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700572 start_backend_server(sf);
573}
574
575static void fork_lb_server(void *arg) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700576 test_fixture *tf = static_cast<test_fixture *>(arg);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700577 int ports[NUM_BACKENDS];
578 for (int i = 0; i < NUM_BACKENDS; i++) {
579 ports[i] = tf->lb_backends[i].port;
580 }
581 start_lb_server(&tf->lb_server, ports, NUM_BACKENDS,
582 tf->lb_server_update_delay_ms);
583}
584
585static void setup_test_fixture(test_fixture *tf,
586 int lb_server_update_delay_ms) {
587 tf->lb_server_update_delay_ms = lb_server_update_delay_ms;
588
589 gpr_thd_options options = gpr_thd_options_default();
590 gpr_thd_options_set_joinable(&options);
591
592 for (int i = 0; i < NUM_BACKENDS; ++i) {
593 setup_server("127.0.0.1", &tf->lb_backends[i]);
594 gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server,
595 &tf->lb_backends[i], &options);
596 }
597
598 setup_server("127.0.0.1", &tf->lb_server);
599 gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options);
600
601 char *server_uri;
602 gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1",
603 tf->lb_server.servers_hostport);
604 setup_client(server_uri, &tf->client);
605 gpr_free(server_uri);
606}
607
608static void teardown_test_fixture(test_fixture *tf) {
609 teardown_client(&tf->client);
610 for (int i = 0; i < NUM_BACKENDS; ++i) {
611 teardown_server(&tf->lb_backends[i]);
612 }
613 teardown_server(&tf->lb_server);
614}
615
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700616// The LB server will send two updates: batch 1 and batch 2. Each batch
617// contains
618// two addresses, both of a valid and running backend server. Batch 1 is
619// readily
620// available and provided as soon as the client establishes the streaming
621// call.
622// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
623// milliseconds.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700624static test_fixture test_update(int lb_server_update_delay_ms) {
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700625 gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700626 test_fixture tf;
627 memset(&tf, 0, sizeof(tf));
628 setup_test_fixture(&tf, lb_server_update_delay_ms);
629 perform_request(
630 &tf.client); // "consumes" 1st backend server of 1st serverlist
631 perform_request(
632 &tf.client); // "consumes" 2nd backend server of 1st serverlist
633
634 perform_request(
635 &tf.client); // "consumes" 1st backend server of 2nd serverlist
636 perform_request(
637 &tf.client); // "consumes" 2nd backend server of 2nd serverlist
638
639 teardown_test_fixture(&tf);
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700640 gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700641 return tf;
642}
643
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700644} // namespace
645} // namespace grpc
646
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700647int main(int argc, char **argv) {
648 grpc_test_init(argc, argv);
649 grpc_init();
650
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700651 grpc::test_fixture tf_result;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700652 // Clients take a bit over one second to complete a call (the last part of the
653 // call sleeps for 1 second while verifying the client's completion queue is
654 // empty). Therefore:
655 //
656 // If the LB server waits 800ms before sending an update, it will arrive
657 // before the first client request is done, skipping the second server from
658 // batch 1 altogether: the 2nd client request will go to the 1st server of
659 // batch 2 (ie, the third one out of the four total servers).
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700660 tf_result = grpc::test_update(800);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700661 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
662 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
663 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
664 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
665
666 // If the LB server waits 1500ms, the update arrives after having picked the
667 // 2nd server from batch 1 but before the next pick for the first server of
668 // batch 2. All server are used.
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700669 tf_result = grpc::test_update(1500);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700670 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
671 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
672 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
673 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
674
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700675 // If the LB server waits > 2000ms, the update arrives after the first two
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700676 // request are done and the third pick is performed, which returns, in RR
677 // fashion, the 1st server of the 1st update. Therefore, the second server of
678 // batch 1 is hit twice, whereas the first server of batch 2 is never hit.
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700679 tf_result = grpc::test_update(2100);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700680 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 2);
681 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
682 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
683 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
684
685 grpc_shutdown();
686 return 0;
687}