Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1 | /* |
| 2 | * |
| 3 | * Copyright 2017 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | #include "src/core/ext/transport/inproc/inproc_transport.h" |
| 20 | #include <grpc/support/alloc.h> |
| 21 | #include <grpc/support/string_util.h> |
| 22 | #include <grpc/support/sync.h> |
| 23 | #include <grpc/support/time.h> |
| 24 | #include <string.h> |
| 25 | #include "src/core/lib/channel/channel_args.h" |
| 26 | #include "src/core/lib/slice/slice_internal.h" |
| 27 | #include "src/core/lib/surface/api_trace.h" |
| 28 | #include "src/core/lib/surface/channel.h" |
| 29 | #include "src/core/lib/surface/channel_stack_type.h" |
| 30 | #include "src/core/lib/surface/server.h" |
| 31 | #include "src/core/lib/transport/connectivity_state.h" |
| 32 | #include "src/core/lib/transport/error_utils.h" |
| 33 | #include "src/core/lib/transport/transport_impl.h" |
| 34 | |
// Log only when the in-process transport tracer is enabled. Wrapped in
// do/while(0) so the macro expands to a single statement and is safe in
// unbraced if/else bodies.
#define INPROC_LOG(...)                                          \
  do {                                                           \
    if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \
  } while (0)
| 39 | |
// Slices used to synthesize fake :path/:authority metadata on the server side
// (see fail_helper_locked). Presumably initialized during transport module
// init — the initializer is not visible in this chunk; TODO confirm.
static grpc_slice g_empty_slice;
static grpc_slice g_fake_path_key;
static grpc_slice g_fake_path_value;
static grpc_slice g_fake_auth_key;
static grpc_slice g_fake_auth_value;

// Mutex shared by the client-side and server-side halves of an in-process
// transport pair. Refcounted so it is freed only when the last half that
// references it is destroyed (see really_destroy_transport).
typedef struct {
  gpr_mu mu;
  gpr_refcount refs;
} shared_mu;
| 50 | |
// One directional half (client or server) of an in-process transport pair.
typedef struct inproc_transport {
  grpc_transport base;  // must be first: code casts grpc_transport* to this
  shared_mu* mu;        // lock shared with other_side
  gpr_refcount refs;
  bool is_client;
  grpc_connectivity_state_tracker connectivity;
  // Server-side callback invoked when a client initiates a new stream;
  // server_data carries the client stream's address (see init_stream).
  void (*accept_stream_cb)(grpc_exec_ctx* exec_ctx, void* user_data,
                           grpc_transport* transport, const void* server_data);
  void* accept_stream_data;
  bool is_closed;
  struct inproc_transport* other_side;  // the paired transport half
  struct inproc_stream* stream_list;    // doubly-linked list of live streams
} inproc_transport;
| 64 | |
// Per-stream state; the peer stream on the other transport half holds a
// mirror of this structure, linked via other_side.
typedef struct inproc_stream {
  inproc_transport* t;
  grpc_metadata_batch to_read_initial_md;  // initial md deposited by peer
  uint32_t to_read_initial_md_flags;
  bool to_read_initial_md_filled;
  grpc_metadata_batch to_read_trailing_md;  // trailing md deposited by peer
  bool to_read_trailing_md_filled;
  bool ops_needed;            // op_state_machine should run when data arrives
  bool op_closure_scheduled;  // op_closure already queued; avoid double-sched
  grpc_closure op_closure;    // runs op_state_machine
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md;
  bool write_buffer_initial_md_filled;
  uint32_t write_buffer_initial_md_flags;
  grpc_millis write_buffer_deadline;
  grpc_metadata_batch write_buffer_trailing_md;
  bool write_buffer_trailing_md_filled;
  grpc_error* write_buffer_cancel_error;  // owned; transferred in init_stream

  struct inproc_stream* other_side;
  bool other_side_closed;               // won't talk anymore
  bool write_buffer_other_side_closed;  // on hold
  grpc_stream_refcount* refs;
  grpc_closure* closure_at_destroy;  // scheduled by really_destroy_stream

  gpr_arena* arena;  // call-lifetime arena; metadata copies allocated here

  // Pending op batches; NULL when no batch of that kind is outstanding.
  grpc_transport_stream_op_batch* send_message_op;
  grpc_transport_stream_op_batch* send_trailing_md_op;
  grpc_transport_stream_op_batch* recv_initial_md_op;
  grpc_transport_stream_op_batch* recv_message_op;
  grpc_transport_stream_op_batch* recv_trailing_md_op;

  grpc_slice_buffer recv_message;        // staging buffer for inbound message
  grpc_slice_buffer_stream recv_stream;  // byte stream handed to recv op
  bool recv_inited;                      // recv_message has been initialized

  bool initial_md_sent;
  bool trailing_md_sent;
  bool initial_md_recvd;
  bool trailing_md_recvd;

  bool closed;

  grpc_error* cancel_self_error;   // owned
  grpc_error* cancel_other_error;  // owned

  grpc_millis deadline;

  bool listed;  // currently linked into t->stream_list
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
} inproc_stream;
| 119 | |
// Forward declarations for routines referenced before their definitions.
static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                 grpc_error* error);
static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
                             grpc_error* error);
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 125 | |
// Take a ref on a transport half (traced).
static void ref_transport(inproc_transport* t) {
  INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
  gpr_ref(&t->refs);
}
| 130 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 131 | static void really_destroy_transport(grpc_exec_ctx* exec_ctx, |
| 132 | inproc_transport* t) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 133 | INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t); |
| 134 | grpc_connectivity_state_destroy(exec_ctx, &t->connectivity); |
| 135 | if (gpr_unref(&t->mu->refs)) { |
| 136 | gpr_free(t->mu); |
| 137 | } |
| 138 | gpr_free(t); |
| 139 | } |
| 140 | |
// Drop a ref on a transport half; destroys it when the last ref is released.
static void unref_transport(grpc_exec_ctx* exec_ctx, inproc_transport* t) {
  INPROC_LOG(GPR_DEBUG, "unref_transport %p", t);
  if (gpr_unref(&t->refs)) {
    really_destroy_transport(exec_ctx, t);
  }
}
| 147 | |
// In debug builds, stream ref/unref carry a reason string for ref tracing;
// in release builds the reason argument is compiled out.
#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs)
#endif
| 155 | |
// Take a ref on the stream, logging the reason when tracing is enabled.
static void ref_stream(inproc_stream* s, const char* reason) {
  INPROC_LOG(GPR_DEBUG, "ref_stream %p %s", s, reason);
  STREAM_REF(s->refs, reason);
}
| 160 | |
// Drop a ref on the stream, logging the reason when tracing is enabled.
// NOTE(review): final destruction presumably routes through the refcount's
// destroy callback (not visible in this chunk) to really_destroy_stream.
static void unref_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                         const char* reason) {
  INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason);
  STREAM_UNREF(exec_ctx, s->refs, reason);
}
| 166 | |
// Final stream teardown: release any retained errors and the received-message
// buffer, drop the stream's ref on its transport, then fire the destroy
// closure if one was recorded in closure_at_destroy.
static void really_destroy_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s) {
  INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);

  GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
  GRPC_ERROR_UNREF(s->cancel_self_error);
  GRPC_ERROR_UNREF(s->cancel_other_error);

  // recv_message is only valid if it was initialized (see recv_inited flag).
  if (s->recv_inited) {
    grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
  }

  unref_transport(exec_ctx, s->t);

  if (s->closure_at_destroy) {
    GRPC_CLOSURE_SCHED(exec_ctx, s->closure_at_destroy, GRPC_ERROR_NONE);
  }
}
| 184 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 185 | static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 186 | bool is_initial) { |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 187 | for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 188 | md = md->next) { |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 189 | char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); |
| 190 | char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 191 | gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL", |
| 192 | is_client ? "CLI" : "SVR", key, value); |
| 193 | gpr_free(key); |
| 194 | gpr_free(value); |
| 195 | } |
| 196 | } |
| 197 | |
// Deep-copy |metadata| into |out_md|: each key/value slice is interned and
// each new linked element is allocated from the stream's arena, so the copy
// outlives the source batch. If |outflags| is non-null, *outflags receives
// |flags|; outflags == nullptr also doubles as the "this is trailing
// metadata" signal for trace logging. If |markfilled| is non-null it is set
// to true. Returns the first error from grpc_metadata_batch_link_tail, or
// GRPC_ERROR_NONE.
static grpc_error* fill_in_metadata(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                    const grpc_metadata_batch* metadata,
                                    uint32_t flags, grpc_metadata_batch* out_md,
                                    uint32_t* outflags, bool* markfilled) {
  if (GRPC_TRACER_ON(grpc_inproc_trace)) {
    log_metadata(metadata, s->t->is_client, outflags != nullptr);
  }

  if (outflags != nullptr) {
    *outflags = flags;
  }
  if (markfilled != nullptr) {
    *markfilled = true;
  }
  grpc_error* error = GRPC_ERROR_NONE;
  // Copy elements in order, stopping at the first link failure.
  for (grpc_linked_mdelem* elem = metadata->list.head;
       (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
    grpc_linked_mdelem* nelem =
        (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*nelem));
    nelem->md = grpc_mdelem_from_slices(
        exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)),
        grpc_slice_intern(GRPC_MDVALUE(elem->md)));

    error = grpc_metadata_batch_link_tail(exec_ctx, out_md, nelem);
  }
  return error;
}
| 225 | |
// grpc_transport vtable entry: initialize a new stream in preallocated
// storage |gs| on transport |gt|. |server_data| is NULL when called on the
// client side; on the server side it carries the client stream's address,
// passed through accept_stream_cb below. On the server side, any metadata /
// deadline / cancel error the client buffered during the init-time gap is
// transferred into this stream under the shared lock. Returns 0 (the return
// value is not used by callers).
static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                       grpc_stream* gs, grpc_stream_refcount* refcount,
                       const void* server_data, gpr_arena* arena) {
  INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data);
  inproc_transport* t = (inproc_transport*)gt;
  inproc_stream* s = (inproc_stream*)gs;
  s->arena = arena;

  s->refs = refcount;
  // Ref this stream right now
  ref_stream(s, "inproc_init_stream:init");

  grpc_metadata_batch_init(&s->to_read_initial_md);
  s->to_read_initial_md_flags = 0;
  s->to_read_initial_md_filled = false;
  grpc_metadata_batch_init(&s->to_read_trailing_md);
  s->to_read_trailing_md_filled = false;
  grpc_metadata_batch_init(&s->write_buffer_initial_md);
  s->write_buffer_initial_md_flags = 0;
  s->write_buffer_initial_md_filled = false;
  grpc_metadata_batch_init(&s->write_buffer_trailing_md);
  s->write_buffer_trailing_md_filled = false;
  s->ops_needed = false;
  s->op_closure_scheduled = false;
  GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
                    grpc_schedule_on_exec_ctx);
  s->t = t;
  s->closure_at_destroy = nullptr;
  s->other_side_closed = false;

  s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd =
      s->trailing_md_recvd = false;

  s->closed = false;

  s->cancel_self_error = GRPC_ERROR_NONE;
  s->cancel_other_error = GRPC_ERROR_NONE;
  s->write_buffer_cancel_error = GRPC_ERROR_NONE;
  s->deadline = GRPC_MILLIS_INF_FUTURE;
  s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;

  // Link the stream at the head of the transport's stream list (under lock);
  // the list holds its own ref on the stream.
  s->stream_list_prev = nullptr;
  gpr_mu_lock(&t->mu->mu);
  s->listed = true;
  ref_stream(s, "inproc_init_stream:list");
  s->stream_list_next = t->stream_list;
  if (t->stream_list) {
    t->stream_list->stream_list_prev = s;
  }
  t->stream_list = s;
  gpr_mu_unlock(&t->mu->mu);

  if (!server_data) {
    ref_transport(t);
    inproc_transport* st = t->other_side;
    ref_transport(st);
    s->other_side = nullptr;  // will get filled in soon
    // Pass the client-side stream address to the server-side for a ref
    ref_stream(s, "inproc_init_stream:clt");  // ref it now on behalf of server
                                              // side to avoid destruction
    INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p",
               st->accept_stream_cb, st->accept_stream_data);
    (*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base,
                            (void*)s);
  } else {
    // This is the server-side and is being called through accept_stream_cb
    inproc_stream* cs = (inproc_stream*)server_data;
    s->other_side = cs;
    // Ref the server-side stream on behalf of the client now
    ref_stream(s, "inproc_init_stream:srv");

    // Now we are about to affect the other side, so lock the transport
    // to make sure that it doesn't get destroyed
    gpr_mu_lock(&s->t->mu->mu);
    cs->other_side = s;
    // Now transfer from the other side's write_buffer if any to the to_read
    // buffer
    if (cs->write_buffer_initial_md_filled) {
      fill_in_metadata(exec_ctx, s, &cs->write_buffer_initial_md,
                       cs->write_buffer_initial_md_flags,
                       &s->to_read_initial_md, &s->to_read_initial_md_flags,
                       &s->to_read_initial_md_filled);
      s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline);
      grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
      cs->write_buffer_initial_md_filled = false;
    }
    if (cs->write_buffer_trailing_md_filled) {
      fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
                       &s->to_read_trailing_md, nullptr,
                       &s->to_read_trailing_md_filled);
      grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_trailing_md);
      cs->write_buffer_trailing_md_filled = false;
    }
    if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
      // Ownership of the buffered cancel error moves to this stream.
      s->cancel_other_error = cs->write_buffer_cancel_error;
      cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
    }

    gpr_mu_unlock(&s->t->mu->mu);
  }
  return 0;  // return value is not important
}
| 328 | |
// Mark this stream closed: release buffered outgoing metadata, unlink the
// stream from the transport's stream list, and drop the refs held by the
// list and by the stream itself. Idempotent; must be called with the shared
// transport mutex held.
static void close_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s) {
  if (!s->closed) {
    // Release the metadata that we would have written out
    grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md);
    grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md);

    if (s->listed) {
      // Unlink from the doubly-linked stream list; fix up the list head
      // when removing the first element.
      inproc_stream* p = s->stream_list_prev;
      inproc_stream* n = s->stream_list_next;
      if (p != nullptr) {
        p->stream_list_next = n;
      } else {
        s->t->stream_list = n;
      }
      if (n != nullptr) {
        n->stream_list_prev = p;
      }
      s->listed = false;
      unref_stream(exec_ctx, s, "close_stream:list");
    }
    s->closed = true;
    unref_stream(exec_ctx, s, "close_stream:closing");
  }
}
| 353 | |
// This function means that we are done talking/listening to the other side
// Releases the metadata that was copied from the peer and drops our ref on
// the peer stream. If the peer is not attached yet (init-time gap), record
// the closure in write_buffer_other_side_closed instead.
static void close_other_side_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                    const char* reason) {
  if (s->other_side != nullptr) {
    // First release the metadata that came from the other side's arena
    grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md);
    grpc_metadata_batch_destroy(exec_ctx, &s->to_read_trailing_md);

    unref_stream(exec_ctx, s->other_side, reason);
    s->other_side_closed = true;
    s->other_side = nullptr;
  } else if (!s->other_side_closed) {
    s->write_buffer_other_side_closed = true;
  }
}
| 369 | |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 370 | // Call the on_complete closure associated with this stream_op_batch if |
| 371 | // this stream_op_batch is only one of the pending operations for this |
| 372 | // stream. This is called when one of the pending operations for the stream |
| 373 | // is done and about to be NULLed out |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 374 | static void complete_if_batch_end_locked(grpc_exec_ctx* exec_ctx, |
| 375 | inproc_stream* s, grpc_error* error, |
| 376 | grpc_transport_stream_op_batch* op, |
| 377 | const char* msg) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 378 | int is_sm = (int)(op == s->send_message_op); |
| 379 | int is_stm = (int)(op == s->send_trailing_md_op); |
| 380 | int is_rim = (int)(op == s->recv_initial_md_op); |
| 381 | int is_rm = (int)(op == s->recv_message_op); |
| 382 | int is_rtm = (int)(op == s->recv_trailing_md_op); |
| 383 | |
| 384 | if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { |
| 385 | INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error); |
| 386 | GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error)); |
| 387 | } |
| 388 | } |
| 389 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 390 | static void maybe_schedule_op_closure_locked(grpc_exec_ctx* exec_ctx, |
| 391 | inproc_stream* s, |
| 392 | grpc_error* error) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 393 | if (s && s->ops_needed && !s->op_closure_scheduled) { |
| 394 | GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); |
| 395 | s->op_closure_scheduled = true; |
| 396 | s->ops_needed = false; |
| 397 | } |
| 398 | } |
| 399 | |
// Fail the stream with |error|: synthesize trailing metadata toward the peer
// if we have not already sent it, fail/complete every pending op batch on
// this stream, then close both directions. Takes ownership of |error|
// (unreffed at the end). Must be called with the shared mutex held.
static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                               grpc_error* error) {
  INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md;
    grpc_metadata_batch_init(&fake_md);

    inproc_stream* other = s->other_side;
    // If the peer stream isn't attached yet, stash the trailing md (and the
    // cancel error below) in this stream's write buffer instead.
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(exec_ctx, &fake_md);

    if (other != nullptr) {
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(error);
      }
      maybe_schedule_op_closure_locked(exec_ctx, other, error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error* err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and authority
      // since it expects that as well as no error yet
      grpc_metadata_batch fake_md;
      grpc_metadata_batch_init(&fake_md);
      grpc_linked_mdelem* path_md =
          (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*path_md));
      path_md->md =
          grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) ==
                 GRPC_ERROR_NONE);
      grpc_linked_mdelem* auth_md =
          (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*auth_md));
      auth_md->md =
          grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) ==
                 GRPC_ERROR_NONE);

      fill_in_metadata(
          exec_ctx, s, &fake_md, 0,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      grpc_metadata_batch_destroy(exec_ctx, &fake_md);
      err = GRPC_ERROR_NONE;
    } else {
      err = GRPC_ERROR_REF(error);
    }
    INPROC_LOG(GPR_DEBUG,
               "fail_helper %p scheduling initial-metadata-ready %p %p", s,
               error, err);
    GRPC_CLOSURE_SCHED(exec_ctx,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata_ready,
                       err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        exec_ctx, s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-ready %p", s,
               error);
    GRPC_CLOSURE_SCHED(
        exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_REF(error));
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailng-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_DEBUG,
               "fail_helper %p scheduling trailing-md-on-complete %p", s,
               error);
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
  close_stream_locked(exec_ctx, s);

  GRPC_ERROR_UNREF(error);
}
| 511 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 512 | static void message_transfer_locked(grpc_exec_ctx* exec_ctx, |
| 513 | inproc_stream* sender, |
| 514 | inproc_stream* receiver) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 515 | size_t remaining = |
| 516 | sender->send_message_op->payload->send_message.send_message->length; |
| 517 | if (receiver->recv_inited) { |
| 518 | grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message); |
| 519 | } |
| 520 | grpc_slice_buffer_init(&receiver->recv_message); |
| 521 | receiver->recv_inited = true; |
| 522 | do { |
| 523 | grpc_slice message_slice; |
| 524 | grpc_closure unused; |
| 525 | GPR_ASSERT(grpc_byte_stream_next( |
| 526 | exec_ctx, sender->send_message_op->payload->send_message.send_message, |
| 527 | SIZE_MAX, &unused)); |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 528 | grpc_error* error = grpc_byte_stream_pull( |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 529 | exec_ctx, sender->send_message_op->payload->send_message.send_message, |
| 530 | &message_slice); |
| 531 | if (error != GRPC_ERROR_NONE) { |
| 532 | cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error)); |
| 533 | break; |
| 534 | } |
| 535 | GPR_ASSERT(error == GRPC_ERROR_NONE); |
| 536 | remaining -= GRPC_SLICE_LENGTH(message_slice); |
| 537 | grpc_slice_buffer_add(&receiver->recv_message, message_slice); |
| 538 | } while (remaining > 0); |
| 539 | |
| 540 | grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, |
| 541 | 0); |
| 542 | *receiver->recv_message_op->payload->recv_message.recv_message = |
| 543 | &receiver->recv_stream.base; |
| 544 | INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", |
| 545 | receiver); |
| 546 | GRPC_CLOSURE_SCHED( |
| 547 | exec_ctx, |
| 548 | receiver->recv_message_op->payload->recv_message.recv_message_ready, |
| 549 | GRPC_ERROR_NONE); |
| 550 | complete_if_batch_end_locked( |
| 551 | exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op, |
| 552 | "message_transfer scheduling sender on_complete"); |
| 553 | complete_if_batch_end_locked( |
| 554 | exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op, |
| 555 | "message_transfer scheduling receiver on_complete"); |
| 556 | |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 557 | receiver->recv_message_op = nullptr; |
| 558 | sender->send_message_op = nullptr; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 559 | } |
| 560 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 561 | static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, |
| 562 | grpc_error* error) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 563 | // This function gets called when we have contents in the unprocessed reads |
| 564 | // Get what we want based on our ops wanted |
| 565 | // Schedule our appropriate closures |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 566 | // and then return to ops_needed state if still needed |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 567 | |
| 568 | // Since this is a closure directly invoked by the combiner, it should not |
| 569 | // unref the error parameter explicitly; the combiner will do that implicitly |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 570 | grpc_error* new_err = GRPC_ERROR_NONE; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 571 | |
| 572 | bool needs_close = false; |
| 573 | |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 574 | INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg); |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 575 | inproc_stream* s = (inproc_stream*)arg; |
| 576 | gpr_mu* mu = &s->t->mu->mu; // keep aside in case s gets closed |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 577 | gpr_mu_lock(mu); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 578 | s->op_closure_scheduled = false; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 579 | // cancellation takes precedence |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 580 | inproc_stream* other = s->other_side; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 581 | |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 582 | if (s->cancel_self_error != GRPC_ERROR_NONE) { |
| 583 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); |
| 584 | goto done; |
| 585 | } else if (s->cancel_other_error != GRPC_ERROR_NONE) { |
| 586 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_other_error)); |
| 587 | goto done; |
| 588 | } else if (error != GRPC_ERROR_NONE) { |
| 589 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(error)); |
| 590 | goto done; |
| 591 | } |
| 592 | |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 593 | if (s->send_message_op && other) { |
| 594 | if (other->recv_message_op) { |
| 595 | message_transfer_locked(exec_ctx, s, other); |
| 596 | maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); |
| 597 | } else if (!s->t->is_client && |
| 598 | (s->trailing_md_sent || other->recv_trailing_md_op)) { |
| 599 | // A server send will never be matched if the client is waiting |
| 600 | // for trailing metadata already |
| 601 | complete_if_batch_end_locked( |
| 602 | exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op, |
| 603 | "op_state_machine scheduling send-message-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 604 | s->send_message_op = nullptr; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 605 | } |
| 606 | } |
| 607 | // Pause a send trailing metadata if there is still an outstanding |
| 608 | // send message unless we know that the send message will never get |
| 609 | // matched to a receive. This happens on the client if the server has |
| 610 | // already sent status. |
| 611 | if (s->send_trailing_md_op && |
| 612 | (!s->send_message_op || |
| 613 | (s->t->is_client && |
| 614 | (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 615 | grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_trailing_md |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 616 | : &other->to_read_trailing_md; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 617 | bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 618 | : &other->to_read_trailing_md_filled; |
| 619 | if (*destfilled || s->trailing_md_sent) { |
| 620 | // The buffer is already in use; that's an error! |
| 621 | INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); |
| 622 | new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 623 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); |
| 624 | goto done; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 625 | } else { |
Vijay Pai | a78be30 | 2017-10-19 11:59:12 -0700 | [diff] [blame] | 626 | if (!other || !other->closed) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 627 | fill_in_metadata(exec_ctx, s, |
| 628 | s->send_trailing_md_op->payload->send_trailing_metadata |
| 629 | .send_trailing_metadata, |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 630 | 0, dest, nullptr, destfilled); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 631 | } |
| 632 | s->trailing_md_sent = true; |
| 633 | if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { |
| 634 | INPROC_LOG(GPR_DEBUG, |
| 635 | "op_state_machine %p scheduling trailing-md-on-complete", s); |
| 636 | GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, |
| 637 | GRPC_ERROR_NONE); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 638 | s->recv_trailing_md_op = nullptr; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 639 | needs_close = true; |
| 640 | } |
| 641 | } |
| 642 | maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); |
| 643 | complete_if_batch_end_locked( |
| 644 | exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op, |
| 645 | "op_state_machine scheduling send-trailing-metadata-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 646 | s->send_trailing_md_op = nullptr; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 647 | } |
| 648 | if (s->recv_initial_md_op) { |
| 649 | if (s->initial_md_recvd) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 650 | new_err = |
| 651 | GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md"); |
| 652 | INPROC_LOG( |
| 653 | GPR_DEBUG, |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 654 | "op_state_machine %p scheduling on_complete errors for already " |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 655 | "recvd initial md %p", |
| 656 | s, new_err); |
| 657 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); |
| 658 | goto done; |
| 659 | } |
| 660 | |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 661 | if (s->to_read_initial_md_filled) { |
| 662 | s->initial_md_recvd = true; |
| 663 | new_err = fill_in_metadata( |
| 664 | exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, |
| 665 | s->recv_initial_md_op->payload->recv_initial_metadata |
| 666 | .recv_initial_metadata, |
| 667 | s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 668 | nullptr); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 669 | s->recv_initial_md_op->payload->recv_initial_metadata |
| 670 | .recv_initial_metadata->deadline = s->deadline; |
| 671 | grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); |
| 672 | s->to_read_initial_md_filled = false; |
| 673 | INPROC_LOG(GPR_DEBUG, |
| 674 | "op_state_machine %p scheduling initial-metadata-ready %p", s, |
| 675 | new_err); |
| 676 | GRPC_CLOSURE_SCHED(exec_ctx, |
| 677 | s->recv_initial_md_op->payload->recv_initial_metadata |
| 678 | .recv_initial_metadata_ready, |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 679 | GRPC_ERROR_REF(new_err)); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 680 | complete_if_batch_end_locked( |
| 681 | exec_ctx, s, new_err, s->recv_initial_md_op, |
| 682 | "op_state_machine scheduling recv-initial-metadata-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 683 | s->recv_initial_md_op = nullptr; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 684 | |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 685 | if (new_err != GRPC_ERROR_NONE) { |
| 686 | INPROC_LOG(GPR_DEBUG, |
| 687 | "op_state_machine %p scheduling on_complete errors2 %p", s, |
| 688 | new_err); |
| 689 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); |
| 690 | goto done; |
| 691 | } |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 692 | } |
| 693 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 694 | if (s->recv_message_op) { |
| 695 | if (other && other->send_message_op) { |
| 696 | message_transfer_locked(exec_ctx, other, s); |
| 697 | maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 698 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 699 | } |
| 700 | if (s->recv_trailing_md_op && s->t->is_client && other && |
| 701 | other->send_message_op) { |
| 702 | maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 703 | } |
| 704 | if (s->to_read_trailing_md_filled) { |
| 705 | if (s->trailing_md_recvd) { |
| 706 | new_err = |
| 707 | GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md"); |
| 708 | INPROC_LOG( |
| 709 | GPR_DEBUG, |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 710 | "op_state_machine %p scheduling on_complete errors for already " |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 711 | "recvd trailing md %p", |
| 712 | s, new_err); |
| 713 | fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); |
| 714 | goto done; |
| 715 | } |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 716 | if (s->recv_message_op != nullptr) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 717 | // This message needs to be wrapped up because it will never be |
| 718 | // satisfied |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 719 | INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 720 | GRPC_CLOSURE_SCHED( |
| 721 | exec_ctx, |
| 722 | s->recv_message_op->payload->recv_message.recv_message_ready, |
| 723 | GRPC_ERROR_NONE); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 724 | complete_if_batch_end_locked( |
| 725 | exec_ctx, s, new_err, s->recv_message_op, |
| 726 | "op_state_machine scheduling recv-message-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 727 | s->recv_message_op = nullptr; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 728 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 729 | if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { |
| 730 | // Nothing further will try to receive from this stream, so finish off |
| 731 | // any outstanding send_message op |
| 732 | complete_if_batch_end_locked( |
| 733 | exec_ctx, s, new_err, s->send_message_op, |
| 734 | "op_state_machine scheduling send-message-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 735 | s->send_message_op = nullptr; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 736 | } |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 737 | if (s->recv_trailing_md_op != nullptr) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 738 | // We wanted trailing metadata and we got it |
| 739 | s->trailing_md_recvd = true; |
| 740 | new_err = |
| 741 | fill_in_metadata(exec_ctx, s, &s->to_read_trailing_md, 0, |
| 742 | s->recv_trailing_md_op->payload |
| 743 | ->recv_trailing_metadata.recv_trailing_metadata, |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 744 | nullptr, nullptr); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 745 | grpc_metadata_batch_clear(exec_ctx, &s->to_read_trailing_md); |
| 746 | s->to_read_trailing_md_filled = false; |
| 747 | |
| 748 | // We should schedule the recv_trailing_md_op completion if |
| 749 | // 1. this stream is the client-side |
| 750 | // 2. this stream is the server-side AND has already sent its trailing md |
| 751 | // (If the server hasn't already sent its trailing md, it doesn't have |
| 752 | // a final status, so don't mark this op complete) |
| 753 | if (s->t->is_client || s->trailing_md_sent) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 754 | INPROC_LOG(GPR_DEBUG, |
| 755 | "op_state_machine %p scheduling trailing-md-on-complete %p", |
| 756 | s, new_err); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 757 | GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, |
| 758 | GRPC_ERROR_REF(new_err)); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 759 | s->recv_trailing_md_op = nullptr; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 760 | needs_close = true; |
| 761 | } else { |
| 762 | INPROC_LOG(GPR_DEBUG, |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 763 | "op_state_machine %p server needs to delay handling " |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 764 | "trailing-md-on-complete %p", |
| 765 | s, new_err); |
| 766 | } |
| 767 | } else { |
| 768 | INPROC_LOG( |
| 769 | GPR_DEBUG, |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 770 | "op_state_machine %p has trailing md but not yet waiting for it", s); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 771 | } |
| 772 | } |
| 773 | if (s->trailing_md_recvd && s->recv_message_op) { |
| 774 | // No further message will come on this stream, so finish off the |
| 775 | // recv_message_op |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 776 | INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 777 | GRPC_CLOSURE_SCHED( |
| 778 | exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, |
| 779 | GRPC_ERROR_NONE); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 780 | complete_if_batch_end_locked( |
| 781 | exec_ctx, s, new_err, s->recv_message_op, |
| 782 | "op_state_machine scheduling recv-message-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 783 | s->recv_message_op = nullptr; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 784 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 785 | if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) && |
| 786 | s->send_message_op) { |
| 787 | // Nothing further will try to receive from this stream, so finish off |
| 788 | // any outstanding send_message op |
| 789 | complete_if_batch_end_locked( |
| 790 | exec_ctx, s, new_err, s->send_message_op, |
| 791 | "op_state_machine scheduling send-message-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 792 | s->send_message_op = nullptr; |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 793 | } |
| 794 | if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || |
| 795 | s->recv_message_op || s->recv_trailing_md_op) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 796 | // Didn't get the item we wanted so we still need to get |
| 797 | // rescheduled |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 798 | INPROC_LOG( |
| 799 | GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s, |
| 800 | s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, |
| 801 | s->recv_message_op, s->recv_trailing_md_op); |
| 802 | s->ops_needed = true; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 803 | } |
| 804 | done: |
| 805 | if (needs_close) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 806 | close_other_side_locked(exec_ctx, s, "op_state_machine"); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 807 | close_stream_locked(exec_ctx, s); |
| 808 | } |
| 809 | gpr_mu_unlock(mu); |
| 810 | GRPC_ERROR_UNREF(new_err); |
| 811 | } |
| 812 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 813 | static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, |
| 814 | grpc_error* error) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 815 | bool ret = false; // was the cancel accepted |
| 816 | INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s, |
| 817 | grpc_error_string(error)); |
| 818 | if (s->cancel_self_error == GRPC_ERROR_NONE) { |
| 819 | ret = true; |
| 820 | s->cancel_self_error = GRPC_ERROR_REF(error); |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 821 | maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 822 | // Send trailing md to the other side indicating cancellation, even if we |
| 823 | // already have |
| 824 | s->trailing_md_sent = true; |
| 825 | |
| 826 | grpc_metadata_batch cancel_md; |
| 827 | grpc_metadata_batch_init(&cancel_md); |
| 828 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 829 | inproc_stream* other = s->other_side; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 830 | grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_trailing_md |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 831 | : &other->to_read_trailing_md; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 832 | bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 833 | : &other->to_read_trailing_md_filled; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 834 | fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, nullptr, destfilled); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 835 | grpc_metadata_batch_destroy(exec_ctx, &cancel_md); |
| 836 | |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 837 | if (other != nullptr) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 838 | if (other->cancel_other_error == GRPC_ERROR_NONE) { |
| 839 | other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); |
| 840 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 841 | maybe_schedule_op_closure_locked(exec_ctx, other, |
| 842 | other->cancel_other_error); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 843 | } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { |
| 844 | s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); |
| 845 | } |
| 846 | |
| 847 | // if we are a server and already received trailing md but |
| 848 | // couldn't complete that because we hadn't yet sent out trailing |
| 849 | // md, now's the chance |
| 850 | if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 851 | complete_if_batch_end_locked( |
| 852 | exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op, |
| 853 | "cancel_stream scheduling trailing-md-on-complete"); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 854 | s->recv_trailing_md_op = nullptr; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 855 | } |
| 856 | } |
| 857 | |
| 858 | close_other_side_locked(exec_ctx, s, "cancel_stream:other_side"); |
| 859 | close_stream_locked(exec_ctx, s); |
| 860 | |
| 861 | GRPC_ERROR_UNREF(error); |
| 862 | return ret; |
| 863 | } |
| 864 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 865 | static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, |
| 866 | grpc_stream* gs, |
| 867 | grpc_transport_stream_op_batch* op) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 868 | INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op); |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 869 | inproc_stream* s = (inproc_stream*)gs; |
| 870 | gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 871 | gpr_mu_lock(mu); |
| 872 | |
| 873 | if (GRPC_TRACER_ON(grpc_inproc_trace)) { |
| 874 | if (op->send_initial_metadata) { |
| 875 | log_metadata(op->payload->send_initial_metadata.send_initial_metadata, |
| 876 | s->t->is_client, true); |
| 877 | } |
| 878 | if (op->send_trailing_metadata) { |
| 879 | log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata, |
| 880 | s->t->is_client, false); |
| 881 | } |
| 882 | } |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 883 | grpc_error* error = GRPC_ERROR_NONE; |
| 884 | grpc_closure* on_complete = op->on_complete; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 885 | if (on_complete == nullptr) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 886 | on_complete = &do_nothing_closure; |
| 887 | } |
| 888 | |
| 889 | if (op->cancel_stream) { |
| 890 | // Call cancel_stream_locked without ref'ing the cancel_error because |
| 891 | // this function is responsible to make sure that that field gets unref'ed |
| 892 | cancel_stream_locked(exec_ctx, s, op->payload->cancel_stream.cancel_error); |
| 893 | // this op can complete without an error |
| 894 | } else if (s->cancel_self_error != GRPC_ERROR_NONE) { |
| 895 | // already self-canceled so still give it an error |
| 896 | error = GRPC_ERROR_REF(s->cancel_self_error); |
| 897 | } else { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 898 | INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s, |
| 899 | s->t->is_client ? "client" : "server", |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 900 | op->send_initial_metadata ? " send_initial_metadata" : "", |
| 901 | op->send_message ? " send_message" : "", |
| 902 | op->send_trailing_metadata ? " send_trailing_metadata" : "", |
| 903 | op->recv_initial_metadata ? " recv_initial_metadata" : "", |
| 904 | op->recv_message ? " recv_message" : "", |
| 905 | op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); |
| 906 | } |
| 907 | |
| 908 | bool needs_close = false; |
| 909 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 910 | inproc_stream* other = s->other_side; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 911 | if (error == GRPC_ERROR_NONE && |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 912 | (op->send_initial_metadata || op->send_trailing_metadata)) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 913 | if (s->t->is_closed) { |
| 914 | error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); |
| 915 | } |
| 916 | if (error == GRPC_ERROR_NONE && op->send_initial_metadata) { |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 917 | grpc_metadata_batch* dest = (other == nullptr) ? &s->write_buffer_initial_md |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 918 | : &other->to_read_initial_md; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 919 | uint32_t* destflags = (other == nullptr) ? &s->write_buffer_initial_md_flags |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 920 | : &other->to_read_initial_md_flags; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 921 | bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 922 | : &other->to_read_initial_md_filled; |
| 923 | if (*destfilled || s->initial_md_sent) { |
| 924 | // The buffer is already in use; that's an error! |
| 925 | INPROC_LOG(GPR_DEBUG, "Extra initial metadata %p", s); |
| 926 | error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata"); |
| 927 | } else { |
Vijay Pai | a78be30 | 2017-10-19 11:59:12 -0700 | [diff] [blame] | 928 | if (!other || !other->closed) { |
Vijay Pai | 10519a3 | 2017-07-15 21:22:18 +0000 | [diff] [blame] | 929 | fill_in_metadata( |
| 930 | exec_ctx, s, |
| 931 | op->payload->send_initial_metadata.send_initial_metadata, |
| 932 | op->payload->send_initial_metadata.send_initial_metadata_flags, |
| 933 | dest, destflags, destfilled); |
| 934 | } |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 935 | if (s->t->is_client) { |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 936 | grpc_millis* dl = |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 937 | (other == nullptr) ? &s->write_buffer_deadline : &other->deadline; |
Craig Tiller | 89c1428 | 2017-07-19 15:32:27 -0700 | [diff] [blame] | 938 | *dl = GPR_MIN(*dl, op->payload->send_initial_metadata |
| 939 | .send_initial_metadata->deadline); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 940 | s->initial_md_sent = true; |
| 941 | } |
| 942 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 943 | maybe_schedule_op_closure_locked(exec_ctx, other, error); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 944 | } |
| 945 | } |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 946 | |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 947 | if (error == GRPC_ERROR_NONE && |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 948 | (op->send_message || op->send_trailing_metadata || |
| 949 | op->recv_initial_metadata || op->recv_message || |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 950 | op->recv_trailing_metadata)) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 951 | // Mark ops that need to be processed by the closure |
| 952 | if (op->send_message) { |
| 953 | s->send_message_op = op; |
| 954 | } |
| 955 | if (op->send_trailing_metadata) { |
| 956 | s->send_trailing_md_op = op; |
| 957 | } |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 958 | if (op->recv_initial_metadata) { |
| 959 | s->recv_initial_md_op = op; |
| 960 | } |
| 961 | if (op->recv_message) { |
| 962 | s->recv_message_op = op; |
| 963 | } |
| 964 | if (op->recv_trailing_metadata) { |
| 965 | s->recv_trailing_md_op = op; |
| 966 | } |
| 967 | |
| 968 | // We want to initiate the closure if: |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 969 | // 1. We want to send a message and the other side wants to receive or end |
| 970 | // 2. We want to send trailing metadata and there isn't an unmatched send |
| 971 | // 3. We want initial metadata and the other side has sent it |
| 972 | // 4. We want to receive a message and there is a message ready |
| 973 | // 5. There is trailing metadata, even if nothing specifically wants |
| 974 | // that because that can shut down the receive message as well |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 975 | if ((op->send_message && other && |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 976 | ((other->recv_message_op != nullptr) || |
| 977 | (other->recv_trailing_md_op != nullptr))) || |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 978 | (op->send_trailing_metadata && !op->send_message) || |
| 979 | (op->recv_initial_metadata && s->to_read_initial_md_filled) || |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 980 | (op->recv_message && other && (other->send_message_op != nullptr)) || |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 981 | (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { |
| 982 | if (!s->op_closure_scheduled) { |
| 983 | GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); |
| 984 | s->op_closure_scheduled = true; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 985 | } |
| 986 | } else { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 987 | s->ops_needed = true; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 988 | } |
| 989 | } else { |
| 990 | if (error != GRPC_ERROR_NONE) { |
Vijay Pai | 4f0cd0e | 2017-09-22 23:34:43 -0700 | [diff] [blame] | 991 | // Schedule op's closures that we didn't push to op state machine |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 992 | if (op->recv_initial_metadata) { |
| 993 | INPROC_LOG( |
| 994 | GPR_DEBUG, |
| 995 | "perform_stream_op error %p scheduling initial-metadata-ready %p", |
| 996 | s, error); |
| 997 | GRPC_CLOSURE_SCHED( |
| 998 | exec_ctx, |
| 999 | op->payload->recv_initial_metadata.recv_initial_metadata_ready, |
| 1000 | GRPC_ERROR_REF(error)); |
| 1001 | } |
| 1002 | if (op->recv_message) { |
| 1003 | INPROC_LOG( |
| 1004 | GPR_DEBUG, |
| 1005 | "perform_stream_op error %p scheduling recv message-ready %p", s, |
| 1006 | error); |
| 1007 | GRPC_CLOSURE_SCHED(exec_ctx, |
| 1008 | op->payload->recv_message.recv_message_ready, |
| 1009 | GRPC_ERROR_REF(error)); |
| 1010 | } |
| 1011 | } |
| 1012 | INPROC_LOG(GPR_DEBUG, "perform_stream_op %p scheduling on_complete %p", s, |
| 1013 | error); |
| 1014 | GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_REF(error)); |
| 1015 | } |
| 1016 | if (needs_close) { |
| 1017 | close_other_side_locked(exec_ctx, s, "perform_stream_op:other_side"); |
| 1018 | close_stream_locked(exec_ctx, s); |
| 1019 | } |
| 1020 | gpr_mu_unlock(mu); |
| 1021 | GRPC_ERROR_UNREF(error); |
| 1022 | } |
| 1023 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1024 | static void close_transport_locked(grpc_exec_ctx* exec_ctx, |
| 1025 | inproc_transport* t) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1026 | INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed); |
| 1027 | grpc_connectivity_state_set( |
| 1028 | exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN, |
| 1029 | GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."), |
| 1030 | "close transport"); |
| 1031 | if (!t->is_closed) { |
| 1032 | t->is_closed = true; |
| 1033 | /* Also end all streams on this transport */ |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 1034 | while (t->stream_list != nullptr) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1035 | // cancel_stream_locked also adjusts stream list |
| 1036 | cancel_stream_locked( |
| 1037 | exec_ctx, t->stream_list, |
| 1038 | grpc_error_set_int( |
| 1039 | GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), |
| 1040 | GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); |
| 1041 | } |
| 1042 | } |
| 1043 | } |
| 1044 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1045 | static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, |
| 1046 | grpc_transport_op* op) { |
| 1047 | inproc_transport* t = (inproc_transport*)gt; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1048 | INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op); |
| 1049 | gpr_mu_lock(&t->mu->mu); |
| 1050 | if (op->on_connectivity_state_change) { |
| 1051 | grpc_connectivity_state_notify_on_state_change( |
| 1052 | exec_ctx, &t->connectivity, op->connectivity_state, |
| 1053 | op->on_connectivity_state_change); |
| 1054 | } |
| 1055 | if (op->set_accept_stream) { |
| 1056 | t->accept_stream_cb = op->set_accept_stream_fn; |
| 1057 | t->accept_stream_data = op->set_accept_stream_user_data; |
| 1058 | } |
| 1059 | if (op->on_consumed) { |
| 1060 | GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); |
| 1061 | } |
| 1062 | |
| 1063 | bool do_close = false; |
| 1064 | if (op->goaway_error != GRPC_ERROR_NONE) { |
| 1065 | do_close = true; |
| 1066 | GRPC_ERROR_UNREF(op->goaway_error); |
| 1067 | } |
| 1068 | if (op->disconnect_with_error != GRPC_ERROR_NONE) { |
| 1069 | do_close = true; |
| 1070 | GRPC_ERROR_UNREF(op->disconnect_with_error); |
| 1071 | } |
| 1072 | |
| 1073 | if (do_close) { |
| 1074 | close_transport_locked(exec_ctx, t); |
| 1075 | } |
| 1076 | gpr_mu_unlock(&t->mu->mu); |
| 1077 | } |
| 1078 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1079 | static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, |
| 1080 | grpc_stream* gs, |
| 1081 | grpc_closure* then_schedule_closure) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1082 | INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure); |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1083 | inproc_stream* s = (inproc_stream*)gs; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1084 | s->closure_at_destroy = then_schedule_closure; |
| 1085 | really_destroy_stream(exec_ctx, s); |
| 1086 | } |
| 1087 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1088 | static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) { |
| 1089 | inproc_transport* t = (inproc_transport*)gt; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1090 | INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t); |
| 1091 | gpr_mu_lock(&t->mu->mu); |
| 1092 | close_transport_locked(exec_ctx, t); |
| 1093 | gpr_mu_unlock(&t->mu->mu); |
| 1094 | unref_transport(exec_ctx, t->other_side); |
| 1095 | unref_transport(exec_ctx, t); |
| 1096 | } |
| 1097 | |
| 1098 | /******************************************************************************* |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1099 | * INTEGRATION GLUE |
| 1100 | */ |
| 1101 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1102 | static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt, |
| 1103 | grpc_stream* gs, grpc_pollset* pollset) { |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1104 | // Nothing to do here |
| 1105 | } |
| 1106 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1107 | static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt, |
| 1108 | grpc_stream* gs, grpc_pollset_set* pollset_set) { |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1109 | // Nothing to do here |
| 1110 | } |
| 1111 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1112 | static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, grpc_transport* t) { |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 1113 | return nullptr; |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1114 | } |
| 1115 | |
| 1116 | /******************************************************************************* |
| 1117 | * GLOBAL INIT AND DESTROY |
| 1118 | */ |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1119 | static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1120 | |
| 1121 | void grpc_inproc_transport_init(void) { |
| 1122 | grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 1123 | GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr, |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1124 | grpc_schedule_on_exec_ctx); |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 1125 | g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0); |
Yash Tibrewal | bc130da | 2017-09-12 22:44:08 -0700 | [diff] [blame] | 1126 | |
| 1127 | grpc_slice key_tmp = grpc_slice_from_static_string(":path"); |
| 1128 | g_fake_path_key = grpc_slice_intern(key_tmp); |
| 1129 | grpc_slice_unref_internal(&exec_ctx, key_tmp); |
| 1130 | |
| 1131 | g_fake_path_value = grpc_slice_from_static_string("/"); |
| 1132 | |
| 1133 | grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); |
| 1134 | g_fake_auth_key = grpc_slice_intern(auth_tmp); |
| 1135 | grpc_slice_unref_internal(&exec_ctx, auth_tmp); |
| 1136 | |
| 1137 | g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); |
| 1138 | grpc_exec_ctx_finish(&exec_ctx); |
| 1139 | } |
| 1140 | |
// Vtable plugging the inproc implementation into the generic grpc_transport
// interface; slot order follows grpc_transport_vtable (transport_impl.h).
static const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc", init_stream,
    set_pollset, set_pollset_set, perform_stream_op,
    perform_transport_op, destroy_stream, destroy_transport,
    get_endpoint};
| 1146 | |
| 1147 | /******************************************************************************* |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1148 | * Main inproc transport functions |
| 1149 | */ |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1150 | static void inproc_transports_create(grpc_exec_ctx* exec_ctx, |
| 1151 | grpc_transport** server_transport, |
| 1152 | const grpc_channel_args* server_args, |
| 1153 | grpc_transport** client_transport, |
| 1154 | const grpc_channel_args* client_args) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1155 | INPROC_LOG(GPR_DEBUG, "inproc_transports_create"); |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1156 | inproc_transport* st = (inproc_transport*)gpr_zalloc(sizeof(*st)); |
| 1157 | inproc_transport* ct = (inproc_transport*)gpr_zalloc(sizeof(*ct)); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1158 | // Share one lock between both sides since both sides get affected |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1159 | st->mu = ct->mu = (shared_mu*)gpr_malloc(sizeof(*st->mu)); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1160 | gpr_mu_init(&st->mu->mu); |
| 1161 | gpr_ref_init(&st->mu->refs, 2); |
| 1162 | st->base.vtable = &inproc_vtable; |
| 1163 | ct->base.vtable = &inproc_vtable; |
| 1164 | // Start each side of transport with 2 refs since they each have a ref |
| 1165 | // to the other |
| 1166 | gpr_ref_init(&st->refs, 2); |
| 1167 | gpr_ref_init(&ct->refs, 2); |
| 1168 | st->is_client = false; |
| 1169 | ct->is_client = true; |
| 1170 | grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY, |
| 1171 | "inproc_server"); |
| 1172 | grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY, |
| 1173 | "inproc_client"); |
| 1174 | st->other_side = ct; |
| 1175 | ct->other_side = st; |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 1176 | st->stream_list = nullptr; |
| 1177 | ct->stream_list = nullptr; |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1178 | *server_transport = (grpc_transport*)st; |
| 1179 | *client_transport = (grpc_transport*)ct; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1180 | } |
| 1181 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1182 | grpc_channel* grpc_inproc_channel_create(grpc_server* server, |
| 1183 | grpc_channel_args* args, |
| 1184 | void* reserved) { |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1185 | GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2, |
| 1186 | (server, args)); |
| 1187 | |
| 1188 | grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 1189 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1190 | const grpc_channel_args* server_args = grpc_server_get_channel_args(server); |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1191 | |
| 1192 | // Add a default authority channel argument for the client |
| 1193 | |
| 1194 | grpc_arg default_authority_arg; |
| 1195 | default_authority_arg.type = GRPC_ARG_STRING; |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1196 | default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY; |
| 1197 | default_authority_arg.value.string = (char*)"inproc.authority"; |
| 1198 | grpc_channel_args* client_args = |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1199 | grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); |
| 1200 | |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1201 | grpc_transport* server_transport; |
| 1202 | grpc_transport* client_transport; |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1203 | inproc_transports_create(&exec_ctx, &server_transport, server_args, |
| 1204 | &client_transport, client_args); |
| 1205 | |
Craig Tiller | 4782d92 | 2017-11-10 09:53:21 -0800 | [diff] [blame^] | 1206 | grpc_server_setup_transport(&exec_ctx, server, server_transport, nullptr, |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1207 | server_args); |
Craig Tiller | baa14a9 | 2017-11-03 09:09:36 -0700 | [diff] [blame] | 1208 | grpc_channel* channel = |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1209 | grpc_channel_create(&exec_ctx, "inproc", client_args, |
| 1210 | GRPC_CLIENT_DIRECT_CHANNEL, client_transport); |
| 1211 | |
| 1212 | // Free up created channel args |
| 1213 | grpc_channel_args_destroy(&exec_ctx, client_args); |
| 1214 | |
| 1215 | // Now finish scheduled operations |
| 1216 | grpc_exec_ctx_finish(&exec_ctx); |
| 1217 | |
| 1218 | return channel; |
| 1219 | } |
| 1220 | |
Vijay Pai | 3d7d5f4 | 2017-05-04 10:02:24 -0700 | [diff] [blame] | 1221 | void grpc_inproc_transport_shutdown(void) { |
| 1222 | grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; |
| 1223 | grpc_slice_unref_internal(&exec_ctx, g_empty_slice); |
| 1224 | grpc_slice_unref_internal(&exec_ctx, g_fake_path_key); |
| 1225 | grpc_slice_unref_internal(&exec_ctx, g_fake_path_value); |
| 1226 | grpc_slice_unref_internal(&exec_ctx, g_fake_auth_key); |
| 1227 | grpc_slice_unref_internal(&exec_ctx, g_fake_auth_value); |
| 1228 | grpc_exec_ctx_finish(&exec_ctx); |
| 1229 | } |