WIP in *_end2end_test.cc. Tests pass. Fixed leaks and introduced concept of compression request thru MD
diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h
index 5e10875..88954e2 100644
--- a/include/grpc++/client_context.h
+++ b/include/grpc++/client_context.h
@@ -38,6 +38,7 @@
#include <memory>
#include <string>
+#include <grpc/compression.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc++/config.h>
@@ -107,6 +108,17 @@
creds_ = creds;
}
+ grpc_compression_level get_compression_level() const {
+ return compression_level_;
+ }
+ void set_compression_level(grpc_compression_level level);
+
+ grpc_compression_algorithm get_compression_algorithm() const {
+ return compression_algorithm_;
+ }
+ void set_compression_algorithm(grpc_compression_algorithm algorithm);
+
+
void TryCancel();
private:
@@ -157,6 +169,9 @@
std::multimap<grpc::string, grpc::string> send_initial_metadata_;
std::multimap<grpc::string, grpc::string> recv_initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
+
+ grpc_compression_level compression_level_;
+ grpc_compression_algorithm compression_algorithm_;
};
} // namespace grpc
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 326b6a1..a2f0a2f 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -36,6 +36,7 @@
#include <map>
+#include <grpc/compression.h>
#include <grpc/support/time.h>
#include <grpc++/config.h>
#include <grpc++/time.h>
@@ -97,6 +98,16 @@
return client_metadata_;
}
+ grpc_compression_level get_compression_level() const {
+ return compression_level_;
+ }
+ void set_compression_level(grpc_compression_level level);
+
+ grpc_compression_algorithm get_compression_algorithm() const {
+ return compression_algorithm_;
+ }
+ void set_compression_algorithm(grpc_compression_algorithm algorithm);
+
private:
friend class ::grpc::Server;
template <class W, class R>
@@ -142,6 +153,9 @@
std::multimap<grpc::string, grpc::string> client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
+
+ grpc_compression_level compression_level_;
+ grpc_compression_algorithm compression_algorithm_;
};
} // namespace grpc
diff --git a/include/grpc/compression.h b/include/grpc/compression.h
index 1cff5d2..dd7e1d0 100644
--- a/include/grpc/compression.h
+++ b/include/grpc/compression.h
@@ -34,6 +34,10 @@
#ifndef GRPC_COMPRESSION_H
#define GRPC_COMPRESSION_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/** To be used in channel arguments */
#define GRPC_COMPRESSION_LEVEL_ARG "grpc.compression_level"
@@ -76,4 +80,8 @@
grpc_compression_algorithm grpc_compression_algorithm_for_level(
grpc_compression_level level);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_COMPRESSION_H */
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index f5fe87d..6100a90 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -50,7 +50,8 @@
} call_data;
typedef struct channel_data {
- grpc_mdstr *mdstr_compression_algorithm_key;
+ grpc_mdstr *mdstr_request_compression_algorithm_key;
+ grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@@ -72,14 +73,14 @@
}
/** For each \a md element from the incoming metadata, filter out the entry for
- * "grpc-compression-algorithm", using its value to populate the call data's
+ * "grpc-encoding", using its value to populate the call data's
* compression_algorithm field. */
static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
- if (md->key == channeld->mdstr_compression_algorithm_key) {
+ if (md->key == channeld->mdstr_request_compression_algorithm_key) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str,
&calld->compression_algorithm)) {
@@ -184,7 +185,6 @@
break;
case GRPC_OP_SLICE:
if (did_compress) {
- gpr_slice_unref(sop->data.slice);
if (j < calld->slices.count) {
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
}
@@ -259,7 +259,10 @@
channeld->default_compression_algorithm =
grpc_compression_algorithm_for_level(clevel);
- channeld->mdstr_compression_algorithm_key =
+ channeld->mdstr_request_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
+
+ channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding");
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
@@ -267,7 +270,8 @@
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0);
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
- mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
+ mdctx,
+ grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorith_name));
}
@@ -283,7 +287,8 @@
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
- grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key);
+ grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key);
+ grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h
index ea66796..3a196eb 100644
--- a/src/core/channel/compress_filter.h
+++ b/src/core/channel/compress_filter.h
@@ -36,6 +36,8 @@
#include "src/core/channel/channel_stack.h"
+#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "internal:grpc-encoding-request"
+
/** Message-level compression filter.
*
* See <grpc/compression.h> for the available compression levels.
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 37dadec..5f489c0 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1243,7 +1243,7 @@
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key ==
- grpc_channel_get_compresssion_algorithm_string(call->channel)) {
+ grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d3dcb22..cab99e7 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -273,7 +273,7 @@
return channel->grpc_status_string;
}
-grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel) {
return channel->grpc_compression_algorithm_string;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 8d0fe81..66924ad 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,7 @@
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index be46c54..cfa869e 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -244,10 +244,7 @@
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
} */
- if (grpc_channel_args_get_compression_level(args) >
- GRPC_COMPRESS_LEVEL_NONE) {
- filters[n++] = &grpc_compress_filter;
- }
+ filters[n++] = &grpc_compress_filter;
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 72cdd49..0eba554 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -34,9 +34,12 @@
#include <grpc++/client_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
#include <grpc++/time.h>
+#include "src/core/channel/compress_filter.h"
+
namespace grpc {
ClientContext::ClientContext()
@@ -75,6 +78,24 @@
}
}
+void ClientContext::set_compression_level(grpc_compression_level level) {
+ const grpc_compression_algorithm algorithm_for_level =
+ grpc_compression_algorithm_for_level(level);
+ set_compression_algorithm(algorithm_for_level);
+}
+
+void ClientContext::set_compression_algorithm(
+ grpc_compression_algorithm algorithm) {
+ char* algorithm_name = NULL;
+ if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
+ gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
+ algorithm);
+ abort();
+ }
+ GPR_ASSERT(algorithm_name != NULL);
+ AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
+}
+
void ClientContext::TryCancel() {
if (call_) {
grpc_call_cancel(call_);
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 268e4f6..337f680 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -103,7 +103,9 @@
: byte_count_(0), backup_count_(0) {
grpc_byte_buffer_reader_init(&reader_, buffer);
}
- ~GrpcBufferReader() GRPC_OVERRIDE {}
+ ~GrpcBufferReader() GRPC_OVERRIDE {
+ grpc_byte_buffer_reader_destroy(&reader_);
+ }
bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 699895a..087e28d 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -39,6 +39,8 @@
#include <grpc++/impl/sync.h>
#include <grpc++/time.h>
+#include "src/core/channel/compress_filter.h"
+
namespace grpc {
// CompletionOp
@@ -146,4 +148,22 @@
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
+void ServerContext::set_compression_level(grpc_compression_level level) {
+ const grpc_compression_algorithm algorithm_for_level =
+ grpc_compression_algorithm_for_level(level);
+ set_compression_algorithm(algorithm_for_level);
+}
+
+void ServerContext::set_compression_algorithm(
+ grpc_compression_algorithm algorithm) {
+ char* algorithm_name = NULL;
+ if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
+ gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
+ algorithm);
+ abort();
+ }
+ GPR_ASSERT(algorithm_name != NULL);
+ AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
+}
+
} // namespace grpc
diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c
index ca16bc7..a605745 100644
--- a/test/core/end2end/tests/request_with_compressed_payload.c
+++ b/test/core/end2end/tests/request_with_compressed_payload.c
@@ -45,6 +45,7 @@
#include "test/core/end2end/cq_verifier.h"
#include "src/core/channel/channel_args.h"
+#include "src/core/channel/compress_filter.h"
enum { TIMEOUT = 200000 };
@@ -240,6 +241,7 @@
cq_verifier_destroy(cqv);
+ gpr_slice_unref(request_payload_slice);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
@@ -279,13 +281,13 @@
grpc_metadata gzip_compression_override;
grpc_metadata none_compression_override;
- gzip_compression_override.key = "grpc-encoding";
+ gzip_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY;
gzip_compression_override.value = "gzip";
gzip_compression_override.value_length = 4;
memset(&gzip_compression_override.internal_data, 0,
sizeof(gzip_compression_override.internal_data));
- none_compression_override.key = "grpc-encoding";
+ none_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY;
none_compression_override.value = "none";
none_compression_override.value_length = 4;
memset(&none_compression_override.internal_data, 0,
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 45ba8b0..49070a7 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -226,10 +226,11 @@
int num_rpcs) {
EchoRequest request;
EchoResponse response;
- request.set_message("Hello");
+ request.set_message("Hello hello hello hello");
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
+ context.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.ok());
diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc
index b9d47b3..e9d86cc 100644
--- a/test/cpp/end2end/generic_end2end_test.cc
+++ b/test/cpp/end2end/generic_end2end_test.cc
@@ -227,6 +227,7 @@
GenericServerContext srv_ctx;
GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
+ cli_ctx.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH);
send_request.set_message("Hello");
std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));