stuff
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index e32ee28..86bd5d2 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -503,7 +503,7 @@
gpr_mu_lock(&t->mu);
t->calling_back = 1;
- ref_transport(t);
+ ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@@ -515,7 +515,7 @@
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
- ref_transport(t);
+ ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index c3901bf..dabe68f 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -50,22 +50,9 @@
Must be ignored by receivers */
GRPC_NO_OP,
GRPC_OP_METADATA,
- /* Begin a message/metadata element/status - as defined by
- grpc_message_type. */
- GRPC_OP_BEGIN_MESSAGE,
- /* Add a slice of data to the current message/metadata element/status.
- Must not overflow the forward declared length. */
- GRPC_OP_SLICE
+ GRPC_OP_MESSAGE
} grpc_stream_op_code;
-/* Arguments for GRPC_OP_BEGIN */
-typedef struct grpc_begin_message {
- /* How many bytes of data will this message contain */
- gpr_uint32 length;
- /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
- gpr_uint32 flags;
-} grpc_begin_message;
-
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@@ -118,9 +105,8 @@
/* the arguments to this operation. union fields are named according to the
associated op-code */
union {
- grpc_begin_message begin_message;
+ grpc_byte_buffer *message;
grpc_metadata_batch metadata;
- gpr_slice slice;
} data;
} grpc_stream_op;
@@ -148,16 +134,8 @@
/* Append a GRPC_NO_OP to a buffer */
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
-/* Append a GRPC_OP_BEGIN to a buffer */
-void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
- gpr_uint32 flags);
+void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb);
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
-/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
-void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 6dd0fda..f31011e 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -78,6 +78,8 @@
void (*accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
+ void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug);
+
/* The transport has been closed */
void (*closed)(void *user_data, grpc_transport *transport);
};
@@ -139,8 +141,14 @@
void *recv_user_data;
grpc_pollset *bind_pollset;
+
+ grpc_status_code cancel_with_status;
} grpc_transport_op;
+void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
+
+char *grpc_transport_op_string(grpc_transport_op *op);
+
/* Send a batch of operations on a transport
Takes ownership of any objects contained in ops.
@@ -161,19 +169,6 @@
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
-/* Abort a stream
-
- Terminate reading and writing for a stream. A final recv_batch with no
- operations and final_state == GRPC_STREAM_CLOSED will be received locally,
- and no more data will be presented to the up-layer.
-
- TODO(ctiller): consider adding a HTTP/2 reason to this function. */
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status);
-
-void grpc_transport_add_to_pollset(grpc_transport *transport,
- grpc_pollset *pollset);
-
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
new file mode 100644
index 0000000..5f7e1be
--- /dev/null
+++ b/src/core/transport/transport_op_string.c
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/channel_stack.h"
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/useful.h>
+
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+ gpr_strvec_add(b, gpr_strdup(" key="));
+ gpr_strvec_add(
+ b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
+
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice),
+ GPR_HEXDUMP_PLAINTEXT));
+}
+
+static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
+ grpc_linked_mdelem *m;
+ for (m = md.list.head; m != NULL; m = m->next) {
+ put_metadata(b, m->md);
+ }
+ if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
+ char *tmp;
+ gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
+ md.deadline.tv_nsec);
+ gpr_strvec_add(b, tmp);
+ }
+}
+
+char *grpc_call_op_string(grpc_call_op *op) {
+ char *tmp;
+ char *out;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ switch (op->dir) {
+ case GRPC_CALL_DOWN:
+ gpr_strvec_add(&b, gpr_strdup(">"));
+ break;
+ case GRPC_CALL_UP:
+ gpr_strvec_add(&b, gpr_strdup("<"));
+ break;
+ }
+ switch (op->type) {
+ case GRPC_SEND_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
+ put_metadata_list(&b, op->data.metadata);
+ break;
+ case GRPC_SEND_MESSAGE:
+ gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
+ break;
+ case GRPC_SEND_PREFORMATTED_MESSAGE:
+ gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE"));
+ break;
+ case GRPC_SEND_FINISH:
+ gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
+ break;
+ case GRPC_REQUEST_DATA:
+ gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
+ break;
+ case GRPC_RECV_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
+ put_metadata_list(&b, op->data.metadata);
+ break;
+ case GRPC_RECV_MESSAGE:
+ gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
+ break;
+ case GRPC_RECV_HALF_CLOSE:
+ gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
+ break;
+ case GRPC_RECV_FINISH:
+ gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
+ break;
+ case GRPC_RECV_SYNTHETIC_STATUS:
+ gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
+ op->data.synthetic_status.status,
+ op->data.synthetic_status.message);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_CANCEL_OP:
+ gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
+ break;
+ }
+ gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
+ gpr_strvec_add(&b, tmp);
+ if (op->bind_pollset) {
+ gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
+ grpc_call_element *elem, grpc_call_op *op) {
+ char *str = grpc_call_op_string(op);
+ gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
+ gpr_free(str);
+}