blob: 2d31b47c245ed1bfb717dcc3405602c8c9027886 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintasd9cee6f2016-08-01 17:42:47 -070034#include <cinttypes>
David Garcia Quintasaaba1312016-06-22 18:10:37 -070035#include <cstdarg>
David Garcia Quintasd9cee6f2016-08-01 17:42:47 -070036#include <cstdint>
David Garcia Quintasaaba1312016-06-22 18:10:37 -070037#include <cstring>
38#include <string>
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070039
40#include <grpc/grpc.h>
41#include <grpc/support/alloc.h>
42#include <grpc/support/host_port.h>
43#include <grpc/support/log.h>
44#include <grpc/support/string_util.h>
45#include <grpc/support/sync.h>
46#include <grpc/support/thd.h>
47#include <grpc/support/time.h>
48
yang-g7c288712016-08-30 10:12:28 -070049extern "C" {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070050#include "src/core/ext/client_config/client_channel.h"
51#include "src/core/lib/channel/channel_stack.h"
52#include "src/core/lib/support/string.h"
53#include "src/core/lib/support/tmpfile.h"
54#include "src/core/lib/surface/channel.h"
55#include "src/core/lib/surface/server.h"
56#include "test/core/end2end/cq_verifier.h"
57#include "test/core/util/port.h"
58#include "test/core/util/test_config.h"
David Garcia Quintas55145c02016-06-21 14:51:54 -070059}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070060
David Garcia Quintasaaba1312016-06-22 18:10:37 -070061#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
62
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070063#define NUM_BACKENDS 4
64
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070065// TODO(dgq): Other scenarios in need of testing:
David Garcia Quintasea11d162016-07-14 17:27:28 -070066// - Send an empty serverlist update and verify that the client request blocks
67// until a new serverlist with actual contents is available.
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070068// - Send identical serverlist update
69// - Test reception of invalid serverlist
70// - Test pinging
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070071// - Test against a non-LB server. That server should return UNIMPLEMENTED and
David Garcia Quintasa26c77b2016-07-18 12:57:09 -070072// the call should fail.
73// - Random LB server closing the stream unexpectedly.
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070074
David Garcia Quintasaaba1312016-06-22 18:10:37 -070075namespace grpc {
76namespace {
77
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070078typedef struct client_fixture {
79 grpc_channel *client;
80 char *server_uri;
81 grpc_completion_queue *cq;
82} client_fixture;
83
84typedef struct server_fixture {
85 grpc_server *server;
86 grpc_call *server_call;
87 grpc_completion_queue *cq;
88 char *servers_hostport;
89 int port;
90 gpr_thd_id tid;
91 int num_calls_serviced;
92} server_fixture;
93
94typedef struct test_fixture {
95 server_fixture lb_server;
96 server_fixture lb_backends[NUM_BACKENDS];
97 client_fixture client;
98 int lb_server_update_delay_ms;
99} test_fixture;
100
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700101static void *tag(intptr_t t) { return (void *)t; }
102
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700103static gpr_slice build_response_payload_slice(
104 const char *host, int *ports, size_t nports,
105 int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
David Garcia Quintasf9f856b2016-06-22 18:25:53 -0700106 // server_list {
107 // servers {
108 // ip_address: "127.0.0.1"
109 // port: ...
110 // load_balance_token: "token..."
111 // }
112 // ...
113 // }
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700114 grpc::lb::v1::LoadBalanceResponse response;
115 auto *serverlist = response.mutable_server_list();
116
117 if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) {
118 auto *expiration_interval = serverlist->mutable_expiration_interval();
119 if (expiration_interval_secs > 0) {
120 expiration_interval->set_seconds(expiration_interval_secs);
121 }
122 if (expiration_interval_nanos > 0) {
123 expiration_interval->set_nanos(expiration_interval_nanos);
124 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700125 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700126 for (size_t i = 0; i < nports; i++) {
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700127 auto *server = serverlist->add_servers();
128 server->set_ip_address(host);
129 server->set_port(ports[i]);
David Garcia Quintasc534b0d2016-06-28 11:48:05 -0700130 // The following long long int cast is meant to work around the
131 // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
132 // have a version for int but does have one for long long int.
133 server->set_load_balance_token("token" +
134 std::to_string((long long int)ports[i]));
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700135 }
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700136
137 gpr_log(GPR_INFO, "generating response: %s",
138 response.ShortDebugString().c_str());
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700139
140 const gpr_slice response_slice =
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700141 gpr_slice_from_copied_string(response.SerializeAsString().c_str());
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700142 return response_slice;
143}
144
145static void drain_cq(grpc_completion_queue *cq) {
146 grpc_event ev;
147 do {
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700148 ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
149 NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700150 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
151}
152
153static void sleep_ms(int delay_ms) {
154 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
155 gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
156}
157
158static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
159 int update_delay_ms) {
160 grpc_call *s;
161 cq_verifier *cqv = cq_verifier_create(sf->cq);
162 grpc_op ops[6];
163 grpc_op *op;
164 grpc_metadata_array request_metadata_recv;
165 grpc_call_details call_details;
166 grpc_call_error error;
167 int was_cancelled = 2;
168 grpc_byte_buffer *request_payload_recv;
169 grpc_byte_buffer *response_payload;
170
171 memset(ops, 0, sizeof(ops));
172 grpc_metadata_array_init(&request_metadata_recv);
173 grpc_call_details_init(&call_details);
174
175 error = grpc_server_request_call(sf->server, &s, &call_details,
176 &request_metadata_recv, sf->cq, sf->cq,
177 tag(200));
178 GPR_ASSERT(GRPC_CALL_OK == error);
179 gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport);
180 cq_expect_completion(cqv, tag(200), 1);
181 cq_verify(cqv);
182 gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
183
184 op = ops;
185 op->op = GRPC_OP_SEND_INITIAL_METADATA;
186 op->data.send_initial_metadata.count = 0;
187 op->flags = 0;
188 op->reserved = NULL;
189 op++;
190 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
191 op->data.recv_close_on_server.cancelled = &was_cancelled;
192 op->flags = 0;
193 op->reserved = NULL;
194 op++;
195 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
196 GPR_ASSERT(GRPC_CALL_OK == error);
197 gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
198
David Garcia Quintas390673a2016-06-28 10:38:19 -0700199 // receive request for backends
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700200 op = ops;
201 op->op = GRPC_OP_RECV_MESSAGE;
202 op->data.recv_message = &request_payload_recv;
203 op->flags = 0;
204 op->reserved = NULL;
205 op++;
206 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
207 GPR_ASSERT(GRPC_CALL_OK == error);
208 cq_expect_completion(cqv, tag(202), 1);
209 cq_verify(cqv);
210 gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
211 // TODO(dgq): validate request.
212 grpc_byte_buffer_destroy(request_payload_recv);
213 gpr_slice response_payload_slice;
214 for (int i = 0; i < 2; i++) {
215 if (i == 0) {
216 // First half of the ports.
217 response_payload_slice =
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700218 build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700219 } else {
220 // Second half of the ports.
221 sleep_ms(update_delay_ms);
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700222 response_payload_slice =
223 build_response_payload_slice("127.0.0.1", ports + (nports / 2),
224 (nports + 1) / 2 /* ceil */, -1, -1);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700225 }
226
227 response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
228 op = ops;
229 op->op = GRPC_OP_SEND_MESSAGE;
230 op->data.send_message = response_payload;
231 op->flags = 0;
232 op->reserved = NULL;
233 op++;
234 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL);
235 GPR_ASSERT(GRPC_CALL_OK == error);
236 cq_expect_completion(cqv, tag(203), 1);
237 cq_verify(cqv);
238 gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d",
239 sf->servers_hostport, i);
240
241 grpc_byte_buffer_destroy(response_payload);
242 gpr_slice_unref(response_payload_slice);
243 }
244 gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport);
245
246 op = ops;
247 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
248 op->data.send_status_from_server.trailing_metadata_count = 0;
249 op->data.send_status_from_server.status = GRPC_STATUS_OK;
250 op->data.send_status_from_server.status_details = "xyz";
251 op->flags = 0;
252 op->reserved = NULL;
253 op++;
254 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL);
255 GPR_ASSERT(GRPC_CALL_OK == error);
256
257 cq_expect_completion(cqv, tag(201), 1);
258 cq_expect_completion(cqv, tag(204), 1);
259 cq_verify(cqv);
260 gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out",
261 sf->servers_hostport);
262
263 grpc_call_destroy(s);
264
265 cq_verifier_destroy(cqv);
266
267 grpc_metadata_array_destroy(&request_metadata_recv);
268 grpc_call_details_destroy(&call_details);
269}
270
271static void start_backend_server(server_fixture *sf) {
272 grpc_call *s;
273 cq_verifier *cqv;
274 grpc_op ops[6];
275 grpc_op *op;
276 grpc_metadata_array request_metadata_recv;
277 grpc_call_details call_details;
278 grpc_call_error error;
279 int was_cancelled;
280 grpc_byte_buffer *request_payload_recv;
281 grpc_byte_buffer *response_payload;
282 grpc_event ev;
283
284 while (true) {
285 memset(ops, 0, sizeof(ops));
286 cqv = cq_verifier_create(sf->cq);
287 was_cancelled = 2;
288 grpc_metadata_array_init(&request_metadata_recv);
289 grpc_call_details_init(&call_details);
290
291 error = grpc_server_request_call(sf->server, &s, &call_details,
292 &request_metadata_recv, sf->cq, sf->cq,
293 tag(100));
294 GPR_ASSERT(GRPC_CALL_OK == error);
295 gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport);
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700296 ev = grpc_completion_queue_next(sf->cq,
297 GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700298 if (!ev.success) {
299 gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport);
300 cq_verifier_destroy(cqv);
301 grpc_metadata_array_destroy(&request_metadata_recv);
302 grpc_call_details_destroy(&call_details);
303 return;
304 }
305 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
306 gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
307
308 op = ops;
309 op->op = GRPC_OP_SEND_INITIAL_METADATA;
310 op->data.send_initial_metadata.count = 0;
311 op->flags = 0;
312 op->reserved = NULL;
313 op++;
314 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
315 op->data.recv_close_on_server.cancelled = &was_cancelled;
316 op->flags = 0;
317 op->reserved = NULL;
318 op++;
319 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL);
320 GPR_ASSERT(GRPC_CALL_OK == error);
321 gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
322
323 bool exit = false;
324 gpr_slice response_payload_slice =
325 gpr_slice_from_copied_string("hello you");
326 while (!exit) {
327 op = ops;
328 op->op = GRPC_OP_RECV_MESSAGE;
329 op->data.recv_message = &request_payload_recv;
330 op->flags = 0;
331 op->reserved = NULL;
332 op++;
333 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
334 GPR_ASSERT(GRPC_CALL_OK == error);
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700335 ev = grpc_completion_queue_next(
336 sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700337 if (ev.type == GRPC_OP_COMPLETE && ev.success) {
338 GPR_ASSERT(ev.tag = tag(102));
339 if (request_payload_recv == NULL) {
340 exit = true;
341 gpr_log(GPR_INFO,
342 "Server[%s] recv \"close\" from client, exiting. Call #%d",
343 sf->servers_hostport, sf->num_calls_serviced);
344 }
345 } else {
346 gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
347 sf->servers_hostport, sf->num_calls_serviced);
348 exit = true;
349 }
350 gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d",
351 sf->servers_hostport, sf->num_calls_serviced);
352
353 if (!exit) {
354 response_payload =
355 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
356 op = ops;
357 op->op = GRPC_OP_SEND_MESSAGE;
358 op->data.send_message = response_payload;
359 op->flags = 0;
360 op->reserved = NULL;
361 op++;
362 error =
363 grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
364 GPR_ASSERT(GRPC_CALL_OK == error);
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700365 ev = grpc_completion_queue_next(
366 sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700367 if (ev.type == GRPC_OP_COMPLETE && ev.success) {
368 GPR_ASSERT(ev.tag = tag(103));
369 } else {
370 gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
371 sf->servers_hostport, sf->num_calls_serviced);
372 exit = true;
373 }
374 gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d",
375 sf->servers_hostport, sf->num_calls_serviced);
376 grpc_byte_buffer_destroy(response_payload);
377 }
378
379 grpc_byte_buffer_destroy(request_payload_recv);
380 }
381 ++sf->num_calls_serviced;
382
383 gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport);
384 gpr_slice_unref(response_payload_slice);
385
386 op = ops;
387 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
388 op->data.send_status_from_server.trailing_metadata_count = 0;
389 op->data.send_status_from_server.status = GRPC_STATUS_OK;
390 op->data.send_status_from_server.status_details = "Backend server out a-ok";
391 op->flags = 0;
392 op->reserved = NULL;
393 op++;
394 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL);
395 GPR_ASSERT(GRPC_CALL_OK == error);
396
397 cq_expect_completion(cqv, tag(101), 1);
398 cq_expect_completion(cqv, tag(104), 1);
399 cq_verify(cqv);
400 gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls",
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700401 sf->servers_hostport, sf->num_calls_serviced);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700402
403 grpc_call_destroy(s);
404 cq_verifier_destroy(cqv);
405 grpc_metadata_array_destroy(&request_metadata_recv);
406 grpc_call_details_destroy(&call_details);
407 }
408}
409
410static void perform_request(client_fixture *cf) {
411 grpc_call *c;
412 cq_verifier *cqv = cq_verifier_create(cf->cq);
413 grpc_op ops[6];
414 grpc_op *op;
415 grpc_metadata_array initial_metadata_recv;
416 grpc_metadata_array trailing_metadata_recv;
417 grpc_status_code status;
418 grpc_call_error error;
419 char *details = NULL;
420 size_t details_capacity = 0;
421 grpc_byte_buffer *request_payload;
422 grpc_byte_buffer *response_payload_recv;
423 int i;
424
425 memset(ops, 0, sizeof(ops));
426 gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
427
428 c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS,
429 cf->cq, "/foo", "foo.test.google.fr:1234",
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700430 GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1000), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700431 gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c);
432 GPR_ASSERT(c);
433 char *peer;
434
435 grpc_metadata_array_init(&initial_metadata_recv);
436 grpc_metadata_array_init(&trailing_metadata_recv);
437
438 op = ops;
439 op->op = GRPC_OP_SEND_INITIAL_METADATA;
440 op->data.send_initial_metadata.count = 0;
441 op->flags = 0;
442 op->reserved = NULL;
443 op++;
444 op->op = GRPC_OP_RECV_INITIAL_METADATA;
445 op->data.recv_initial_metadata = &initial_metadata_recv;
446 op->flags = 0;
447 op->reserved = NULL;
448 op++;
449 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
450 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
451 op->data.recv_status_on_client.status = &status;
452 op->data.recv_status_on_client.status_details = &details;
453 op->data.recv_status_on_client.status_details_capacity = &details_capacity;
454 op->flags = 0;
455 op->reserved = NULL;
456 op++;
457 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
458 GPR_ASSERT(GRPC_CALL_OK == error);
459
460 for (i = 0; i < 4; i++) {
461 request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
462
463 op = ops;
464 op->op = GRPC_OP_SEND_MESSAGE;
465 op->data.send_message = request_payload;
466 op->flags = 0;
467 op->reserved = NULL;
468 op++;
469 op->op = GRPC_OP_RECV_MESSAGE;
470 op->data.recv_message = &response_payload_recv;
471 op->flags = 0;
472 op->reserved = NULL;
473 op++;
474 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
475 GPR_ASSERT(GRPC_CALL_OK == error);
476
477 peer = grpc_call_get_peer(c);
478 cq_expect_completion(cqv, tag(2), 1);
479 cq_verify(cqv);
480 gpr_free(peer);
481
482 grpc_byte_buffer_destroy(request_payload);
483 grpc_byte_buffer_destroy(response_payload_recv);
484 }
485
486 gpr_slice_unref(request_payload_slice);
487
488 op = ops;
489 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
490 op->flags = 0;
491 op->reserved = NULL;
492 op++;
493 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
494 GPR_ASSERT(GRPC_CALL_OK == error);
495
496 cq_expect_completion(cqv, tag(1), 1);
497 cq_expect_completion(cqv, tag(3), 1);
498 cq_verify(cqv);
499 peer = grpc_call_get_peer(c);
500 gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
501 gpr_free(peer);
502
503 grpc_call_destroy(c);
504
505 cq_verify_empty_timeout(cqv, 1);
506 cq_verifier_destroy(cqv);
507
508 grpc_metadata_array_destroy(&initial_metadata_recv);
509 grpc_metadata_array_destroy(&trailing_metadata_recv);
510 gpr_free(details);
511}
512
513static void setup_client(const char *server_hostport, client_fixture *cf) {
514 cf->cq = grpc_completion_queue_create(NULL);
515 cf->server_uri = gpr_strdup(server_hostport);
516 cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL);
517}
518
519static void teardown_client(client_fixture *cf) {
520 grpc_completion_queue_shutdown(cf->cq);
521 drain_cq(cf->cq);
522 grpc_completion_queue_destroy(cf->cq);
523 cf->cq = NULL;
524 grpc_channel_destroy(cf->client);
525 cf->client = NULL;
526 gpr_free(cf->server_uri);
527}
528
529static void setup_server(const char *host, server_fixture *sf) {
530 int assigned_port;
531
532 sf->cq = grpc_completion_queue_create(NULL);
David Garcia Quintas55145c02016-06-21 14:51:54 -0700533 const char *colon_idx = strchr(host, ':');
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700534 if (colon_idx) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700535 const char *port_str = colon_idx + 1;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700536 sf->port = atoi(port_str);
537 sf->servers_hostport = gpr_strdup(host);
538 } else {
539 sf->port = grpc_pick_unused_port_or_die();
540 gpr_join_host_port(&sf->servers_hostport, host, sf->port);
541 }
542
543 sf->server = grpc_server_create(NULL, NULL);
544 grpc_server_register_completion_queue(sf->server, sf->cq, NULL);
545 GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port(
546 sf->server, sf->servers_hostport)) > 0);
547 GPR_ASSERT(sf->port == assigned_port);
548 grpc_server_start(sf->server);
549}
550
551static void teardown_server(server_fixture *sf) {
552 if (!sf->server) return;
553
554 gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
555 grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
556 GPR_ASSERT(grpc_completion_queue_pluck(
557 sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
558 .type == GRPC_OP_COMPLETE);
559 grpc_server_destroy(sf->server);
560 gpr_thd_join(sf->tid);
561
562 sf->server = NULL;
563 grpc_completion_queue_shutdown(sf->cq);
564 drain_cq(sf->cq);
565 grpc_completion_queue_destroy(sf->cq);
566
567 gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport);
568 gpr_free(sf->servers_hostport);
569}
570
571static void fork_backend_server(void *arg) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700572 server_fixture *sf = static_cast<server_fixture *>(arg);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700573 start_backend_server(sf);
574}
575
576static void fork_lb_server(void *arg) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700577 test_fixture *tf = static_cast<test_fixture *>(arg);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700578 int ports[NUM_BACKENDS];
579 for (int i = 0; i < NUM_BACKENDS; i++) {
580 ports[i] = tf->lb_backends[i].port;
581 }
582 start_lb_server(&tf->lb_server, ports, NUM_BACKENDS,
583 tf->lb_server_update_delay_ms);
584}
585
586static void setup_test_fixture(test_fixture *tf,
587 int lb_server_update_delay_ms) {
588 tf->lb_server_update_delay_ms = lb_server_update_delay_ms;
589
590 gpr_thd_options options = gpr_thd_options_default();
591 gpr_thd_options_set_joinable(&options);
592
593 for (int i = 0; i < NUM_BACKENDS; ++i) {
594 setup_server("127.0.0.1", &tf->lb_backends[i]);
595 gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server,
596 &tf->lb_backends[i], &options);
597 }
598
599 setup_server("127.0.0.1", &tf->lb_server);
600 gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options);
601
602 char *server_uri;
603 gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1",
604 tf->lb_server.servers_hostport);
605 setup_client(server_uri, &tf->client);
606 gpr_free(server_uri);
607}
608
609static void teardown_test_fixture(test_fixture *tf) {
610 teardown_client(&tf->client);
611 for (int i = 0; i < NUM_BACKENDS; ++i) {
612 teardown_server(&tf->lb_backends[i]);
613 }
614 teardown_server(&tf->lb_server);
615}
616
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700617// The LB server will send two updates: batch 1 and batch 2. Each batch
618// contains
619// two addresses, both of a valid and running backend server. Batch 1 is
620// readily
621// available and provided as soon as the client establishes the streaming
622// call.
623// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
624// milliseconds.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700625static test_fixture test_update(int lb_server_update_delay_ms) {
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700626 gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700627 test_fixture tf;
628 memset(&tf, 0, sizeof(tf));
629 setup_test_fixture(&tf, lb_server_update_delay_ms);
630 perform_request(
631 &tf.client); // "consumes" 1st backend server of 1st serverlist
632 perform_request(
633 &tf.client); // "consumes" 2nd backend server of 1st serverlist
634
635 perform_request(
636 &tf.client); // "consumes" 1st backend server of 2nd serverlist
637 perform_request(
638 &tf.client); // "consumes" 2nd backend server of 2nd serverlist
639
640 teardown_test_fixture(&tf);
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700641 gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700642 return tf;
643}
644
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700645} // namespace
646} // namespace grpc
647
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700648int main(int argc, char **argv) {
649 grpc_test_init(argc, argv);
650 grpc_init();
651
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700652 grpc::test_fixture tf_result;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700653 // Clients take a bit over one second to complete a call (the last part of the
654 // call sleeps for 1 second while verifying the client's completion queue is
655 // empty). Therefore:
656 //
657 // If the LB server waits 800ms before sending an update, it will arrive
658 // before the first client request is done, skipping the second server from
659 // batch 1 altogether: the 2nd client request will go to the 1st server of
660 // batch 2 (ie, the third one out of the four total servers).
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700661 tf_result = grpc::test_update(800);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700662 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
663 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
664 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
665 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
666
667 // If the LB server waits 1500ms, the update arrives after having picked the
668 // 2nd server from batch 1 but before the next pick for the first server of
669 // batch 2. All server are used.
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700670 tf_result = grpc::test_update(1500);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700671 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
672 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
673 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
674 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
675
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700676 // If the LB server waits > 2000ms, the update arrives after the first two
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700677 // request are done and the third pick is performed, which returns, in RR
678 // fashion, the 1st server of the 1st update. Therefore, the second server of
David Garcia Quintase60ae9c2016-08-02 16:37:41 -0700679 // batch 1 is hit at least one, whereas the first server of batch 2 is never
680 // hit.
681 tf_result = grpc::test_update(2500);
682 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1);
683 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0);
684 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700685 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
686
687 grpc_shutdown();
688 return 0;
689}