/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include "src/core/ext/transport/inproc/inproc_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <string.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/transport_impl.h"

#define INPROC_LOG(...)                                          \
  do {                                                           \
    if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \
  } while (0)

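// Slices set up in grpc_inproc_transport_init() and released in
// grpc_inproc_transport_shutdown(). The fake :path/:authority entries are
// what fail_helper_locked uses to synthesize the initial metadata a failing
// server-side stream still expects to see.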
static grpc_slice g_empty_slice;
static grpc_slice g_fake_path_key;
static grpc_slice g_fake_path_value;
static grpc_slice g_fake_auth_key;
static grpc_slice g_fake_auth_value;

typedef struct {
  gpr_mu mu;
  gpr_refcount refs;
} shared_mu;

typedef struct inproc_transport {
  grpc_transport base;
  shared_mu* mu;
  gpr_refcount refs;
  bool is_client;
  grpc_connectivity_state_tracker connectivity;
  void (*accept_stream_cb)(grpc_exec_ctx* exec_ctx, void* user_data,
                           grpc_transport* transport, const void* server_data);
  void* accept_stream_data;
  bool is_closed;
  struct inproc_transport* other_side;
  struct inproc_stream* stream_list;
} inproc_transport;

typedef struct inproc_stream {
  inproc_transport* t;
  grpc_metadata_batch to_read_initial_md;
  uint32_t to_read_initial_md_flags;
  bool to_read_initial_md_filled;
  grpc_metadata_batch to_read_trailing_md;
  bool to_read_trailing_md_filled;
  bool ops_needed;
  bool op_closure_scheduled;
  grpc_closure op_closure;
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md;
  bool write_buffer_initial_md_filled;
  uint32_t write_buffer_initial_md_flags;
  grpc_millis write_buffer_deadline;
  grpc_metadata_batch write_buffer_trailing_md;
  bool write_buffer_trailing_md_filled;
  grpc_error* write_buffer_cancel_error;

  struct inproc_stream* other_side;
  bool other_side_closed;               // won't talk anymore
  bool write_buffer_other_side_closed;  // on hold
  grpc_stream_refcount* refs;
  grpc_closure* closure_at_destroy;

  gpr_arena* arena;

  grpc_transport_stream_op_batch* send_message_op;
  grpc_transport_stream_op_batch* send_trailing_md_op;
  grpc_transport_stream_op_batch* recv_initial_md_op;
  grpc_transport_stream_op_batch* recv_message_op;
  grpc_transport_stream_op_batch* recv_trailing_md_op;

  grpc_slice_buffer recv_message;
  grpc_slice_buffer_stream recv_stream;
  bool recv_inited;

  bool initial_md_sent;
  bool trailing_md_sent;
  bool initial_md_recvd;
  bool trailing_md_recvd;

  bool closed;

  grpc_error* cancel_self_error;
  grpc_error* cancel_other_error;

  grpc_millis deadline;

  bool listed;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
} inproc_stream;

static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                 grpc_error* error);
static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
                             grpc_error* error);

static void ref_transport(inproc_transport* t) {
  INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
  gpr_ref(&t->refs);
}

static void really_destroy_transport(grpc_exec_ctx* exec_ctx,
                                     inproc_transport* t) {
  INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t);
  grpc_connectivity_state_destroy(exec_ctx, &t->connectivity);
  if (gpr_unref(&t->mu->refs)) {
    gpr_free(t->mu);
  }
  gpr_free(t);
}

static void unref_transport(grpc_exec_ctx* exec_ctx, inproc_transport* t) {
  INPROC_LOG(GPR_DEBUG, "unref_transport %p", t);
  if (gpr_unref(&t->refs)) {
    really_destroy_transport(exec_ctx, t);
  }
}

#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs)
#endif

static void ref_stream(inproc_stream* s, const char* reason) {
  INPROC_LOG(GPR_DEBUG, "ref_stream %p %s", s, reason);
  STREAM_REF(s->refs, reason);
}

static void unref_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                         const char* reason) {
  INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason);
  STREAM_UNREF(exec_ctx, s->refs, reason);
}

static void really_destroy_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s) {
  INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);

  GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
  GRPC_ERROR_UNREF(s->cancel_self_error);
  GRPC_ERROR_UNREF(s->cancel_other_error);

  if (s->recv_inited) {
    grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message);
  }

  unref_transport(exec_ctx, s->t);

  if (s->closure_at_destroy) {
    GRPC_CLOSURE_SCHED(exec_ctx, s->closure_at_destroy, GRPC_ERROR_NONE);
  }
}

static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                         bool is_initial) {
  for (grpc_linked_mdelem* md = md_batch->list.head; md != nullptr;
       md = md->next) {
    char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
    char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
    gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
            is_client ? "CLI" : "SVR", key, value);
    gpr_free(key);
    gpr_free(value);
  }
}

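// Copy a metadata batch into out_md, allocating each new element from the
// stream's arena and interning the key/value slices so the copy remains valid
// after the source batch is destroyed. Optionally records the flags and marks
// the destination buffer as filled.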
static grpc_error* fill_in_metadata(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                    const grpc_metadata_batch* metadata,
                                    uint32_t flags, grpc_metadata_batch* out_md,
                                    uint32_t* outflags, bool* markfilled) {
  if (GRPC_TRACER_ON(grpc_inproc_trace)) {
    log_metadata(metadata, s->t->is_client, outflags != nullptr);
  }

  if (outflags != nullptr) {
    *outflags = flags;
  }
  if (markfilled != nullptr) {
    *markfilled = true;
  }
  grpc_error* error = GRPC_ERROR_NONE;
  for (grpc_linked_mdelem* elem = metadata->list.head;
       (elem != nullptr) && (error == GRPC_ERROR_NONE); elem = elem->next) {
    grpc_linked_mdelem* nelem =
        (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*nelem));
    nelem->md = grpc_mdelem_from_slices(
        exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)),
        grpc_slice_intern(GRPC_MDVALUE(elem->md)));

    error = grpc_metadata_batch_link_tail(exec_ctx, out_md, nelem);
  }
  return error;
}

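// Transport vtable entry: initialize one side of a stream. For a client-side
// stream (server_data == NULL) this notifies the server transport through its
// accept_stream_cb; for the server side it links the two halves together and
// drains anything the client already buffered (initial/trailing metadata,
// deadline, cancellation) into this stream's to_read buffers.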
static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                       grpc_stream* gs, grpc_stream_refcount* refcount,
                       const void* server_data, gpr_arena* arena) {
  INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data);
  inproc_transport* t = (inproc_transport*)gt;
  inproc_stream* s = (inproc_stream*)gs;
  s->arena = arena;

  s->refs = refcount;
  // Ref this stream right now
  ref_stream(s, "inproc_init_stream:init");

  grpc_metadata_batch_init(&s->to_read_initial_md);
  s->to_read_initial_md_flags = 0;
  s->to_read_initial_md_filled = false;
  grpc_metadata_batch_init(&s->to_read_trailing_md);
  s->to_read_trailing_md_filled = false;
  grpc_metadata_batch_init(&s->write_buffer_initial_md);
  s->write_buffer_initial_md_flags = 0;
  s->write_buffer_initial_md_filled = false;
  grpc_metadata_batch_init(&s->write_buffer_trailing_md);
  s->write_buffer_trailing_md_filled = false;
  s->ops_needed = false;
  s->op_closure_scheduled = false;
  GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s,
                    grpc_schedule_on_exec_ctx);
  s->t = t;
  s->closure_at_destroy = nullptr;
  s->other_side_closed = false;

  s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd =
      s->trailing_md_recvd = false;

  s->closed = false;

  s->cancel_self_error = GRPC_ERROR_NONE;
  s->cancel_other_error = GRPC_ERROR_NONE;
  s->write_buffer_cancel_error = GRPC_ERROR_NONE;
  s->deadline = GRPC_MILLIS_INF_FUTURE;
  s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE;

  s->stream_list_prev = nullptr;
  gpr_mu_lock(&t->mu->mu);
  s->listed = true;
  ref_stream(s, "inproc_init_stream:list");
  s->stream_list_next = t->stream_list;
  if (t->stream_list) {
    t->stream_list->stream_list_prev = s;
  }
  t->stream_list = s;
  gpr_mu_unlock(&t->mu->mu);

  if (!server_data) {
    ref_transport(t);
    inproc_transport* st = t->other_side;
    ref_transport(st);
    s->other_side = nullptr;  // will get filled in soon
    // Pass the client-side stream address to the server-side for a ref
    ref_stream(s, "inproc_init_stream:clt");  // ref it now on behalf of server
                                              // side to avoid destruction
    INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p",
               st->accept_stream_cb, st->accept_stream_data);
    (*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base,
                            (void*)s);
  } else {
    // This is the server-side and is being called through accept_stream_cb
    inproc_stream* cs = (inproc_stream*)server_data;
    s->other_side = cs;
    // Ref the server-side stream on behalf of the client now
    ref_stream(s, "inproc_init_stream:srv");

    // Now we are about to affect the other side, so lock the transport
    // to make sure that it doesn't get destroyed
    gpr_mu_lock(&s->t->mu->mu);
    cs->other_side = s;
    // Now transfer from the other side's write_buffer if any to the to_read
    // buffer
    if (cs->write_buffer_initial_md_filled) {
      fill_in_metadata(exec_ctx, s, &cs->write_buffer_initial_md,
                       cs->write_buffer_initial_md_flags,
                       &s->to_read_initial_md, &s->to_read_initial_md_flags,
                       &s->to_read_initial_md_filled);
      s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline);
      grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md);
      cs->write_buffer_initial_md_filled = false;
    }
    if (cs->write_buffer_trailing_md_filled) {
      fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0,
                       &s->to_read_trailing_md, nullptr,
                       &s->to_read_trailing_md_filled);
      grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_trailing_md);
      cs->write_buffer_trailing_md_filled = false;
    }
    if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) {
      s->cancel_other_error = cs->write_buffer_cancel_error;
      cs->write_buffer_cancel_error = GRPC_ERROR_NONE;
    }

    gpr_mu_unlock(&s->t->mu->mu);
  }
  return 0;  // return value is not important
}

static void close_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s) {
  if (!s->closed) {
    // Release the metadata that we would have written out
    grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md);
    grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md);

    if (s->listed) {
      inproc_stream* p = s->stream_list_prev;
      inproc_stream* n = s->stream_list_next;
      if (p != nullptr) {
        p->stream_list_next = n;
      } else {
        s->t->stream_list = n;
      }
      if (n != nullptr) {
        n->stream_list_prev = p;
      }
      s->listed = false;
      unref_stream(exec_ctx, s, "close_stream:list");
    }
    s->closed = true;
    unref_stream(exec_ctx, s, "close_stream:closing");
  }
}

// This function means that we are done talking/listening to the other side
static void close_other_side_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                    const char* reason) {
  if (s->other_side != nullptr) {
    // First release the metadata that came from the other side's arena
    grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md);
    grpc_metadata_batch_destroy(exec_ctx, &s->to_read_trailing_md);

    unref_stream(exec_ctx, s->other_side, reason);
    s->other_side_closed = true;
    s->other_side = nullptr;
  } else if (!s->other_side_closed) {
    s->write_buffer_other_side_closed = true;
  }
}

// Call the on_complete closure associated with this stream_op_batch if
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
static void complete_if_batch_end_locked(grpc_exec_ctx* exec_ctx,
                                         inproc_stream* s, grpc_error* error,
                                         grpc_transport_stream_op_batch* op,
                                         const char* msg) {
  int is_sm = (int)(op == s->send_message_op);
  int is_stm = (int)(op == s->send_trailing_md_op);
  int is_rim = (int)(op == s->recv_initial_md_op);
  int is_rm = (int)(op == s->recv_message_op);
  int is_rtm = (int)(op == s->recv_trailing_md_op);

  if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
    INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error);
    GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error));
  }
}

static void maybe_schedule_op_closure_locked(grpc_exec_ctx* exec_ctx,
                                             inproc_stream* s,
                                             grpc_error* error) {
  if (s && s->ops_needed && !s->op_closure_scheduled) {
    GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
    s->op_closure_scheduled = true;
    s->ops_needed = false;
  }
}

static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                               grpc_error* error) {
  INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md;
    grpc_metadata_batch_init(&fake_md);

    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(exec_ctx, &fake_md);

    if (other != nullptr) {
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(error);
      }
      maybe_schedule_op_closure_locked(exec_ctx, other, error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      s->write_buffer_cancel_error = GRPC_ERROR_REF(error);
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error* err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and authority
      // since it expects that as well as no error yet
      grpc_metadata_batch fake_md;
      grpc_metadata_batch_init(&fake_md);
      grpc_linked_mdelem* path_md =
          (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*path_md));
      path_md->md =
          grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) ==
                 GRPC_ERROR_NONE);
      grpc_linked_mdelem* auth_md =
          (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*auth_md));
      auth_md->md =
          grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value);
      GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) ==
                 GRPC_ERROR_NONE);

      fill_in_metadata(
          exec_ctx, s, &fake_md, 0,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      grpc_metadata_batch_destroy(exec_ctx, &fake_md);
      err = GRPC_ERROR_NONE;
    } else {
      err = GRPC_ERROR_REF(error);
    }
    INPROC_LOG(GPR_DEBUG,
               "fail_helper %p scheduling initial-metadata-ready %p %p", s,
               error, err);
    GRPC_CLOSURE_SCHED(exec_ctx,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata_ready,
                       err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        exec_ctx, s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-ready %p", s,
               error);
    GRPC_CLOSURE_SCHED(
        exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_REF(error));
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailing-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_DEBUG,
               "fail_helper %p scheduling trailing-md-on-complete %p", s,
               error);
    complete_if_batch_end_locked(
        exec_ctx, s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(exec_ctx, s, "fail_helper:other_side");
  close_stream_locked(exec_ctx, s);

  GRPC_ERROR_UNREF(error);
}

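// Move the sender's pending send_message payload directly into the receiver's
// slice buffer (no wire encoding), hand the resulting byte stream to the
// receiver's recv_message op, and complete both batches if the message op was
// the only thing pending on them.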
static void message_transfer_locked(grpc_exec_ctx* exec_ctx,
                                    inproc_stream* sender,
                                    inproc_stream* receiver) {
  size_t remaining =
      sender->send_message_op->payload->send_message.send_message->length;
  if (receiver->recv_inited) {
    grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message);
  }
  grpc_slice_buffer_init(&receiver->recv_message);
  receiver->recv_inited = true;
  do {
    grpc_slice message_slice;
    grpc_closure unused;
    GPR_ASSERT(grpc_byte_stream_next(
        exec_ctx, sender->send_message_op->payload->send_message.send_message,
        SIZE_MAX, &unused));
    grpc_error* error = grpc_byte_stream_pull(
        exec_ctx, sender->send_message_op->payload->send_message.send_message,
        &message_slice);
    if (error != GRPC_ERROR_NONE) {
      cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error));
      break;
    }
    GPR_ASSERT(error == GRPC_ERROR_NONE);
    remaining -= GRPC_SLICE_LENGTH(message_slice);
    grpc_slice_buffer_add(&receiver->recv_message, message_slice);
  } while (remaining > 0);

  grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
                                0);
  *receiver->recv_message_op->payload->recv_message.recv_message =
      &receiver->recv_stream.base;
  INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
             receiver);
  GRPC_CLOSURE_SCHED(
      exec_ctx,
      receiver->recv_message_op->payload->recv_message.recv_message_ready,
      GRPC_ERROR_NONE);
  complete_if_batch_end_locked(
      exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op,
      "message_transfer scheduling sender on_complete");
  complete_if_batch_end_locked(
      exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op,
      "message_transfer scheduling receiver on_complete");

  receiver->recv_message_op = nullptr;
  sender->send_message_op = nullptr;
}

static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
                             grpc_error* error) {
  // This function gets called when we have contents in the unprocessed reads
  // Get what we want based on our ops wanted
  // Schedule our appropriate closures
  // and then return to ops_needed state if still needed

  // Since this is a closure directly invoked by the combiner, it should not
  // unref the error parameter explicitly; the combiner will do that implicitly
  grpc_error* new_err = GRPC_ERROR_NONE;

  bool needs_close = false;

  INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
  inproc_stream* s = (inproc_stream*)arg;
  gpr_mu* mu = &s->t->mu->mu;  // keep aside in case s gets closed
  gpr_mu_lock(mu);
  s->op_closure_scheduled = false;
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (s->cancel_self_error != GRPC_ERROR_NONE) {
    fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
    goto done;
  } else if (s->cancel_other_error != GRPC_ERROR_NONE) {
    fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_other_error));
    goto done;
  } else if (error != GRPC_ERROR_NONE) {
    fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(error));
    goto done;
  }

  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      message_transfer_locked(exec_ctx, s, other);
      maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
    } else if (!s->t->is_client &&
               (s->trailing_md_sent || other->recv_trailing_md_op)) {
      // A server send will never be matched if the client is waiting
      // for trailing metadata already
      complete_if_batch_end_locked(
          exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status.
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
      fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
      goto done;
    } else {
      if (!other || !other->closed) {
        fill_in_metadata(exec_ctx, s,
                         s->send_trailing_md_op->payload->send_trailing_metadata
                             .send_trailing_metadata,
                         0, dest, nullptr, destfilled);
      }
      s->trailing_md_sent = true;
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        INPROC_LOG(GPR_DEBUG,
                   "op_state_machine %p scheduling trailing-md-on-complete", s);
        GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
                           GRPC_ERROR_NONE);
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
      INPROC_LOG(
          GPR_DEBUG,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %p",
          s, new_err);
      fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      s->initial_md_recvd = true;
      new_err = fill_in_metadata(
          exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags,
          s->recv_initial_md_op->payload->recv_initial_metadata
              .recv_initial_metadata,
          s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
          nullptr);
      s->recv_initial_md_op->payload->recv_initial_metadata
          .recv_initial_metadata->deadline = s->deadline;
      grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
      s->to_read_initial_md_filled = false;
      INPROC_LOG(GPR_DEBUG,
                 "op_state_machine %p scheduling initial-metadata-ready %p", s,
                 new_err);
      GRPC_CLOSURE_SCHED(exec_ctx,
                         s->recv_initial_md_op->payload->recv_initial_metadata
                             .recv_initial_metadata_ready,
                         GRPC_ERROR_REF(new_err));
      complete_if_batch_end_locked(
          exec_ctx, s, new_err, s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;

      if (new_err != GRPC_ERROR_NONE) {
        INPROC_LOG(GPR_DEBUG,
                   "op_state_machine %p scheduling on_complete errors2 %p", s,
                   new_err);
        fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
        goto done;
      }
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      message_transfer_locked(exec_ctx, other, s);
      maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
    }
  }
  if (s->recv_trailing_md_op && s->t->is_client && other &&
      other->send_message_op) {
    maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE);
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      new_err =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
      INPROC_LOG(
          GPR_DEBUG,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd trailing md %p",
          s, new_err);
      fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err));
      goto done;
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
      GRPC_CLOSURE_SCHED(
          exec_ctx,
          s->recv_message_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      complete_if_batch_end_locked(
          exec_ctx, s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      complete_if_batch_end_locked(
          exec_ctx, s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      new_err =
          fill_in_metadata(exec_ctx, s, &s->to_read_trailing_md, 0,
                           s->recv_trailing_md_op->payload
                               ->recv_trailing_metadata.recv_trailing_metadata,
                           nullptr, nullptr);
      grpc_metadata_batch_clear(exec_ctx, &s->to_read_trailing_md);
      s->to_read_trailing_md_filled = false;

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing md
      // (If the server hasn't already sent its trailing md, it doesn't have
      // a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        INPROC_LOG(GPR_DEBUG,
                   "op_state_machine %p scheduling trailing-md-on-complete %p",
                   s, new_err);
        GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete,
                           GRPC_ERROR_REF(new_err));
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      } else {
        INPROC_LOG(GPR_DEBUG,
                   "op_state_machine %p server needs to delay handling "
                   "trailing-md-on-complete %p",
                   s, new_err);
      }
    } else {
      INPROC_LOG(
          GPR_DEBUG,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s);
    GRPC_CLOSURE_SCHED(
        exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready,
        GRPC_ERROR_NONE);
    complete_if_batch_end_locked(
        exec_ctx, s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
      s->send_message_op) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    complete_if_batch_end_locked(
        exec_ctx, s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  if (needs_close) {
    close_other_side_locked(exec_ctx, s, "op_state_machine");
    close_stream_locked(exec_ctx, s);
  }
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(new_err);
}

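// Record a cancellation on this stream: stash the error, push synthetic
// (empty) trailing metadata at the other side so it sees the stream end, wake
// its op state machine if needed, and close this side. Returns true only for
// the first cancellation of the stream. Consumes one ref on error.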
static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
                                 grpc_error* error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s,
             grpc_error_string(error));
  if (s->cancel_self_error == GRPC_ERROR_NONE) {
    ret = true;
    s->cancel_self_error = GRPC_ERROR_REF(error);
    maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    grpc_metadata_batch cancel_md;
    grpc_metadata_batch_init(&cancel_md);

    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, nullptr, destfilled);
    grpc_metadata_batch_destroy(exec_ctx, &cancel_md);

    if (other != nullptr) {
      if (other->cancel_other_error == GRPC_ERROR_NONE) {
        other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error);
      }
      maybe_schedule_op_closure_locked(exec_ctx, other,
                                       other->cancel_other_error);
    } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) {
      s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error);
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      complete_if_batch_end_locked(
          exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  close_other_side_locked(exec_ctx, s, "cancel_stream:other_side");
  close_stream_locked(exec_ctx, s);

  GRPC_ERROR_UNREF(error);
  return ret;
}

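// Transport vtable entry for stream op batches. Send initial metadata is
// copied to the peer (or into this stream's write buffer if the peer stream
// does not exist yet) while holding the shared lock; the remaining ops are
// parked on the stream and handled by op_state_machine, which is only
// scheduled here when there is already something for it to match against.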
static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                              grpc_stream* gs,
                              grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op);
  inproc_stream* s = (inproc_stream*)gs;
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (GRPC_TRACER_ON(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_closure* on_complete = op->on_complete;
  if (on_complete == nullptr) {
    on_complete = &do_nothing_closure;
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible to make sure that that field gets unref'ed
    cancel_stream_locked(exec_ctx, s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (s->cancel_self_error != GRPC_ERROR_NONE) {
    // already self-canceled so still give it an error
    error = GRPC_ERROR_REF(s->cancel_self_error);
  } else {
    INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  bool needs_close = false;

  inproc_stream* other = s->other_side;
  if (error == GRPC_ERROR_NONE &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
    }
    if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      uint32_t* destflags = (other == nullptr)
                                ? &s->write_buffer_initial_md_flags
                                : &other->to_read_initial_md_flags;
      bool* destfilled = (other == nullptr)
                             ? &s->write_buffer_initial_md_filled
                             : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_DEBUG, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
      } else {
        if (!other || !other->closed) {
          fill_in_metadata(
              exec_ctx, s,
              op->payload->send_initial_metadata.send_initial_metadata,
              op->payload->send_initial_metadata.send_initial_metadata_flags,
              dest, destflags, destfilled);
        }
        if (s->t->is_client) {
          grpc_millis* dl =
              (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
          *dl = GPR_MIN(*dl, op->payload->send_initial_metadata
                                 .send_initial_metadata->deadline);
          s->initial_md_sent = true;
        }
      }
      maybe_schedule_op_closure_locked(exec_ctx, other, error);
    }
  }

  if (error == GRPC_ERROR_NONE &&
      (op->send_message || op->send_trailing_metadata ||
       op->recv_initial_metadata || op->recv_message ||
       op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the closure
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the closure if:
    // 1. We want to send a message and the other side wants to receive or end
    // 2. We want to send trailing metadata and there isn't an unmatched send
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other &&
         ((other->recv_message_op != nullptr) ||
          (other->recv_trailing_md_op != nullptr))) ||
        (op->send_trailing_metadata && !op->send_message) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && (other->send_message_op != nullptr)) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      if (!s->op_closure_scheduled) {
        GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE);
        s->op_closure_scheduled = true;
      }
    } else {
      s->ops_needed = true;
    }
  } else {
    if (error != GRPC_ERROR_NONE) {
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        INPROC_LOG(
            GPR_DEBUG,
            "perform_stream_op error %p scheduling initial-metadata-ready %p",
            s, error);
        GRPC_CLOSURE_SCHED(
            exec_ctx,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            GRPC_ERROR_REF(error));
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_DEBUG,
            "perform_stream_op error %p scheduling recv message-ready %p", s,
            error);
        GRPC_CLOSURE_SCHED(exec_ctx,
                           op->payload->recv_message.recv_message_ready,
                           GRPC_ERROR_REF(error));
      }
    }
    INPROC_LOG(GPR_DEBUG, "perform_stream_op %p scheduling on_complete %p", s,
               error);
    GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_REF(error));
  }
  if (needs_close) {
    close_other_side_locked(exec_ctx, s, "perform_stream_op:other_side");
    close_stream_locked(exec_ctx, s);
  }
  gpr_mu_unlock(mu);
  GRPC_ERROR_UNREF(error);
}

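// Mark the transport shut down: flip connectivity to SHUTDOWN and cancel
// every stream still on the transport's stream list with
// GRPC_STATUS_UNAVAILABLE.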
static void close_transport_locked(grpc_exec_ctx* exec_ctx,
                                   inproc_transport* t) {
  INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed);
  grpc_connectivity_state_set(
      exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."),
      "close transport");
  if (!t->is_closed) {
    t->is_closed = true;
    /* Also end all streams on this transport */
    while (t->stream_list != nullptr) {
      // cancel_stream_locked also adjusts stream list
      cancel_stream_locked(
          exec_ctx, t->stream_list,
          grpc_error_set_int(
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"),
              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
    }
  }
}

static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                                 grpc_transport_op* op) {
  inproc_transport* t = (inproc_transport*)gt;
  INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op);
  gpr_mu_lock(&t->mu->mu);
  if (op->on_connectivity_state_change) {
    grpc_connectivity_state_notify_on_state_change(
        exec_ctx, &t->connectivity, op->connectivity_state,
        op->on_connectivity_state_change);
  }
  if (op->set_accept_stream) {
    t->accept_stream_cb = op->set_accept_stream_fn;
    t->accept_stream_data = op->set_accept_stream_user_data;
  }
  if (op->on_consumed) {
    GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
  }

  bool do_close = false;
  if (op->goaway_error != GRPC_ERROR_NONE) {
    do_close = true;
    GRPC_ERROR_UNREF(op->goaway_error);
  }
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    do_close = true;
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  if (do_close) {
    close_transport_locked(exec_ctx, t);
  }
  gpr_mu_unlock(&t->mu->mu);
}

static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                           grpc_stream* gs,
                           grpc_closure* then_schedule_closure) {
  INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure);
  inproc_stream* s = (inproc_stream*)gs;
  s->closure_at_destroy = then_schedule_closure;
  really_destroy_stream(exec_ctx, s);
}

static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {
  inproc_transport* t = (inproc_transport*)gt;
  INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t);
  gpr_mu_lock(&t->mu->mu);
  close_transport_locked(exec_ctx, t);
  gpr_mu_unlock(&t->mu->mu);
  unref_transport(exec_ctx, t->other_side);
  unref_transport(exec_ctx, t);
}

/*******************************************************************************
 * INTEGRATION GLUE
 */

static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                        grpc_stream* gs, grpc_pollset* pollset) {
  // Nothing to do here
}

static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
                            grpc_stream* gs, grpc_pollset_set* pollset_set) {
  // Nothing to do here
}

static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, grpc_transport* t) {
  return nullptr;
}

/*******************************************************************************
 * GLOBAL INIT AND DESTROY
 */
static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}

void grpc_inproc_transport_init(void) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
                    grpc_schedule_on_exec_ctx);
  g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);

  grpc_slice key_tmp = grpc_slice_from_static_string(":path");
  g_fake_path_key = grpc_slice_intern(key_tmp);
  grpc_slice_unref_internal(&exec_ctx, key_tmp);

  g_fake_path_value = grpc_slice_from_static_string("/");

  grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
  g_fake_auth_key = grpc_slice_intern(auth_tmp);
  grpc_slice_unref_internal(&exec_ctx, auth_tmp);

  g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
  grpc_exec_ctx_finish(&exec_ctx);
}

static const grpc_transport_vtable inproc_vtable = {
    sizeof(inproc_stream), "inproc",        init_stream,
    set_pollset,           set_pollset_set, perform_stream_op,
    perform_transport_op,  destroy_stream,  destroy_transport,
    get_endpoint};

/*******************************************************************************
 * Main inproc transport functions
 */
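// Create the paired server/client transports. Both sides share a single
// refcounted mutex, and each side starts with two refs because each holds a
// pointer to the other.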
static void inproc_transports_create(grpc_exec_ctx* exec_ctx,
                                     grpc_transport** server_transport,
                                     const grpc_channel_args* server_args,
                                     grpc_transport** client_transport,
                                     const grpc_channel_args* client_args) {
  INPROC_LOG(GPR_DEBUG, "inproc_transports_create");
  inproc_transport* st = (inproc_transport*)gpr_zalloc(sizeof(*st));
  inproc_transport* ct = (inproc_transport*)gpr_zalloc(sizeof(*ct));
  // Share one lock between both sides since both sides get affected
  st->mu = ct->mu = (shared_mu*)gpr_malloc(sizeof(*st->mu));
  gpr_mu_init(&st->mu->mu);
  gpr_ref_init(&st->mu->refs, 2);
  st->base.vtable = &inproc_vtable;
  ct->base.vtable = &inproc_vtable;
  // Start each side of transport with 2 refs since they each have a ref
  // to the other
  gpr_ref_init(&st->refs, 2);
  gpr_ref_init(&ct->refs, 2);
  st->is_client = false;
  ct->is_client = true;
  grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY,
                               "inproc_server");
  grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY,
                               "inproc_client");
  st->other_side = ct;
  ct->other_side = st;
  st->stream_list = nullptr;
  ct->stream_list = nullptr;
  *server_transport = (grpc_transport*)st;
  *client_transport = (grpc_transport*)ct;
}

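// Public entry point: create an in-process channel bound to |server|. The
// server-side transport is registered with the grpc_server and the
// client-side transport is wrapped in a GRPC_CLIENT_DIRECT_CHANNEL, with a
// default authority channel arg added for the client. As a usage sketch
// (assuming the server already exists with its services registered, and that
// the usual NULL/nullptr is passed for the reserved parameter):
//   grpc_channel* ch = grpc_inproc_channel_create(server, args, nullptr);
// after which ch is used like any other grpc_channel.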
grpc_channel* grpc_inproc_channel_create(grpc_server* server,
                                         grpc_channel_args* args,
                                         void* reserved) {
  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
                 (server, args));

  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

  const grpc_channel_args* server_args = grpc_server_get_channel_args(server);

  // Add a default authority channel argument for the client

  grpc_arg default_authority_arg;
  default_authority_arg.type = GRPC_ARG_STRING;
  default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY;
  default_authority_arg.value.string = (char*)"inproc.authority";
  grpc_channel_args* client_args =
      grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);

  grpc_transport* server_transport;
  grpc_transport* client_transport;
  inproc_transports_create(&exec_ctx, &server_transport, server_args,
                           &client_transport, client_args);

  grpc_server_setup_transport(&exec_ctx, server, server_transport, nullptr,
                              server_args);
  grpc_channel* channel =
      grpc_channel_create(&exec_ctx, "inproc", client_args,
                          GRPC_CLIENT_DIRECT_CHANNEL, client_transport);

  // Free up created channel args
  grpc_channel_args_destroy(&exec_ctx, client_args);

  // Now finish scheduled operations
  grpc_exec_ctx_finish(&exec_ctx);

  return channel;
}

void grpc_inproc_transport_shutdown(void) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_slice_unref_internal(&exec_ctx, g_empty_slice);
  grpc_slice_unref_internal(&exec_ctx, g_fake_path_key);
  grpc_slice_unref_internal(&exec_ctx, g_fake_path_value);
  grpc_slice_unref_internal(&exec_ctx, g_fake_auth_key);
  grpc_slice_unref_internal(&exec_ctx, g_fake_auth_value);
  grpc_exec_ctx_finish(&exec_ctx);
}