blob: 51a05a36d7212372dcdcd09bfc0e5244a3cec4e4 [file] [log] [blame]
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
#include "src/core/ext/load_reporting/load_reporting.h"
#include "src/core/lib/channel/channel_args.h"
enum { TIMEOUT = 200000 };
static void *tag(intptr_t t) { return (void *)t; }
typedef struct {
uint32_t call_creation_token; /* expected 0xCAFED00D */
uint32_t call_destruction_token; /* expected 0xDEADD00D */
uint32_t channel_creation_token; /* expected 0xCAFEFACE */
uint32_t channel_destruction_token; /* expected 0xDEADFACE */
char *method_name;
uint64_t total_bytes;
bool fully_processed;
} aggregated_bw_stats;
static void sample_fn(const grpc_load_reporting_call_data *call_data,
void *user_data) {
GPR_ASSERT(user_data != NULL);
aggregated_bw_stats *custom_stats = (aggregated_bw_stats *)user_data;
switch (call_data->source) {
case GRPC_LR_POINT_CHANNEL_CREATION:
custom_stats->channel_creation_token = 0xCAFEFACE;
break;
case GRPC_LR_POINT_CHANNEL_DESTRUCTION:
custom_stats->channel_destruction_token = 0xDEADFACE;
break;
case GRPC_LR_POINT_CALL_CREATION:
custom_stats->call_creation_token = 0xCAFED00D;
break;
case GRPC_LR_POINT_CALL_DESTRUCTION:
custom_stats->method_name = gpr_strdup(call_data->method);
custom_stats->call_destruction_token = 0xDEADD00D;
custom_stats->total_bytes =
call_data->final_info->stats.transport_stream_stats.outgoing
.data_bytes +
call_data->final_info->stats.transport_stream_stats.incoming
.data_bytes;
custom_stats->fully_processed = true;
break;
default:
abort();
}
}
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_server(&f, server_args);
config.init_client(&f, client_args);
return f;
}
static gpr_timespec n_seconds_time(int n) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
}
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(
f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = NULL;
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->cq);
drain_cq(f->cq);
grpc_completion_queue_destroy(f->cq);
}
static void request_response_with_payload(grpc_end2end_test_fixture f,
const char *method_name,
const char *request_msg,
const char *response_msg) {
gpr_slice request_payload_slice = gpr_slice_from_copied_string(request_msg);
gpr_slice response_payload_slice = gpr_slice_from_copied_string(response_msg);
grpc_call *c;
grpc_call *s;
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
cq_verifier *cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op *op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_byte_buffer *request_payload_recv = NULL;
grpc_byte_buffer *response_payload_recv = NULL;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
char *details = NULL;
size_t details_capacity = 0;
int was_cancelled = 2;
c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
method_name, "foo.test.google.fr", deadline,
NULL);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &response_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
cq_expect_completion(cqv, tag(101), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
cq_expect_completion(cqv, tag(102), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = response_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
cq_expect_completion(cqv, tag(103), 1);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_OK);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_destroy(c);
grpc_call_destroy(s);
cq_verifier_destroy(cqv);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_byte_buffer_destroy(response_payload_recv);
}
static void test_load_reporting_hook(grpc_end2end_test_config config) {
aggregated_bw_stats *aggr_stats_server =
gpr_malloc(sizeof(aggregated_bw_stats));
memset(aggr_stats_server, 0, sizeof(aggregated_bw_stats));
grpc_load_reporting_config *server_lrc =
grpc_load_reporting_config_create(sample_fn, aggr_stats_server);
/* Introduce load reporting for the server through its arguments */
grpc_arg arg = grpc_load_reporting_config_create_arg(server_lrc);
grpc_channel_args *lr_server_args =
grpc_channel_args_copy_and_add(NULL, &arg, 1);
grpc_end2end_test_fixture f =
begin_test(config, "test_load_reporting_hook", NULL, lr_server_args);
const char *method_name = "/gRPCFTW";
const char *request_msg = "so long!";
const char *response_msg = "I'm back!";
request_response_with_payload(f, method_name, request_msg, response_msg);
end_test(&f);
grpc_channel_args_destroy(lr_server_args);
config.tear_down_data(&f);
if (aggr_stats_server->fully_processed) {
GPR_ASSERT(aggr_stats_server->total_bytes ==
5 + strlen(request_msg) + strlen(response_msg));
GPR_ASSERT(aggr_stats_server->channel_creation_token == 0xCAFEFACE);
GPR_ASSERT(aggr_stats_server->channel_destruction_token == 0xDEADFACE);
GPR_ASSERT(aggr_stats_server->call_creation_token == 0xCAFED00D);
GPR_ASSERT(aggr_stats_server->call_destruction_token == 0xDEADD00D);
GPR_ASSERT(strcmp(aggr_stats_server->method_name, "/gRPCFTW") == 0);
}
gpr_free(aggr_stats_server->method_name);
gpr_free(aggr_stats_server);
grpc_load_reporting_config_destroy(server_lrc);
}
void load_reporting_hook(grpc_end2end_test_config config) {
test_load_reporting_hook(config);
}
void load_reporting_hook_pre_init(void) {}