blob: f83935fa1a1ec210b8c4eea56b11bfa499894445 [file] [log] [blame]
David Garcia Quintas3fb8f732016-06-15 22:53:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
David Garcia Quintasd9cee6f2016-08-01 17:42:47 -070034#include <cinttypes>
David Garcia Quintasaaba1312016-06-22 18:10:37 -070035#include <cstdarg>
David Garcia Quintasd9cee6f2016-08-01 17:42:47 -070036#include <cstdint>
David Garcia Quintasaaba1312016-06-22 18:10:37 -070037#include <cstring>
38#include <string>
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070039
David Garcia Quintas55145c02016-06-21 14:51:54 -070040extern "C" {
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070041#include <grpc/grpc.h>
42#include <grpc/support/alloc.h>
43#include <grpc/support/host_port.h>
44#include <grpc/support/log.h>
45#include <grpc/support/string_util.h>
46#include <grpc/support/sync.h>
47#include <grpc/support/thd.h>
48#include <grpc/support/time.h>
49
50#include "src/core/ext/client_config/client_channel.h"
51#include "src/core/lib/channel/channel_stack.h"
52#include "src/core/lib/support/string.h"
53#include "src/core/lib/support/tmpfile.h"
54#include "src/core/lib/surface/channel.h"
55#include "src/core/lib/surface/server.h"
56#include "test/core/end2end/cq_verifier.h"
57#include "test/core/util/port.h"
58#include "test/core/util/test_config.h"
David Garcia Quintas55145c02016-06-21 14:51:54 -070059}
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070060
David Garcia Quintasaaba1312016-06-22 18:10:37 -070061#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
62
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070063#define NUM_BACKENDS 4
David Garcia Quintas5b0e9462016-08-15 19:38:39 -070064#define PAYLOAD "hello you"
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070065
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070066// TODO(dgq): Other scenarios in need of testing:
David Garcia Quintasea11d162016-07-14 17:27:28 -070067// - Send an empty serverlist update and verify that the client request blocks
68// until a new serverlist with actual contents is available.
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070069// - Send identical serverlist update
70// - Test reception of invalid serverlist
71// - Test pinging
David Garcia Quintas8b3b97f2016-07-15 07:46:47 -070072// - Test against a non-LB server. That server should return UNIMPLEMENTED and
David Garcia Quintasa26c77b2016-07-18 12:57:09 -070073// the call should fail.
74// - Random LB server closing the stream unexpectedly.
David Garcia Quintasf9f856b2016-06-22 18:25:53 -070075
David Garcia Quintasaaba1312016-06-22 18:10:37 -070076namespace grpc {
77namespace {
78
David Garcia Quintas3fb8f732016-06-15 22:53:08 -070079typedef struct client_fixture {
80 grpc_channel *client;
81 char *server_uri;
82 grpc_completion_queue *cq;
83} client_fixture;
84
85typedef struct server_fixture {
86 grpc_server *server;
87 grpc_call *server_call;
88 grpc_completion_queue *cq;
89 char *servers_hostport;
90 int port;
91 gpr_thd_id tid;
92 int num_calls_serviced;
93} server_fixture;
94
95typedef struct test_fixture {
96 server_fixture lb_server;
97 server_fixture lb_backends[NUM_BACKENDS];
98 client_fixture client;
99 int lb_server_update_delay_ms;
100} test_fixture;
101
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700102static void *tag(intptr_t t) { return (void *)t; }
103
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700104static gpr_slice build_response_payload_slice(
105 const char *host, int *ports, size_t nports,
106 int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
David Garcia Quintasf9f856b2016-06-22 18:25:53 -0700107 // server_list {
108 // servers {
109 // ip_address: "127.0.0.1"
110 // port: ...
111 // load_balance_token: "token..."
112 // }
113 // ...
114 // }
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700115 grpc::lb::v1::LoadBalanceResponse response;
116 auto *serverlist = response.mutable_server_list();
117
118 if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) {
119 auto *expiration_interval = serverlist->mutable_expiration_interval();
120 if (expiration_interval_secs > 0) {
121 expiration_interval->set_seconds(expiration_interval_secs);
122 }
123 if (expiration_interval_nanos > 0) {
124 expiration_interval->set_nanos(expiration_interval_nanos);
125 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700126 }
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700127 for (size_t i = 0; i < nports; i++) {
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700128 auto *server = serverlist->add_servers();
129 server->set_ip_address(host);
130 server->set_port(ports[i]);
David Garcia Quintasc534b0d2016-06-28 11:48:05 -0700131 // The following long long int cast is meant to work around the
132 // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
133 // have a version for int but does have one for long long int.
134 server->set_load_balance_token("token" +
135 std::to_string((long long int)ports[i]));
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700136 }
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700137
138 gpr_log(GPR_INFO, "generating response: %s",
139 response.ShortDebugString().c_str());
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700140
141 const gpr_slice response_slice =
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700142 gpr_slice_from_copied_string(response.SerializeAsString().c_str());
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700143 return response_slice;
144}
145
146static void drain_cq(grpc_completion_queue *cq) {
147 grpc_event ev;
148 do {
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700149 ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
150 NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700151 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
152}
153
154static void sleep_ms(int delay_ms) {
155 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
156 gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
157}
158
159static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
160 int update_delay_ms) {
161 grpc_call *s;
162 cq_verifier *cqv = cq_verifier_create(sf->cq);
163 grpc_op ops[6];
164 grpc_op *op;
165 grpc_metadata_array request_metadata_recv;
166 grpc_call_details call_details;
167 grpc_call_error error;
168 int was_cancelled = 2;
169 grpc_byte_buffer *request_payload_recv;
170 grpc_byte_buffer *response_payload;
171
172 memset(ops, 0, sizeof(ops));
173 grpc_metadata_array_init(&request_metadata_recv);
174 grpc_call_details_init(&call_details);
175
176 error = grpc_server_request_call(sf->server, &s, &call_details,
177 &request_metadata_recv, sf->cq, sf->cq,
178 tag(200));
179 GPR_ASSERT(GRPC_CALL_OK == error);
180 gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport);
181 cq_expect_completion(cqv, tag(200), 1);
182 cq_verify(cqv);
183 gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
184
185 op = ops;
186 op->op = GRPC_OP_SEND_INITIAL_METADATA;
187 op->data.send_initial_metadata.count = 0;
188 op->flags = 0;
189 op->reserved = NULL;
190 op++;
191 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
192 op->data.recv_close_on_server.cancelled = &was_cancelled;
193 op->flags = 0;
194 op->reserved = NULL;
195 op++;
196 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
197 GPR_ASSERT(GRPC_CALL_OK == error);
198 gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
199
David Garcia Quintas390673a2016-06-28 10:38:19 -0700200 // receive request for backends
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700201 op = ops;
202 op->op = GRPC_OP_RECV_MESSAGE;
203 op->data.recv_message = &request_payload_recv;
204 op->flags = 0;
205 op->reserved = NULL;
206 op++;
207 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
208 GPR_ASSERT(GRPC_CALL_OK == error);
209 cq_expect_completion(cqv, tag(202), 1);
210 cq_verify(cqv);
211 gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
212 // TODO(dgq): validate request.
213 grpc_byte_buffer_destroy(request_payload_recv);
214 gpr_slice response_payload_slice;
215 for (int i = 0; i < 2; i++) {
216 if (i == 0) {
217 // First half of the ports.
218 response_payload_slice =
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700219 build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700220 } else {
221 // Second half of the ports.
222 sleep_ms(update_delay_ms);
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700223 response_payload_slice =
224 build_response_payload_slice("127.0.0.1", ports + (nports / 2),
225 (nports + 1) / 2 /* ceil */, -1, -1);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700226 }
227
228 response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
229 op = ops;
230 op->op = GRPC_OP_SEND_MESSAGE;
231 op->data.send_message = response_payload;
232 op->flags = 0;
233 op->reserved = NULL;
234 op++;
235 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL);
236 GPR_ASSERT(GRPC_CALL_OK == error);
237 cq_expect_completion(cqv, tag(203), 1);
238 cq_verify(cqv);
239 gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d",
240 sf->servers_hostport, i);
241
242 grpc_byte_buffer_destroy(response_payload);
243 gpr_slice_unref(response_payload_slice);
244 }
245 gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport);
246
247 op = ops;
248 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
249 op->data.send_status_from_server.trailing_metadata_count = 0;
250 op->data.send_status_from_server.status = GRPC_STATUS_OK;
251 op->data.send_status_from_server.status_details = "xyz";
252 op->flags = 0;
253 op->reserved = NULL;
254 op++;
255 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL);
256 GPR_ASSERT(GRPC_CALL_OK == error);
257
258 cq_expect_completion(cqv, tag(201), 1);
259 cq_expect_completion(cqv, tag(204), 1);
260 cq_verify(cqv);
261 gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out",
262 sf->servers_hostport);
263
264 grpc_call_destroy(s);
265
266 cq_verifier_destroy(cqv);
267
268 grpc_metadata_array_destroy(&request_metadata_recv);
269 grpc_call_details_destroy(&call_details);
270}
271
272static void start_backend_server(server_fixture *sf) {
273 grpc_call *s;
274 cq_verifier *cqv;
275 grpc_op ops[6];
276 grpc_op *op;
277 grpc_metadata_array request_metadata_recv;
278 grpc_call_details call_details;
279 grpc_call_error error;
280 int was_cancelled;
281 grpc_byte_buffer *request_payload_recv;
282 grpc_byte_buffer *response_payload;
283 grpc_event ev;
284
285 while (true) {
286 memset(ops, 0, sizeof(ops));
287 cqv = cq_verifier_create(sf->cq);
288 was_cancelled = 2;
289 grpc_metadata_array_init(&request_metadata_recv);
290 grpc_call_details_init(&call_details);
291
292 error = grpc_server_request_call(sf->server, &s, &call_details,
293 &request_metadata_recv, sf->cq, sf->cq,
294 tag(100));
295 GPR_ASSERT(GRPC_CALL_OK == error);
296 gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport);
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700297 ev = grpc_completion_queue_next(sf->cq,
298 GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700299 if (!ev.success) {
300 gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport);
301 cq_verifier_destroy(cqv);
302 grpc_metadata_array_destroy(&request_metadata_recv);
303 grpc_call_details_destroy(&call_details);
304 return;
305 }
306 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700307 char *expected_token;
308 GPR_ASSERT(gpr_asprintf(&expected_token, "token%d", sf->port) > 0);
309 GPR_ASSERT(contains_metadata(&request_metadata_recv,
310 "load-reporting-initial", expected_token));
311 gpr_free(expected_token);
312
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700313 gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
314
315 op = ops;
316 op->op = GRPC_OP_SEND_INITIAL_METADATA;
317 op->data.send_initial_metadata.count = 0;
318 op->flags = 0;
319 op->reserved = NULL;
320 op++;
321 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
322 op->data.recv_close_on_server.cancelled = &was_cancelled;
323 op->flags = 0;
324 op->reserved = NULL;
325 op++;
326 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL);
327 GPR_ASSERT(GRPC_CALL_OK == error);
328 gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
329
330 bool exit = false;
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700331 gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700332 while (!exit) {
333 op = ops;
334 op->op = GRPC_OP_RECV_MESSAGE;
335 op->data.recv_message = &request_payload_recv;
336 op->flags = 0;
337 op->reserved = NULL;
338 op++;
339 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
340 GPR_ASSERT(GRPC_CALL_OK == error);
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700341 ev = grpc_completion_queue_next(
342 sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700343 if (ev.type == GRPC_OP_COMPLETE && ev.success) {
344 GPR_ASSERT(ev.tag = tag(102));
345 if (request_payload_recv == NULL) {
346 exit = true;
347 gpr_log(GPR_INFO,
348 "Server[%s] recv \"close\" from client, exiting. Call #%d",
349 sf->servers_hostport, sf->num_calls_serviced);
350 }
351 } else {
352 gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
353 sf->servers_hostport, sf->num_calls_serviced);
354 exit = true;
355 }
356 gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d",
357 sf->servers_hostport, sf->num_calls_serviced);
358
359 if (!exit) {
360 response_payload =
361 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
362 op = ops;
363 op->op = GRPC_OP_SEND_MESSAGE;
364 op->data.send_message = response_payload;
365 op->flags = 0;
366 op->reserved = NULL;
367 op++;
368 error =
369 grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
370 GPR_ASSERT(GRPC_CALL_OK == error);
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700371 ev = grpc_completion_queue_next(
372 sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700373 if (ev.type == GRPC_OP_COMPLETE && ev.success) {
374 GPR_ASSERT(ev.tag = tag(103));
375 } else {
376 gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
377 sf->servers_hostport, sf->num_calls_serviced);
378 exit = true;
379 }
380 gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d",
381 sf->servers_hostport, sf->num_calls_serviced);
382 grpc_byte_buffer_destroy(response_payload);
383 }
384
385 grpc_byte_buffer_destroy(request_payload_recv);
386 }
387 ++sf->num_calls_serviced;
388
389 gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport);
390 gpr_slice_unref(response_payload_slice);
391
392 op = ops;
393 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
394 op->data.send_status_from_server.trailing_metadata_count = 0;
395 op->data.send_status_from_server.status = GRPC_STATUS_OK;
396 op->data.send_status_from_server.status_details = "Backend server out a-ok";
397 op->flags = 0;
398 op->reserved = NULL;
399 op++;
400 error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL);
401 GPR_ASSERT(GRPC_CALL_OK == error);
402
403 cq_expect_completion(cqv, tag(101), 1);
404 cq_expect_completion(cqv, tag(104), 1);
405 cq_verify(cqv);
406 gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls",
David Garcia Quintas8782d1b2016-06-15 23:58:44 -0700407 sf->servers_hostport, sf->num_calls_serviced);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700408
409 grpc_call_destroy(s);
410 cq_verifier_destroy(cqv);
411 grpc_metadata_array_destroy(&request_metadata_recv);
412 grpc_call_details_destroy(&call_details);
413 }
414}
415
416static void perform_request(client_fixture *cf) {
417 grpc_call *c;
418 cq_verifier *cqv = cq_verifier_create(cf->cq);
419 grpc_op ops[6];
420 grpc_op *op;
421 grpc_metadata_array initial_metadata_recv;
422 grpc_metadata_array trailing_metadata_recv;
423 grpc_status_code status;
424 grpc_call_error error;
425 char *details = NULL;
426 size_t details_capacity = 0;
427 grpc_byte_buffer *request_payload;
428 grpc_byte_buffer *response_payload_recv;
429 int i;
430
431 memset(ops, 0, sizeof(ops));
432 gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
433
434 c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS,
435 cf->cq, "/foo", "foo.test.google.fr:1234",
David Garcia Quintas4166cb02016-07-29 14:33:15 -0700436 GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1000), NULL);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700437 gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c);
438 GPR_ASSERT(c);
439 char *peer;
440
441 grpc_metadata_array_init(&initial_metadata_recv);
442 grpc_metadata_array_init(&trailing_metadata_recv);
443
444 op = ops;
445 op->op = GRPC_OP_SEND_INITIAL_METADATA;
446 op->data.send_initial_metadata.count = 0;
447 op->flags = 0;
448 op->reserved = NULL;
449 op++;
450 op->op = GRPC_OP_RECV_INITIAL_METADATA;
451 op->data.recv_initial_metadata = &initial_metadata_recv;
452 op->flags = 0;
453 op->reserved = NULL;
454 op++;
455 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
456 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
457 op->data.recv_status_on_client.status = &status;
458 op->data.recv_status_on_client.status_details = &details;
459 op->data.recv_status_on_client.status_details_capacity = &details_capacity;
460 op->flags = 0;
461 op->reserved = NULL;
462 op++;
463 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
464 GPR_ASSERT(GRPC_CALL_OK == error);
465
466 for (i = 0; i < 4; i++) {
467 request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
468
469 op = ops;
470 op->op = GRPC_OP_SEND_MESSAGE;
471 op->data.send_message = request_payload;
472 op->flags = 0;
473 op->reserved = NULL;
474 op++;
475 op->op = GRPC_OP_RECV_MESSAGE;
476 op->data.recv_message = &response_payload_recv;
477 op->flags = 0;
478 op->reserved = NULL;
479 op++;
480 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
481 GPR_ASSERT(GRPC_CALL_OK == error);
482
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700483 cq_expect_completion(cqv, tag(2), 1);
484 cq_verify(cqv);
David Garcia Quintas5b0e9462016-08-15 19:38:39 -0700485 GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700486
487 grpc_byte_buffer_destroy(request_payload);
488 grpc_byte_buffer_destroy(response_payload_recv);
489 }
490
491 gpr_slice_unref(request_payload_slice);
492
493 op = ops;
494 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
495 op->flags = 0;
496 op->reserved = NULL;
497 op++;
498 error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
499 GPR_ASSERT(GRPC_CALL_OK == error);
500
501 cq_expect_completion(cqv, tag(1), 1);
502 cq_expect_completion(cqv, tag(3), 1);
503 cq_verify(cqv);
504 peer = grpc_call_get_peer(c);
505 gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
506 gpr_free(peer);
507
508 grpc_call_destroy(c);
509
510 cq_verify_empty_timeout(cqv, 1);
511 cq_verifier_destroy(cqv);
512
513 grpc_metadata_array_destroy(&initial_metadata_recv);
514 grpc_metadata_array_destroy(&trailing_metadata_recv);
515 gpr_free(details);
516}
517
518static void setup_client(const char *server_hostport, client_fixture *cf) {
519 cf->cq = grpc_completion_queue_create(NULL);
520 cf->server_uri = gpr_strdup(server_hostport);
521 cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL);
522}
523
524static void teardown_client(client_fixture *cf) {
525 grpc_completion_queue_shutdown(cf->cq);
526 drain_cq(cf->cq);
527 grpc_completion_queue_destroy(cf->cq);
528 cf->cq = NULL;
529 grpc_channel_destroy(cf->client);
530 cf->client = NULL;
531 gpr_free(cf->server_uri);
532}
533
534static void setup_server(const char *host, server_fixture *sf) {
535 int assigned_port;
536
537 sf->cq = grpc_completion_queue_create(NULL);
David Garcia Quintas55145c02016-06-21 14:51:54 -0700538 const char *colon_idx = strchr(host, ':');
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700539 if (colon_idx) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700540 const char *port_str = colon_idx + 1;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700541 sf->port = atoi(port_str);
542 sf->servers_hostport = gpr_strdup(host);
543 } else {
544 sf->port = grpc_pick_unused_port_or_die();
545 gpr_join_host_port(&sf->servers_hostport, host, sf->port);
546 }
547
548 sf->server = grpc_server_create(NULL, NULL);
549 grpc_server_register_completion_queue(sf->server, sf->cq, NULL);
550 GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port(
551 sf->server, sf->servers_hostport)) > 0);
552 GPR_ASSERT(sf->port == assigned_port);
553 grpc_server_start(sf->server);
554}
555
556static void teardown_server(server_fixture *sf) {
557 if (!sf->server) return;
558
559 gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
560 grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
561 GPR_ASSERT(grpc_completion_queue_pluck(
562 sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
563 .type == GRPC_OP_COMPLETE);
564 grpc_server_destroy(sf->server);
565 gpr_thd_join(sf->tid);
566
567 sf->server = NULL;
568 grpc_completion_queue_shutdown(sf->cq);
569 drain_cq(sf->cq);
570 grpc_completion_queue_destroy(sf->cq);
571
572 gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport);
573 gpr_free(sf->servers_hostport);
574}
575
576static void fork_backend_server(void *arg) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700577 server_fixture *sf = static_cast<server_fixture *>(arg);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700578 start_backend_server(sf);
579}
580
581static void fork_lb_server(void *arg) {
David Garcia Quintas55145c02016-06-21 14:51:54 -0700582 test_fixture *tf = static_cast<test_fixture *>(arg);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700583 int ports[NUM_BACKENDS];
584 for (int i = 0; i < NUM_BACKENDS; i++) {
585 ports[i] = tf->lb_backends[i].port;
586 }
587 start_lb_server(&tf->lb_server, ports, NUM_BACKENDS,
588 tf->lb_server_update_delay_ms);
589}
590
591static void setup_test_fixture(test_fixture *tf,
592 int lb_server_update_delay_ms) {
593 tf->lb_server_update_delay_ms = lb_server_update_delay_ms;
594
595 gpr_thd_options options = gpr_thd_options_default();
596 gpr_thd_options_set_joinable(&options);
597
598 for (int i = 0; i < NUM_BACKENDS; ++i) {
599 setup_server("127.0.0.1", &tf->lb_backends[i]);
600 gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server,
601 &tf->lb_backends[i], &options);
602 }
603
604 setup_server("127.0.0.1", &tf->lb_server);
605 gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options);
606
607 char *server_uri;
608 gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1",
609 tf->lb_server.servers_hostport);
610 setup_client(server_uri, &tf->client);
611 gpr_free(server_uri);
612}
613
614static void teardown_test_fixture(test_fixture *tf) {
615 teardown_client(&tf->client);
616 for (int i = 0; i < NUM_BACKENDS; ++i) {
617 teardown_server(&tf->lb_backends[i]);
618 }
619 teardown_server(&tf->lb_server);
620}
621
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700622// The LB server will send two updates: batch 1 and batch 2. Each batch
623// contains
624// two addresses, both of a valid and running backend server. Batch 1 is
625// readily
626// available and provided as soon as the client establishes the streaming
627// call.
628// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
629// milliseconds.
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700630static test_fixture test_update(int lb_server_update_delay_ms) {
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700631 gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700632 test_fixture tf;
633 memset(&tf, 0, sizeof(tf));
634 setup_test_fixture(&tf, lb_server_update_delay_ms);
635 perform_request(
636 &tf.client); // "consumes" 1st backend server of 1st serverlist
637 perform_request(
638 &tf.client); // "consumes" 2nd backend server of 1st serverlist
639
640 perform_request(
641 &tf.client); // "consumes" 1st backend server of 2nd serverlist
642 perform_request(
643 &tf.client); // "consumes" 2nd backend server of 2nd serverlist
644
645 teardown_test_fixture(&tf);
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700646 gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700647 return tf;
648}
649
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700650} // namespace
651} // namespace grpc
652
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700653int main(int argc, char **argv) {
654 grpc_test_init(argc, argv);
655 grpc_init();
656
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700657 grpc::test_fixture tf_result;
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700658 // Clients take a bit over one second to complete a call (the last part of the
659 // call sleeps for 1 second while verifying the client's completion queue is
660 // empty). Therefore:
661 //
662 // If the LB server waits 800ms before sending an update, it will arrive
663 // before the first client request is done, skipping the second server from
664 // batch 1 altogether: the 2nd client request will go to the 1st server of
665 // batch 2 (ie, the third one out of the four total servers).
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700666 tf_result = grpc::test_update(800);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700667 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
668 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
669 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
670 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
671
672 // If the LB server waits 1500ms, the update arrives after having picked the
673 // 2nd server from batch 1 but before the next pick for the first server of
674 // batch 2. All server are used.
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700675 tf_result = grpc::test_update(1500);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700676 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
677 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
678 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
679 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
680
David Garcia Quintasaaba1312016-06-22 18:10:37 -0700681 // If the LB server waits > 2000ms, the update arrives after the first two
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700682 // request are done and the third pick is performed, which returns, in RR
683 // fashion, the 1st server of the 1st update. Therefore, the second server of
David Garcia Quintase60ae9c2016-08-02 16:37:41 -0700684 // batch 1 is hit at least one, whereas the first server of batch 2 is never
685 // hit.
686 tf_result = grpc::test_update(2500);
687 GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1);
688 GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0);
689 GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0);
David Garcia Quintas3fb8f732016-06-15 22:53:08 -0700690 GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
691
692 grpc_shutdown();
693 return 0;
694}