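Bug fixing: handle cancel_with_status before send/recv ops in chttp2 perform_op_locked, and carry an optional cancel message (grpc_mdstr) on transport ops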
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 237def4..7b50e28 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -361,7 +361,8 @@
grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst);
+ grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
@@ -1011,6 +1012,12 @@
}
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ cancel_stream(
+ t, s, op->cancel_with_status,
+ grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), op->cancel_message, 1);
+ }
+
if (op->send_ops) {
GPR_ASSERT(s->outgoing_sopb == NULL);
s->send_done_closure.cb = op->on_done_send;
@@ -1037,26 +1044,16 @@
GPR_ASSERT(s->incoming_sopb == NULL);
s->recv_done_closure.cb = op->on_done_recv;
s->recv_done_closure.user_data = op->recv_user_data;
- if (!s->cancelled) {
- s->incoming_sopb = op->recv_ops;
- s->incoming_sopb->nops = 0;
- s->publish_state = op->recv_state;
- maybe_finish_read(t, s);
- maybe_join_window_updates(t, s);
- } else {
- schedule_cb(t, s->recv_done_closure, 0);
- }
+ s->incoming_sopb = op->recv_ops;
+ s->incoming_sopb->nops = 0;
+ s->publish_state = op->recv_state;
+ maybe_finish_read(t, s);
+ maybe_join_window_updates(t, s);
}
if (op->bind_pollset) {
add_to_pollset_locked(t, op->bind_pollset);
}
-
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- cancel_stream(
- t, s, op->cancel_with_status,
- grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1);
- }
}
static void perform_op(grpc_transport *gt, grpc_stream *gs,
@@ -1123,6 +1120,7 @@
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
+ grpc_mdstr *optional_message,
int send_rst) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
@@ -1147,14 +1145,18 @@
add_incoming_metadata(
t, s,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(t, s, grpc_mdelem_from_metadata_strings(t->metadata_context, grpc_mdstr_from_string(t->metadata_context, "grpc-message"), grpc_mdstr_ref(optional_message)));
}
add_metadata_batch(t, s);
maybe_finish_read(t, s);
@@ -1165,24 +1167,27 @@
gpr_slice_buffer_add(&t->qbuf,
grpc_chttp2_rst_stream_create(id, error_code));
}
+ if (optional_message) {
+ grpc_mdstr_unref(optional_message);
+ }
}
static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, int send_rst) {
cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
- send_rst);
+ NULL, send_rst);
}
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
- grpc_chttp2_error_code error_code, int send_rst) {
- cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
+ grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) {
+ cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, send_rst);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
- GRPC_CHTTP2_INTERNAL_ERROR, 0);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}
static void end_all_the_calls(transport *t) {
@@ -1285,7 +1290,7 @@
case GRPC_CHTTP2_STREAM_ERROR:
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_INTERNAL_ERROR),
- GRPC_CHTTP2_INTERNAL_ERROR, 1);
+ GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
return init_skip_frame(t, 0);
case GRPC_CHTTP2_CONNECTION_ERROR:
drop_connection(t);
@@ -1598,7 +1603,7 @@
if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
GRPC_CHTTP2_FLOW_CONTROL_ERROR),
- GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
+ GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
} else {
s->outgoing_window += st.window_update;
/* if this window update makes outgoing ops writable again,
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 987dd4c..cc93921 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -94,3 +94,12 @@
op->on_done_recv(op->recv_user_data, 0);
}
}
+
+void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message) {
+ if (op->cancel_with_status == GRPC_STATUS_OK) {
+ op->cancel_with_status = status;
+ op->cancel_message = message;
+ } else if (message) {
+ grpc_mdstr_unref(message);
+ }
+}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 5036dfc..7c4bed1 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -75,6 +75,7 @@
grpc_pollset *bind_pollset;
grpc_status_code cancel_with_status;
+ grpc_mdstr *cancel_message;
} grpc_transport_op;
/* Callbacks made from the transport to the upper layers of grpc. */
@@ -134,6 +135,8 @@
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message);
+
/* TODO(ctiller): remove this */
void grpc_transport_add_to_pollset(grpc_transport *transport,
grpc_pollset *pollset);
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 54f501f..b9283b7 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -139,6 +139,10 @@
first = 0;
gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
gpr_strvec_add(&b, tmp);
+ if (op->cancel_message) {
+ gpr_asprintf(&tmp, ";msg='%s'", grpc_mdstr_as_c_string(op->cancel_message));
+ gpr_strvec_add(&b, tmp);
+ }
}
out = gpr_strvec_flatten(&b, NULL);