/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

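/* The connected channel filter terminates a channel stack: it owns the
   grpc_transport bound via grpc_connected_channel_bind_transport (below),
   translates call and channel operations into transport stream operations,
   and feeds transport events back up the stack through
   connected_channel_transport_callbacks. */
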
#include "src/core/channel/connected_channel.h"

#include <stdarg.h>
#include <stdio.h>
#include <string.h>

#include "src/core/transport/transport.h"
#include <grpc/byte_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/string.h>

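/* MAX_BUFFER_LENGTH bounds how many bytes of buffered (GRPC_WRITE_BUFFER_HINT)
   sends may accumulate in a call's outgoing_sopb before end_bufferable_op
   forces a flush to the transport. */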
#define MAX_BUFFER_LENGTH 8192
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)

typedef struct {
  grpc_transport *transport;
  gpr_uint32 max_message_length;
} channel_data;

typedef struct {
  grpc_call_element *elem;
  grpc_stream_op_buffer outgoing_sopb;

  gpr_uint32 max_message_length;
  gpr_uint32 incoming_message_length;
  gpr_uint8 reading_message;
  gpr_uint8 got_metadata_boundary;
  gpr_uint8 got_read_close;
  gpr_slice_buffer incoming_message;
  gpr_uint32 outgoing_buffer_length_estimate;
} call_data;

/* We perform a small hack to locate transport data alongside the connected
   channel data in call allocations, so that everything can be pulled in with
   a minimal number of cache line fetches */
#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld)+1))
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
  (((call_data *)(transport_stream)) - 1)
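
/* The resulting layout of each call allocation for this filter is:

     [call_data][transport stream data]

   grpc_connected_channel_bind_transport (at the bottom of this file) grows
   the call stack by grpc_transport_stream_size so the transport's stream data
   fits immediately after call_data, which is what the two macros above rely
   on. */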

/* Copy the contents of a byte buffer into stream ops */
static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
                                           grpc_stream_op_buffer *sopb) {
  size_t i;

  switch (byte_buffer->type) {
    case GRPC_BB_SLICE_BUFFER:
      for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
        gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
        gpr_slice_ref(slice);
        grpc_sopb_add_slice(sopb, slice);
      }
      break;
  }
}

/* Flush queued stream operations onto the transport */
static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
                              call_data *calld, int is_last) {
  size_t nops;

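  /* Ops flagged with GRPC_WRITE_BUFFER_HINT are acknowledged immediately and
     left queued in outgoing_sopb; the batch is only handed to the transport
     once roughly MAX_BUFFER_LENGTH bytes have accumulated or an op without
     the hint arrives. */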
  if (op->flags & GRPC_WRITE_BUFFER_HINT) {
    if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
      op->done_cb(op->user_data, GRPC_OP_OK);
      return;
    }
  }

  calld->outgoing_buffer_length_estimate = 0;
  grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);

  nops = calld->outgoing_sopb.nops;
  calld->outgoing_sopb.nops = 0;
  grpc_transport_send_batch(chand->transport,
                            TRANSPORT_STREAM_FROM_CALL_DATA(calld),
                            calld->outgoing_sopb.ops, nops, is_last);
}

/* Intercept a call operation and either push it directly up or translate it
   into transport stream operations */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
                    grpc_call_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);

  switch (op->type) {
    case GRPC_SEND_METADATA:
      grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
      grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
                                op->user_data);
      break;
    case GRPC_SEND_DEADLINE:
      grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
      grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
                                op->user_data);
      break;
    case GRPC_SEND_START:
      grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
      grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
      end_bufferable_op(op, chand, calld, 0);
      break;
    case GRPC_SEND_MESSAGE:
      grpc_sopb_add_begin_message(&calld->outgoing_sopb,
                                  grpc_byte_buffer_length(op->data.message),
                                  op->flags);
      copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
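      /* The extra 5 bytes below presumably approximate the per-message frame
         header added by the transport (a flags byte plus a four byte length);
         either way it only affects the buffering estimate. */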
      calld->outgoing_buffer_length_estimate +=
          (5 + grpc_byte_buffer_length(op->data.message));
      end_bufferable_op(op, chand, calld, 0);
      break;
    case GRPC_SEND_FINISH:
      end_bufferable_op(op, chand, calld, 1);
      break;
    case GRPC_REQUEST_DATA:
      /* re-arm window updates if they were disarmed by finish_message */
      grpc_transport_set_allow_window_updates(
          chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
      break;
    case GRPC_CANCEL_OP:
      grpc_transport_abort_stream(chand->transport,
                                  TRANSPORT_STREAM_FROM_CALL_DATA(calld),
                                  GRPC_STATUS_CANCELLED);
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_UP);
      grpc_call_next_op(elem, op);
      break;
  }
}

/* Handle channel operations: goaway and disconnect are handed down to the
   transport; everything else is assumed to travel up and is pushed up the
   stack. */
static void channel_op(grpc_channel_element *elem,
                       grpc_channel_element *from_elem, grpc_channel_op *op) {
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);

  switch (op->type) {
    case GRPC_CHANNEL_GOAWAY:
      grpc_transport_goaway(chand->transport, op->data.goaway.status,
                            op->data.goaway.message);
      break;
    case GRPC_CHANNEL_DISCONNECT:
      grpc_transport_close(chand->transport);
      break;
    default:
      GPR_ASSERT(op->dir == GRPC_CALL_UP);
      grpc_channel_next_op(elem, op);
      break;
  }
}

/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
                           const void *server_transport_data) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  int r;

  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  calld->elem = elem;
  grpc_sopb_init(&calld->outgoing_sopb);

  calld->reading_message = 0;
  calld->got_metadata_boundary = 0;
  calld->got_read_close = 0;
  calld->outgoing_buffer_length_estimate = 0;
  calld->max_message_length = chand->max_message_length;
  gpr_slice_buffer_init(&calld->incoming_message);
  r = grpc_transport_init_stream(chand->transport,
                                 TRANSPORT_STREAM_FROM_CALL_DATA(calld),
                                 server_transport_data);
  GPR_ASSERT(r == 0);
}

/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  grpc_sopb_destroy(&calld->outgoing_sopb);
  gpr_slice_buffer_destroy(&calld->incoming_message);
  grpc_transport_destroy_stream(chand->transport,
                                TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}

/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,
                              const grpc_channel_args *args, grpc_mdctx *mdctx,
                              int is_first, int is_last) {
  channel_data *cd = (channel_data *)elem->channel_data;
  size_t i;
  GPR_ASSERT(!is_first);
  GPR_ASSERT(is_last);
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  cd->transport = NULL;

  cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
  if (args) {
    for (i = 0; i < args->num_args; i++) {
      if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) {
        if (args->args[i].type != GRPC_ARG_INTEGER) {
          gpr_log(GPR_ERROR, "%s ignored: it must be an integer",
                  GRPC_ARG_MAX_MESSAGE_LENGTH);
        } else if (args->args[i].value.integer < 0) {
          gpr_log(GPR_ERROR, "%s ignored: it must be >= 0",
                  GRPC_ARG_MAX_MESSAGE_LENGTH);
        } else {
          cd->max_message_length = args->args[i].value.integer;
        }
      }
    }
  }
}

/* Destructor for channel_data */
static void destroy_channel_elem(grpc_channel_element *elem) {
  channel_data *cd = (channel_data *)elem->channel_data;
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  grpc_transport_destroy(cd->transport);
}

const grpc_channel_filter grpc_connected_channel_filter = {
    call_op, channel_op,

    sizeof(call_data), init_call_elem, destroy_call_elem,

    sizeof(channel_data), init_channel_elem, destroy_channel_elem,

    "connected",
};

static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
                                   grpc_stream *stream, size_t size_hint) {
  return gpr_slice_malloc(size_hint);
}

/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
                          const void *transport_server_data) {
  grpc_channel_element *elem = user_data;
  channel_data *chand = elem->channel_data;
  grpc_channel_op op;

  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  GPR_ASSERT(chand->transport == transport);

  op.type = GRPC_ACCEPT_CALL;
  op.dir = GRPC_CALL_UP;
  op.data.accept_call.transport = transport;
  op.data.accept_call.transport_server_data = transport_server_data;
  channel_op(elem, NULL, &op);
}

static void recv_error(channel_data *chand, call_data *calld, int line,
                       const char *message) {
  gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);

  if (chand->transport) {
    grpc_transport_abort_stream(chand->transport,
                                TRANSPORT_STREAM_FROM_CALL_DATA(calld),
                                GRPC_STATUS_INVALID_ARGUMENT);
  }
}

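/* Completion callbacks for call_ops pushed up the stack: do_nothing is used
   where the op carries nothing to clean up, while done_message frees the byte
   buffer that finish_message builds once the op has completed. */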
static void do_nothing(void *calldata, grpc_op_error error) {}

static void done_message(void *user_data, grpc_op_error error) {
  grpc_byte_buffer_destroy(user_data);
}

static void finish_message(channel_data *chand, call_data *calld) {
  grpc_call_element *elem = calld->elem;
  grpc_call_op call_op;
  call_op.dir = GRPC_CALL_UP;
  call_op.flags = 0;
  /* if we got all the bytes for this message, call up the stack */
  call_op.type = GRPC_RECV_MESSAGE;
  call_op.done_cb = done_message;
  /* TODO(ctiller): this could be a lot faster if coded directly */
  call_op.user_data = call_op.data.message = grpc_byte_buffer_create(
      calld->incoming_message.slices, calld->incoming_message.count);
  gpr_slice_buffer_reset_and_unref(&calld->incoming_message);

  /* disable window updates until we get a request more from above */
  grpc_transport_set_allow_window_updates(
      chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);

  GPR_ASSERT(calld->incoming_message.count == 0);
  calld->reading_message = 0;
  grpc_call_next_op(elem, &call_op);
}

/* Handle incoming stream ops from the transport, translating them into
   call_ops to pass up the call stack */
static void recv_batch(void *user_data, grpc_transport *transport,
                       grpc_stream *stream, grpc_stream_op *ops,
                       size_t ops_count, grpc_stream_state final_state) {
  call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
  grpc_call_element *elem = calld->elem;
  channel_data *chand = elem->channel_data;
  grpc_stream_op *stream_op;
  grpc_call_op call_op;
  size_t i;
  gpr_uint32 length;

  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);

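  /* Metadata, deadline and boundary ops map directly onto call_ops pushed up
     the stack; message data goes through a small reassembly state machine:
     GRPC_OP_BEGIN_MESSAGE records the expected length, GRPC_OP_SLICE appends
     payload to incoming_message, and finish_message fires once exactly that
     many bytes have arrived. */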
  for (i = 0; i < ops_count; i++) {
    stream_op = ops + i;
    switch (stream_op->type) {
      case GRPC_OP_FLOW_CTL_CB:
        gpr_log(GPR_ERROR,
                "should not receive flow control ops from transport");
        abort();
        break;
      case GRPC_NO_OP:
        break;
      case GRPC_OP_METADATA:
        call_op.type = GRPC_RECV_METADATA;
        call_op.dir = GRPC_CALL_UP;
        call_op.flags = 0;
        call_op.data.metadata = stream_op->data.metadata;
        call_op.done_cb = do_nothing;
        call_op.user_data = NULL;
        grpc_call_next_op(elem, &call_op);
        break;
      case GRPC_OP_DEADLINE:
        call_op.type = GRPC_RECV_DEADLINE;
        call_op.dir = GRPC_CALL_UP;
        call_op.flags = 0;
        call_op.data.deadline = stream_op->data.deadline;
        call_op.done_cb = do_nothing;
        call_op.user_data = NULL;
        grpc_call_next_op(elem, &call_op);
        break;
      case GRPC_OP_METADATA_BOUNDARY:
        if (!calld->got_metadata_boundary) {
          calld->got_metadata_boundary = 1;
          call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
          call_op.dir = GRPC_CALL_UP;
          call_op.flags = 0;
          call_op.done_cb = do_nothing;
          call_op.user_data = NULL;
          grpc_call_next_op(elem, &call_op);
        }
        break;
      case GRPC_OP_BEGIN_MESSAGE:
        /* can't begin a message when we're still reading a message */
        if (calld->reading_message) {
          char message[128];
          sprintf(message,
                  "Message terminated early; read %d bytes, expected %d",
                  (int)calld->incoming_message.length,
                  (int)calld->incoming_message_length);
          recv_error(chand, calld, __LINE__, message);
          return;
        }
        /* stash away parameters, and prepare for incoming slices */
        length = stream_op->data.begin_message.length;
        if (length > calld->max_message_length) {
          char message[128];
          sprintf(
              message,
              "Maximum message length of %d exceeded by a message of length %d",
              calld->max_message_length, length);
          recv_error(chand, calld, __LINE__, message);
        } else if (length > 0) {
          calld->reading_message = 1;
          calld->incoming_message_length = length;
        } else {
          finish_message(chand, calld);
        }
        break;
      case GRPC_OP_SLICE:
        if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
          gpr_slice_unref(stream_op->data.slice);
          break;
        }
        /* we have to be reading a message to know what to do here */
        if (!calld->reading_message) {
          recv_error(chand, calld, __LINE__,
                     "Received payload data while not reading a message");
          return;
        }
        /* append the slice to the incoming buffer */
        gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
        if (calld->incoming_message.length > calld->incoming_message_length) {
          /* if we got too many bytes, complain */
          char message[128];
          sprintf(message,
                  "Receiving message overflow; read %d bytes, expected %d",
                  (int)calld->incoming_message.length,
                  (int)calld->incoming_message_length);
          recv_error(chand, calld, __LINE__, message);
          return;
        } else if (calld->incoming_message.length ==
                   calld->incoming_message_length) {
          finish_message(chand, calld);
        }
    }
  }
  /* if the stream closed, then call up the stack to let it know */
  if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
                                 final_state == GRPC_STREAM_CLOSED)) {
    calld->got_read_close = 1;
    if (calld->reading_message) {
      char message[128];
      sprintf(message, "Last message truncated; read %d bytes, expected %d",
              (int)calld->incoming_message.length,
              (int)calld->incoming_message_length);
      recv_error(chand, calld, __LINE__, message);
    }
    call_op.type = GRPC_RECV_HALF_CLOSE;
    call_op.dir = GRPC_CALL_UP;
    call_op.flags = 0;
    call_op.done_cb = do_nothing;
    call_op.user_data = NULL;
    grpc_call_next_op(elem, &call_op);
  }
  if (final_state == GRPC_STREAM_CLOSED) {
    call_op.type = GRPC_RECV_FINISH;
    call_op.dir = GRPC_CALL_UP;
    call_op.flags = 0;
    call_op.done_cb = do_nothing;
    call_op.user_data = NULL;
    grpc_call_next_op(elem, &call_op);
  }
}

static void transport_goaway(void *user_data, grpc_transport *transport,
                             grpc_status_code status, gpr_slice debug) {
  /* transport got goaway ==> call up and handle it */
  grpc_channel_element *elem = user_data;
  channel_data *chand = elem->channel_data;
  char *msg;
  grpc_channel_op op;

  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  GPR_ASSERT(chand->transport == transport);

  msg = gpr_hexdump((const char *)GPR_SLICE_START_PTR(debug),
                    GPR_SLICE_LENGTH(debug), GPR_HEXDUMP_PLAINTEXT);
  gpr_log(GPR_DEBUG, "got goaway: status=%d, message=%s", status, msg);
  gpr_free(msg);

  op.type = GRPC_TRANSPORT_GOAWAY;
  op.dir = GRPC_CALL_UP;
  op.data.goaway.status = status;
  op.data.goaway.message = debug;
  channel_op(elem, NULL, &op);
}

static void transport_closed(void *user_data, grpc_transport *transport) {
  /* transport was closed ==> call up and handle it */
  grpc_channel_element *elem = user_data;
  channel_data *chand = elem->channel_data;
  grpc_channel_op op;

  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  GPR_ASSERT(chand->transport == transport);

  op.type = GRPC_TRANSPORT_CLOSED;
  op.dir = GRPC_CALL_UP;
  channel_op(elem, NULL, &op);
}

const grpc_transport_callbacks connected_channel_transport_callbacks = {
    alloc_recv_buffer, accept_stream, recv_batch,
    transport_goaway,  transport_closed,
};

grpc_transport_setup_result grpc_connected_channel_bind_transport(
    grpc_channel_stack *channel_stack, grpc_transport *transport) {
  /* Assumes that the connected channel filter is always the last filter
     in a channel stack */
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *cd = (channel_data *)elem->channel_data;
  grpc_transport_setup_result ret;
  GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
  GPR_ASSERT(cd->transport == NULL);
  cd->transport = transport;

  /* HACK(ctiller): increase the call stack size for the channel to make space
     for the transport's per-stream data. We need a cleaner (but performant)
     way to do this, and I'm not sure what that is yet.
     This is only "safe" because call stacks place no additional data after
     the last call element, and the last call element MUST be the connected
     channel. */
  channel_stack->call_stack_size += grpc_transport_stream_size(transport);

  ret.user_data = elem;
  ret.callbacks = &connected_channel_transport_callbacks;
  return ret;
}