Redesign of compression algorithm propagation based on metadata
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 918cb2d..ad42bbb 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -31,6 +31,7 @@
*
*/
+#include <assert.h>
#include <string.h>
#include "src/core/channel/compress_filter.h"
@@ -42,16 +43,16 @@
typedef struct call_data {
gpr_slice_buffer slices;
+ grpc_linked_mdelem compression_algorithm_storage;
int remaining_slice_bytes;
- int no_compress; /**< whether to skip compression for this specific call */
-
- grpc_linked_mdelem compression_algorithm;
+ grpc_compression_algorithm compression_algorithm;
+ gpr_uint8 has_compression_algorithm;
} call_data;
typedef struct channel_data {
- grpc_compression_algorithm compress_algorithm;
- grpc_mdelem *compress_algorithm_md;
- grpc_mdelem *no_compression_md;
+ grpc_mdstr *mdstr_compression_algorithm_key;
+ grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
+ grpc_compression_algorithm default_compression_algorithm;
} channel_data;
/** Compress \a slices in place using \a algorithm. Returns 1 if compression did
@@ -70,14 +71,41 @@
return did_compress;
}
+/** For each \a md element from the incoming metadata, filter out the entry for
+ * "grpc-compression-algorithm", using its value to populate the call data's
+ * compression_algorithm field. */
+static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ channel_data *channeld = elem->channel_data;
+
+ if (md->key == channeld->mdstr_compression_algorithm_key) {
+ assert(GPR_SLICE_LENGTH(md->value->slice) ==
+ sizeof(grpc_compression_algorithm));
+ memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice),
+ sizeof(grpc_compression_algorithm));
+ calld->has_compression_algorithm = 1;
+ return NULL;
+ }
+
+ return md;
+}
+
+static int skip_compression(channel_data *channeld, call_data *calld) {
+ if (calld->has_compression_algorithm &&
+ (calld->compression_algorithm == GRPC_COMPRESS_NONE)) {
+ return 1;
+ }
+ /* no per-call compression override */
+ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
+}
+
static void process_send_ops(grpc_call_element *elem,
grpc_stream_op_buffer *send_ops) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i, j;
- int begin_message_index = -1;
- int metadata_op_index = -1;
- grpc_mdelem *actual_compression_md;
+ int did_compress = 0;
/* buffer up slices until we've processed all the expected ones (as given by
* GRPC_OP_BEGIN_MESSAGE) */
@@ -85,73 +113,83 @@
grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) {
case GRPC_OP_BEGIN_MESSAGE:
- begin_message_index = i;
calld->remaining_slice_bytes = sop->data.begin_message.length;
- calld->no_compress =
- !!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS);
+ /* TODO(dgq): we may want to get rid of the flags mechanism to have
+ * exceptions to compression: we can rely solely on metadata to set NONE
+ * as the compression algorithm */
+ if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ }
+ break;
+ case GRPC_OP_METADATA:
+ /* Parse incoming request for compression. If any, it'll be available at
+ * calld->compression_algorithm */
+ grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter,
+ elem);
+ if (!calld->has_compression_algorithm) {
+ /* If no algorithm was found in the metadata and we aren't
+ * exceptionally skipping compression, fall back to the channel
+ * default */
+ calld->compression_algorithm =
+ channeld->default_compression_algorithm;
+ calld->has_compression_algorithm = 1; /* GPR_TRUE */
+ }
break;
case GRPC_OP_SLICE:
- if (calld->no_compress) continue;
+ if (skip_compression(channeld, calld)) continue;
GPR_ASSERT(calld->remaining_slice_bytes > 0);
/* add to calld->slices */
gpr_slice_buffer_add(&calld->slices, sop->data.slice);
calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice);
if (calld->remaining_slice_bytes == 0) {
/* compress */
- if (!compress_send_sb(channeld->compress_algorithm, &calld->slices)) {
- calld->no_compress = 1; /* GPR_TRUE */
- }
+ did_compress =
+ compress_send_sb(calld->compression_algorithm, &calld->slices);
+ }
+ break;
+ case GRPC_NO_OP:
+ ; /* fallthrough */
+ }
+ }
+
+ /* We need to:
+ * - (OP_SLICE) If compression happened, replace the input slices with the
+ * compressed ones.
+ * - (BEGIN_MESSAGE) Update the message info (size, flags).
+ * - (OP_METADATA) Convey the compression configuration */
+ for (i = 0, j = 0; i < send_ops->nops; ++i) {
+ grpc_stream_op *sop = &send_ops->ops[i];
+ switch (sop->type) {
+ case GRPC_OP_BEGIN_MESSAGE:
+ if (did_compress) {
+ sop->data.begin_message.length = calld->slices.length;
+ sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ } else {
+ /* either because the user requested the exception or because compressing
+ * would have resulted in a larger output */
+ calld->compression_algorithm = GRPC_COMPRESS_NONE;
+ /* reset the flag compression bit */
+ sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
}
break;
case GRPC_OP_METADATA:
- /* Save the index of the first metadata op, to be processed after we
- * know whether compression actually happened */
- if (metadata_op_index < 0) metadata_op_index = i;
+ grpc_metadata_batch_add_head(
+ &(sop->data.metadata), &calld->compression_algorithm_storage,
+ grpc_mdelem_ref(channeld->mdelem_compression_algorithms
+ [calld->compression_algorithm]));
break;
- case GRPC_NO_OP:
- ; /* fallthrough, ignore */
- }
- }
-
- if (metadata_op_index < 0 || begin_message_index < 0) { /* bail out */
- return;
- }
-
- /* update both the metadata and the begin_message's flags */
- if (calld->no_compress) {
- /* either because the user requested the exception or because compressing
- * would have resulted in a larger output */
- channeld->compress_algorithm = GRPC_COMPRESS_NONE;
- actual_compression_md = channeld->no_compression_md;
- /* reset the flag compression bit */
- send_ops->ops[begin_message_index].data.begin_message.flags &=
- ~GRPC_WRITE_INTERNAL_COMPRESS;
- } else { /* DID compress */
- actual_compression_md = channeld->compress_algorithm_md;
- /* at this point, calld->slices contains the *compressed* slices from
- * send_ops->ops[*]->data.slice. We now replace these input slices with the
- * compressed ones. */
- for (i = 0, j = 0; i < send_ops->nops; ++i) {
- grpc_stream_op *sop = &send_ops->ops[i];
- GPR_ASSERT(j < calld->slices.count);
- switch (sop->type) {
- case GRPC_OP_SLICE:
+ case GRPC_OP_SLICE:
+ if (did_compress) {
+ GPR_ASSERT(j < calld->slices.count);
gpr_slice_unref(sop->data.slice);
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
- break;
- case GRPC_OP_BEGIN_MESSAGE:
- sop->data.begin_message.length = calld->slices.length;
- sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- case GRPC_NO_OP:
- case GRPC_OP_METADATA:
- ; /* fallthrough, ignore */
- }
+ }
+ break;
+ case GRPC_NO_OP:
+ ; /* fallthrough */
}
}
-
- grpc_metadata_batch_add_head(
- &(send_ops->ops[metadata_op_index].data.metadata),
- &calld->compression_algorithm, grpc_mdelem_ref(actual_compression_md));
}
/* Called either:
@@ -189,6 +227,7 @@
/* initialize members */
gpr_slice_buffer_init(&calld->slices);
+ calld->has_compression_algorithm = 0;
if (initial_op) {
if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
@@ -209,17 +248,23 @@
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
+ grpc_compression_algorithm algo_idx;
const grpc_compression_level clevel =
grpc_channel_args_get_compression_level(args);
- const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE;
- channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer(
- mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel));
- channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel);
+ channeld->default_compression_algorithm =
+ grpc_compression_algorithm_for_level(clevel);
- channeld->no_compression_md = grpc_mdelem_from_string_and_buffer(
- mdctx, "grpc-compression-level", (gpr_uint8 *)&none_alg,
- sizeof(none_alg));
+ channeld->mdstr_compression_algorithm_key =
+ grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
+
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
+ channeld->mdelem_compression_algorithms[algo_idx] =
+ grpc_mdelem_from_metadata_strings(
+ mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
+ grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx,
+ sizeof(algo_idx)));
+ }
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
@@ -231,8 +276,13 @@
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
- grpc_mdelem_unref(channeld->compress_algorithm_md);
- grpc_mdelem_unref(channeld->no_compression_md);
+ grpc_compression_algorithm algo_idx;
+
+ grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key);
+ for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
+ ++algo_idx) {
+ grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
+ }
}
const grpc_channel_filter grpc_compress_filter = {
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4125741..2b2d92c 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -209,8 +209,8 @@
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /** Compression level for the call */
- grpc_compression_level compression_level;
+ /** Compression algorithm for the call */
+ grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -395,9 +395,9 @@
}
}
-static void set_compression_level(grpc_call *call,
- grpc_compression_level clevel) {
- call->compression_level = clevel;
+static void set_compression_algorithm(grpc_call *call,
+ grpc_compression_algorithm algo) {
+ call->compression_algorithm = algo;
}
static void set_status_details(grpc_call *call, status_source source,
@@ -651,12 +651,10 @@
/* some aliases for readability */
gpr_slice *slices = call->incoming_message.slices;
const size_t nslices = call->incoming_message.count;
- const grpc_compression_algorithm compression_algorithm =
- grpc_compression_algorithm_for_level(call->compression_level);
- if (call->compression_level > GRPC_COMPRESS_LEVEL_NONE) {
- byte_buffer = grpc_raw_compressed_byte_buffer_create(slices, nslices,
- compression_algorithm);
+ if (call->compression_algorithm > GRPC_COMPRESS_NONE) {
+ byte_buffer = grpc_raw_compressed_byte_buffer_create(
+ slices, nslices, call->compression_algorithm);
} else {
byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
}
@@ -683,12 +681,11 @@
* compression level should already be present in the call, as parsed off its
* corresponding metadata. */
if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->compression_level == GRPC_COMPRESS_LEVEL_NONE)) {
+ (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
char *message = NULL;
gpr_asprintf(
&message, "Invalid compression algorithm (%s) for compressed message.",
- grpc_compression_algorithm_name(
- grpc_compression_algorithm_for_level(call->compression_level)));
+ grpc_compression_algorithm_name(call->compression_algorithm));
cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1);
}
/* stash away parameters, and prepare for incoming slices */
@@ -1183,8 +1180,8 @@
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key ==
- grpc_channel_get_compresssion_level_string(call->channel)) {
- set_compression_level(call, decode_compression(md));
+ grpc_channel_get_compresssion_algorithm_string(call->channel)) {
+ set_compression_algorithm(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 6353a83..e841a5d 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -64,7 +64,7 @@
grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
- grpc_mdstr *grpc_compression_level_string;
+ grpc_mdstr *grpc_compression_algorithm_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@@ -99,8 +99,8 @@
gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
- channel->grpc_compression_level_string =
- grpc_mdstr_from_string(mdctx, "grpc-compression-level");
+ channel->grpc_compression_algorithm_string =
+ grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
@@ -202,7 +202,7 @@
grpc_mdelem_unref(channel->grpc_status_elem[i]);
}
grpc_mdstr_unref(channel->grpc_status_string);
- grpc_mdstr_unref(channel->grpc_compression_level_string);
+ grpc_mdstr_unref(channel->grpc_compression_algorithm_string);
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
@@ -261,8 +261,8 @@
return channel->grpc_status_string;
}
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
- return channel->grpc_compression_level_string;
+grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) {
+ return channel->grpc_compression_algorithm_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index f838129..f4df06a 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,8 @@
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
-grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
+ grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c
index fe41780..8cc4cb7 100644
--- a/test/core/end2end/tests/request_with_compressed_payload.c
+++ b/test/core/end2end/tests/request_with_compressed_payload.c
@@ -251,10 +251,10 @@
config.tear_down_data(&f);
}
-static void test_invoke_request_with_excepcionally_uncompressed_payload(
+static void test_invoke_request_with_exceptionally_uncompressed_payload(
grpc_end2end_test_config config) {
request_with_payload_template(
- config, "test_invoke_request_with_excepcionally_uncompressed_payload",
+ config, "test_invoke_request_with_exceptionally_uncompressed_payload",
GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE);
}
@@ -276,7 +276,7 @@
void grpc_end2end_tests(grpc_end2end_test_config config) {
- test_invoke_request_with_excepcionally_uncompressed_payload(config);
+ test_invoke_request_with_exceptionally_uncompressed_payload(config);
test_invoke_request_with_compressed_payload(config);
test_invoke_request_with_uncompressed_payload(config);
}