/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/lib/transport/transport.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport_impl.h"

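/* In debug builds, stream refcount changes can be traced.  The tracer is
   registered under the name "stream_refcount" and is off by default; it is
   expected to be switched on through gRPC's usual tracer configuration
   (e.g. the GRPC_TRACE environment variable). */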
#ifndef NDEBUG
grpc_tracer_flag grpc_trace_stream_refcount =
    GRPC_TRACER_INITIALIZER(false, "stream_refcount");
#endif

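/* Take an additional ref on a stream refcount.  The count must already be
   non-zero: gpr_ref_non_zero is used instead of a plain ref so that reviving
   an already-released stream is caught.  The NDEBUG split only changes the
   signature: debug builds also take a reason string and log the old->new
   count when the stream_refcount tracer is enabled. */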
#ifndef NDEBUG
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
  if (GRPC_TRACER_ON(grpc_trace_stream_refcount)) {
    gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
    gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s",
            refcount->object_type, refcount, refcount->destroy.cb_arg, val,
            val + 1, reason);
  }
#else
void grpc_stream_ref(grpc_stream_refcount *refcount) {
#endif
  gpr_ref_non_zero(&refcount->refs);
}

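/* Drop a ref on a stream refcount.  When the last ref is released, the
   destroy closure registered in grpc_stream_ref_init is scheduled.  If the
   current exec_ctx is flagged as running a thread resource loop, the closure
   is first re-targeted at the executor so that destroying the stream never
   tears down the thread it is running on (see the comment inside). */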
#ifndef NDEBUG
void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
                       const char *reason) {
  if (GRPC_TRACER_ON(grpc_trace_stream_refcount)) {
    gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
    gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
            refcount->object_type, refcount, refcount->destroy.cb_arg, val,
            val - 1, reason);
  }
#else
void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
                       grpc_stream_refcount *refcount) {
#endif
  if (gpr_unref(&refcount->refs)) {
    if (exec_ctx->flags & GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
      /* Ick.
         The thread we're running on MAY be owned (indirectly) by a call-stack.
         If that's the case, destroying the call-stack MAY try to destroy the
         thread, which is a tangled mess that we just don't want to ever have to
         cope with.
         Throw this over to the executor (on a core-owned thread) and process it
         there. */
      refcount->destroy.scheduler =
          grpc_executor_scheduler(GRPC_EXECUTOR_SHORT);
    }
    GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE);
  }
}

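/* A grpc_stream_refcount embeds a grpc_slice_refcount so that slices handed
   out by grpc_slice_from_stream_owned_buffer keep the owning stream alive.
   STREAM_REF_FROM_SLICE_REF recovers the enclosing grpc_stream_refcount from a
   pointer to that embedded field (a container_of-style offsetof computation);
   slice_stream_ref/slice_stream_unref then forward slice ref traffic onto the
   stream refcount. */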
#define STREAM_REF_FROM_SLICE_REF(p)         \
  ((grpc_stream_refcount *)(((uint8_t *)p) - \
                            offsetof(grpc_stream_refcount, slice_refcount)))

static void slice_stream_ref(void *p) {
#ifndef NDEBUG
  grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
#else
  grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
#endif
}

static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
#ifndef NDEBUG
  grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice");
#else
  grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p));
#endif
}

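/* Wrap a buffer owned by the stream in a grpc_slice without copying.  The
   slice's refcount is the stream's embedded slice refcount, so taking a ref
   on the slice refs the stream, and the buffer stays valid until every such
   slice has been unreffed. */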
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
                                               void *buffer, size_t length) {
  slice_stream_ref(&refcount->slice_refcount);
  grpc_slice res;
  res.refcount = &refcount->slice_refcount;
  res.data.refcounted.bytes = (uint8_t *)buffer;
  res.data.refcounted.length = length;
  return res;
}

static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
    .ref = slice_stream_ref,
    .unref = slice_stream_unref,
    .eq = grpc_slice_default_eq_impl,
    .hash = grpc_slice_default_hash_impl};

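/* Initialize a stream refcount: set the initial count, register the closure
   to run when the count hits zero, and point the embedded slice refcount at
   the vtable above (and at itself as sub_refcount) so slices minted from this
   stream participate in its lifetime. */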
#ifndef NDEBUG
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void *cb_arg,
                          const char *object_type) {
  refcount->object_type = object_type;
#else
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
                          grpc_iomgr_cb_func cb, void *cb_arg) {
#endif
  gpr_ref_init(&refcount->refs, initial_refs);
  GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
  refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
  refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}

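/* Per-direction stats are "moved" rather than copied: move64 adds the source
   counter into the destination and zeroes the source, so the same bytes are
   never counted twice when stats are aggregated upward. */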
static void move64(uint64_t *from, uint64_t *to) {
  *to += *from;
  *from = 0;
}

void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from,
                                       grpc_transport_one_way_stats *to) {
  move64(&from->framing_bytes, &to->framing_bytes);
  move64(&from->data_bytes, &to->data_bytes);
  move64(&from->header_bytes, &to->header_bytes);
}

void grpc_transport_move_stats(grpc_transport_stream_stats *from,
                               grpc_transport_stream_stats *to) {
  grpc_transport_move_one_way_stats(&from->incoming, &to->incoming);
  grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing);
}

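/* The transport entry points below are thin wrappers that dispatch through
   the transport's vtable; the concrete transport implementation (for example
   chttp2) supplies the actual behavior. */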
size_t grpc_transport_stream_size(grpc_transport *transport) {
  return transport->vtable->sizeof_stream;
}

void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
                            grpc_transport *transport) {
  transport->vtable->destroy(exec_ctx, transport);
}

int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
                               grpc_transport *transport, grpc_stream *stream,
                               grpc_stream_refcount *refcount,
                               const void *server_data, gpr_arena *arena) {
  return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
                                        server_data, arena);
}

void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
                                      grpc_transport *transport,
                                      grpc_stream *stream,
                                      grpc_transport_stream_op_batch *op) {
  transport->vtable->perform_stream_op(exec_ctx, transport, stream, op);
}

void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx,
                               grpc_transport *transport,
                               grpc_transport_op *op) {
  transport->vtable->perform_op(exec_ctx, transport, op);
}

void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
                             grpc_stream *stream,
                             grpc_polling_entity *pollent) {
  grpc_pollset *pollset;
  grpc_pollset_set *pollset_set;
  if ((pollset = grpc_polling_entity_pollset(pollent)) != NULL) {
    transport->vtable->set_pollset(exec_ctx, transport, stream, pollset);
  } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) != NULL) {
    transport->vtable->set_pollset_set(exec_ctx, transport, stream,
                                       pollset_set);
  } else {
    abort();
  }
}

void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                   grpc_transport *transport,
                                   grpc_stream *stream,
                                   grpc_closure *then_schedule_closure) {
  transport->vtable->destroy_stream(exec_ctx, transport, stream,
                                    then_schedule_closure);
}

grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
                                           grpc_transport *transport) {
  return transport->vtable->get_endpoint(exec_ctx, transport);
}

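/* Fail a stream op batch that cannot be executed: destroy an unsent
   send_message payload, complete any pending recv callbacks with a ref on the
   error via the call combiner, schedule on_complete with the error itself,
   and unref a cancel_stream error as the song below insists. */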
// This comment should be sung to the tune of
// "Supercalifragilisticexpialidocious":
//
// grpc_transport_stream_op_batch_finish_with_failure
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(
    grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
    grpc_error *error, grpc_call_combiner *call_combiner) {
  if (batch->send_message) {
    grpc_byte_stream_destroy(exec_ctx,
                             batch->payload->send_message.send_message);
  }
  if (batch->recv_message) {
    GRPC_CALL_COMBINER_START(exec_ctx, call_combiner,
                             batch->payload->recv_message.recv_message_ready,
                             GRPC_ERROR_REF(error),
                             "failing recv_message_ready");
  }
  if (batch->recv_initial_metadata) {
    GRPC_CALL_COMBINER_START(
        exec_ctx, call_combiner,
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
        GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
  }
  GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
  if (batch->cancel_stream) {
    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
  }
}

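/* grpc_make_transport_op wraps a caller-supplied on_complete closure in a
   heap-allocated made_transport_op.  The wrapper's own closure
   (destroy_made_transport_op) is installed as op.on_consumed, so when the
   transport signals that the op has been consumed the wrapper schedules the
   original closure with the resulting error and frees itself. */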
typedef struct {
  grpc_closure outer_on_complete;
  grpc_closure *inner_on_complete;
  grpc_transport_op op;
} made_transport_op;

static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg,
                                      grpc_error *error) {
  made_transport_op *op = (made_transport_op *)arg;
  GRPC_CLOSURE_SCHED(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error));
  gpr_free(op);
}

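/* A minimal usage sketch.  Which fields of the returned grpc_transport_op a
   caller fills in depends on the operation being requested and is elided;
   my_on_complete, exec_ctx and transport are hypothetical caller-owned
   placeholders, not identifiers defined in this file:
 *
 *   grpc_transport_op *op = grpc_make_transport_op(&my_on_complete);
 *   // ... set the desired fields of *op ...
 *   grpc_transport_perform_op(exec_ctx, transport, op);
 *
 * Once the transport runs op->on_consumed, destroy_made_transport_op (above)
 * schedules my_on_complete and releases the allocation, so the caller never
 * frees the op itself. */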
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) {
  made_transport_op *op = (made_transport_op *)gpr_malloc(sizeof(*op));
  GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op,
                    grpc_schedule_on_exec_ctx);
  op->inner_on_complete = on_complete;
  memset(&op->op, 0, sizeof(op->op));
  op->op.on_consumed = &op->outer_on_complete;
  return &op->op;
}

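/* grpc_make_transport_stream_op plays the same trick for stream-level op
   batches: the wrapper co-allocates the batch payload and intercepts
   on_complete, so when the batch completes the wrapper is freed and the
   caller's closure is then run with the batch's error. */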
typedef struct {
  grpc_closure outer_on_complete;
  grpc_closure *inner_on_complete;
  grpc_transport_stream_op_batch op;
  grpc_transport_stream_op_batch_payload payload;
} made_transport_stream_op;

static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg,
                                             grpc_error *error) {
  made_transport_stream_op *op = (made_transport_stream_op *)arg;
  grpc_closure *c = op->inner_on_complete;
  gpr_free(op);
  GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_REF(error));
}

grpc_transport_stream_op_batch *grpc_make_transport_stream_op(
    grpc_closure *on_complete) {
  made_transport_stream_op *op =
      (made_transport_stream_op *)gpr_zalloc(sizeof(*op));
  op->op.payload = &op->payload;
  GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op,
                    op, grpc_schedule_on_exec_ctx);
  op->inner_on_complete = on_complete;
  op->op.on_complete = &op->outer_on_complete;
  return &op->op;
}