blob: 1ad4819fd12adcf335970f71df6aafe5b2fc7346 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/transport/chttp2_transport.h"
35
36#include <math.h>
37#include <stdio.h>
38#include <string.h>
39
Craig Tillerd50e5652015-02-24 16:46:22 -080040#include "src/core/debug/trace.h"
Craig Tiller485d7762015-01-23 12:54:05 -080041#include "src/core/support/string.h"
nnoble0c475f02014-12-05 15:37:39 -080042#include "src/core/transport/chttp2/frame_data.h"
43#include "src/core/transport/chttp2/frame_goaway.h"
44#include "src/core/transport/chttp2/frame_ping.h"
45#include "src/core/transport/chttp2/frame_rst_stream.h"
46#include "src/core/transport/chttp2/frame_settings.h"
47#include "src/core/transport/chttp2/frame_window_update.h"
48#include "src/core/transport/chttp2/hpack_parser.h"
49#include "src/core/transport/chttp2/http2_errors.h"
50#include "src/core/transport/chttp2/status_conversion.h"
51#include "src/core/transport/chttp2/stream_encoder.h"
52#include "src/core/transport/chttp2/stream_map.h"
53#include "src/core/transport/chttp2/timeout_encoding.h"
54#include "src/core/transport/transport_impl.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080055#include <grpc/support/alloc.h>
56#include <grpc/support/log.h>
57#include <grpc/support/slice_buffer.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080058#include <grpc/support/useful.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080059
/* Flow-control window used for both connection-level windows at start-up and
   for the per-stream initial window we advertise (see init_transport). */
#define DEFAULT_WINDOW 65535
/* Start-up value of transport.connection_window_target. */
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
/* Largest representable flow-control window: 2^31 - 1. */
#define MAX_WINDOW 0x7fffffffu

/* Connection prefix a client queues ahead of any frame; the server-side
   deframer has one DTS_CLIENT_PREFIX_* state per byte of this string. */
#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
/* Must equal strlen(CLIENT_CONNECT_STRING); init_transport asserts this. */
#define CLIENT_CONNECT_STRLEN 24

typedef struct transport transport;
typedef struct stream stream;

/* Run stmt only when HTTP tracing is switched on. The inverted if/else shape
   means an 'else' written after the call site cannot bind to this macro's
   'if'. */
#define IF_TRACING(stmt)                    \
  if (!(grpc_trace_bits & GRPC_TRACE_HTTP)) \
    ;                                       \
  else                                      \
    stmt
75
/* A stream may sit on several intrusive linked lists at once, one for each
   kind of work that is pending for it. This enum names the lists; its values
   index transport.lists, stream.links and stream.included. */
typedef enum {
  WRITABLE = 0,            /* has pending writes */
  WRITING,                 /* has been selected to be written */
  WRITTEN_CLOSED,          /* was just written, and the write included a
                              close */
  CANCELLED,               /* was cancelled and still has pending state
                              updates to perform */
  WINDOW_UPDATE,           /* wants to send a window update */
  WAITING_FOR_CONCURRENCY, /* waiting to start: too many concurrent streams
                              on the connection */
  PENDING_CALLBACKS,       /* wants to call back the application */
  EXECUTING_CALLBACKS,     /* *IS* calling back to the application */
  STREAM_LIST_COUNT        /* number of lists - must be last */
} stream_list_id;
99
/* State of the deframer that consumes the raw http2 byte stream for the whole
   connection. The enumerators are ordered to follow the bytes on the wire:
   the client connection prefix first, then the nine frame header bytes, then
   the frame payload. */
typedef enum {
  /* client connection prefix: one state per prefix byte */
  DTS_CLIENT_PREFIX_0 = 0,
  DTS_CLIENT_PREFIX_1,
  DTS_CLIENT_PREFIX_2,
  DTS_CLIENT_PREFIX_3,
  DTS_CLIENT_PREFIX_4,
  DTS_CLIENT_PREFIX_5,
  DTS_CLIENT_PREFIX_6,
  DTS_CLIENT_PREFIX_7,
  DTS_CLIENT_PREFIX_8,
  DTS_CLIENT_PREFIX_9,
  DTS_CLIENT_PREFIX_10,
  DTS_CLIENT_PREFIX_11,
  DTS_CLIENT_PREFIX_12,
  DTS_CLIENT_PREFIX_13,
  DTS_CLIENT_PREFIX_14,
  DTS_CLIENT_PREFIX_15,
  DTS_CLIENT_PREFIX_16,
  DTS_CLIENT_PREFIX_17,
  DTS_CLIENT_PREFIX_18,
  DTS_CLIENT_PREFIX_19,
  DTS_CLIENT_PREFIX_20,
  DTS_CLIENT_PREFIX_21,
  DTS_CLIENT_PREFIX_22,
  DTS_CLIENT_PREFIX_23,
  /* frame header bytes 0 through 8; DTS_FH_0 must directly follow the last
     prefix state */
  DTS_FH_0,
  DTS_FH_1,
  DTS_FH_2,
  DTS_FH_3,
  DTS_FH_4,
  DTS_FH_5,
  DTS_FH_6,
  DTS_FH_7,
  DTS_FH_8,
  /* inside the payload of a http2 frame */
  DTS_FRAME
} deframe_transport_state;
142
143typedef struct {
144 stream *head;
145 stream *tail;
146} stream_list;
147
148typedef struct {
149 stream *next;
150 stream *prev;
151} stream_link;
152
/* Progress of connection error reporting. unlock() moves the transport from
   SEEN to NOTIFIED at the point where it schedules the closed callback. */
typedef enum {
  ERROR_STATE_NONE,    /* start-up value set by init_transport */
  ERROR_STATE_SEEN,    /* an error was recorded, closed callback still due */
  ERROR_STATE_NOTIFIED /* closed callback has been scheduled */
} error_state;
158
/* Several copies of the connection wide parameters are tracked; each
   enumerator selects one row of transport.settings. */
typedef enum {
  PEER_SETTINGS = 0, /* what our peer has asked for (and we have acked) */
  LOCAL_SETTINGS,    /* what we would like to have */
  SENT_SETTINGS,     /* what we have published to our peer */
  ACKED_SETTINGS,    /* what the peer has acked */
  NUM_SETTING_SETS   /* number of rows - must be last */
} setting_set;
171
172/* Outstanding ping request data */
173typedef struct {
174 gpr_uint8 id[8];
175 void (*cb)(void *user_data);
176 void *user_data;
177} outstanding_ping;
178
nnoble0c475f02014-12-05 15:37:39 -0800179typedef struct {
180 grpc_status_code status;
181 gpr_slice debug;
182} pending_goaway;
183
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800184struct transport {
185 grpc_transport base; /* must be first */
186 const grpc_transport_callbacks *cb;
187 void *cb_user_data;
188 grpc_endpoint *ep;
189 grpc_mdctx *metadata_context;
190 gpr_refcount refs;
191 gpr_uint8 is_client;
192
193 gpr_mu mu;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800194 gpr_cv cv;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800195
196 /* basic state management - what are we doing at the moment? */
197 gpr_uint8 reading;
198 gpr_uint8 writing;
199 gpr_uint8 calling_back;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800200 gpr_uint8 destroying;
Craig Tillerd75fe662015-02-21 07:30:49 -0800201 gpr_uint8 closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202 error_state error_state;
203
204 /* stream indexing */
205 gpr_uint32 next_stream_id;
nnoble0c475f02014-12-05 15:37:39 -0800206 gpr_uint32 last_incoming_stream_id;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800207
208 /* settings */
209 gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
ctiller493fbcc2014-12-07 15:09:10 -0800210 gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */
211 gpr_uint8 sent_local_settings; /* have local settings been sent? */
212 gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213
214 /* window management */
215 gpr_uint32 outgoing_window;
216 gpr_uint32 incoming_window;
ctiller493fbcc2014-12-07 15:09:10 -0800217 gpr_uint32 connection_window_target;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800218
219 /* deframing */
220 deframe_transport_state deframe_state;
221 gpr_uint8 incoming_frame_type;
222 gpr_uint8 incoming_frame_flags;
223 gpr_uint8 header_eof;
224 gpr_uint32 expect_continuation_stream_id;
225 gpr_uint32 incoming_frame_size;
226 gpr_uint32 incoming_stream_id;
227
228 /* hpack encoding */
229 grpc_chttp2_hpack_compressor hpack_compressor;
230
231 /* various parsers */
232 grpc_chttp2_hpack_parser hpack_parser;
233 /* simple one shot parsers */
234 union {
235 grpc_chttp2_window_update_parser window_update;
236 grpc_chttp2_settings_parser settings;
237 grpc_chttp2_ping_parser ping;
238 } simple_parsers;
239
nnoble0c475f02014-12-05 15:37:39 -0800240 /* goaway */
241 grpc_chttp2_goaway_parser goaway_parser;
242 pending_goaway *pending_goaways;
243 size_t num_pending_goaways;
244 size_t cap_pending_goaways;
245
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800246 /* state for a stream that's not yet been created */
247 grpc_stream_op_buffer new_stream_sopb;
248
Craig Tillercb818ba2015-01-29 17:08:01 -0800249 /* stream ops that need to be destroyed, but outside of the lock */
250 grpc_stream_op_buffer nuke_later_sopb;
251
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800252 /* active parser */
253 void *parser_data;
254 stream *incoming_stream;
255 grpc_chttp2_parse_error (*parser)(void *parser_user_data,
256 grpc_chttp2_parse_state *state,
257 gpr_slice slice, int is_last);
258
259 gpr_slice_buffer outbuf;
260 gpr_slice_buffer qbuf;
261
262 stream_list lists[STREAM_LIST_COUNT];
263 grpc_chttp2_stream_map stream_map;
264
265 /* metadata object cache */
266 grpc_mdstr *str_grpc_timeout;
267
268 /* pings */
269 outstanding_ping *pings;
270 size_t ping_count;
271 size_t ping_capacity;
272 gpr_int64 ping_counter;
273};
274
275struct stream {
276 gpr_uint32 id;
277
278 gpr_uint32 outgoing_window;
279 gpr_uint32 incoming_window;
ctiller00297df2015-01-12 11:23:09 -0800280 /* when the application requests writes be closed, the write_closed is
281 'queued'; when the close is flow controlled into the send path, we are
282 'sending' it; when the write has been performed it is 'sent' */
283 gpr_uint8 queued_write_closed;
284 gpr_uint8 sending_write_closed;
285 gpr_uint8 sent_write_closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800286 gpr_uint8 read_closed;
287 gpr_uint8 cancelled;
288 gpr_uint8 allow_window_updates;
289 gpr_uint8 published_close;
290
291 stream_link links[STREAM_LIST_COUNT];
292 gpr_uint8 included[STREAM_LIST_COUNT];
293
ctiller00297df2015-01-12 11:23:09 -0800294 /* sops from application */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800295 grpc_stream_op_buffer outgoing_sopb;
ctiller00297df2015-01-12 11:23:09 -0800296 /* sops that have passed flow control to be written */
297 grpc_stream_op_buffer writing_sopb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800298
299 grpc_chttp2_data_parser parser;
300
301 grpc_stream_state callback_state;
302 grpc_stream_op_buffer callback_sopb;
303};
304
305static const grpc_transport_vtable vtable;
306
307static void push_setting(transport *t, grpc_chttp2_setting_id id,
308 gpr_uint32 value);
309
310static int prepare_callbacks(transport *t);
Craig Tillerd1345de2015-02-24 21:55:20 -0800311static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
Craig Tiller748fe3f2015-03-02 07:48:50 -0800312static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313
314static int prepare_write(transport *t);
ctiller00297df2015-01-12 11:23:09 -0800315static void perform_write(transport *t, grpc_endpoint *ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800316
317static void lock(transport *t);
318static void unlock(transport *t);
319
320static void drop_connection(transport *t);
321static void end_all_the_calls(transport *t);
322
323static stream *stream_list_remove_head(transport *t, stream_list_id id);
324static void stream_list_remove(transport *t, stream *s, stream_list_id id);
325static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
326static void stream_list_join(transport *t, stream *s, stream_list_id id);
327
328static void cancel_stream_id(transport *t, gpr_uint32 id,
329 grpc_status_code local_status,
330 grpc_chttp2_error_code error_code, int send_rst);
331static void cancel_stream(transport *t, stream *s,
332 grpc_status_code local_status,
333 grpc_chttp2_error_code error_code, int send_rst);
ctiller00297df2015-01-12 11:23:09 -0800334static void finalize_cancellations(transport *t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800335static stream *lookup_stream(transport *t, gpr_uint32 id);
336static void remove_from_stream_map(transport *t, stream *s);
337static void maybe_start_some_streams(transport *t);
338
339static void become_skip_parser(transport *t);
340
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800341static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
342 grpc_endpoint_cb_status error);
343
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800344/*
345 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
346 */
347
Craig Tiller9be83ee2015-02-18 14:16:15 -0800348static void destruct_transport(transport *t) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800349 size_t i;
350
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800351 gpr_mu_lock(&t->mu);
352
353 GPR_ASSERT(t->ep == NULL);
354
355 gpr_slice_buffer_destroy(&t->outbuf);
356 gpr_slice_buffer_destroy(&t->qbuf);
357 grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
358 grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
nnoble0c475f02014-12-05 15:37:39 -0800359 grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800360
361 grpc_mdstr_unref(t->str_grpc_timeout);
362
363 for (i = 0; i < STREAM_LIST_COUNT; i++) {
364 GPR_ASSERT(t->lists[i].head == NULL);
365 GPR_ASSERT(t->lists[i].tail == NULL);
366 }
367
368 GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
369
370 grpc_chttp2_stream_map_destroy(&t->stream_map);
371
372 gpr_mu_unlock(&t->mu);
373 gpr_mu_destroy(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800374 gpr_cv_destroy(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800375
376 /* callback remaining pings: they're not allowed to call into the transpot,
377 and maybe they hold resources that need to be freed */
378 for (i = 0; i < t->ping_count; i++) {
379 t->pings[i].cb(t->pings[i].user_data);
380 }
381 gpr_free(t->pings);
382
nnoble0c475f02014-12-05 15:37:39 -0800383 for (i = 0; i < t->num_pending_goaways; i++) {
384 gpr_slice_unref(t->pending_goaways[i].debug);
385 }
386 gpr_free(t->pending_goaways);
387
Craig Tiller8ed35ea2015-01-30 11:27:43 -0800388 grpc_sopb_destroy(&t->nuke_later_sopb);
389
Craig Tiller9be83ee2015-02-18 14:16:15 -0800390 grpc_mdctx_unref(t->metadata_context);
391
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800392 gpr_free(t);
393}
394
Craig Tiller9be83ee2015-02-18 14:16:15 -0800395static void unref_transport(transport *t) {
396 if (!gpr_unref(&t->refs)) return;
397 destruct_transport(t);
398}
399
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800400static void ref_transport(transport *t) { gpr_ref(&t->refs); }
401
402static void init_transport(transport *t, grpc_transport_setup_callback setup,
403 void *arg, const grpc_channel_args *channel_args,
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800404 grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
405 grpc_mdctx *mdctx, int is_client) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800406 size_t i;
407 int j;
408 grpc_transport_setup_result sr;
409
410 GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
411
412 t->base.vtable = &vtable;
413 t->ep = ep;
414 /* one ref is for destroy, the other for when ep becomes NULL */
415 gpr_ref_init(&t->refs, 2);
416 gpr_mu_init(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800417 gpr_cv_init(&t->cv);
Craig Tiller9be83ee2015-02-18 14:16:15 -0800418 grpc_mdctx_ref(mdctx);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800419 t->metadata_context = mdctx;
420 t->str_grpc_timeout =
421 grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
422 t->reading = 1;
423 t->writing = 0;
424 t->error_state = ERROR_STATE_NONE;
425 t->next_stream_id = is_client ? 1 : 2;
nnoble0c475f02014-12-05 15:37:39 -0800426 t->last_incoming_stream_id = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800427 t->destroying = 0;
Craig Tillerd75fe662015-02-21 07:30:49 -0800428 t->closed = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800429 t->is_client = is_client;
430 t->outgoing_window = DEFAULT_WINDOW;
431 t->incoming_window = DEFAULT_WINDOW;
ctiller493fbcc2014-12-07 15:09:10 -0800432 t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800433 t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
434 t->expect_continuation_stream_id = 0;
435 t->pings = NULL;
436 t->ping_count = 0;
437 t->ping_capacity = 0;
438 t->ping_counter = gpr_now().tv_nsec;
439 grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
nnoble0c475f02014-12-05 15:37:39 -0800440 grpc_chttp2_goaway_parser_init(&t->goaway_parser);
441 t->pending_goaways = NULL;
442 t->num_pending_goaways = 0;
443 t->cap_pending_goaways = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800444 gpr_slice_buffer_init(&t->outbuf);
445 gpr_slice_buffer_init(&t->qbuf);
Craig Tillercb818ba2015-01-29 17:08:01 -0800446 grpc_sopb_init(&t->nuke_later_sopb);
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800447 grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800448 if (is_client) {
449 gpr_slice_buffer_add(&t->qbuf,
450 gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
451 }
452 /* 8 is a random stab in the dark as to a good initial size: it's small enough
453 that it shouldn't waste memory for infrequently used connections, yet
454 large enough that the exponential growth should happen nicely when it's
455 needed.
456 TODO(ctiller): tune this */
457 grpc_chttp2_stream_map_init(&t->stream_map, 8);
458 memset(&t->lists, 0, sizeof(t->lists));
459
460 /* copy in initial settings to all setting sets */
461 for (i = 0; i < NUM_SETTING_SETS; i++) {
462 for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
463 t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
464 }
465 }
466 t->dirtied_local_settings = 1;
ctiller493fbcc2014-12-07 15:09:10 -0800467 /* Hack: it's common for implementations to assume 65536 bytes initial send
468 window -- this should by rights be 0 */
469 t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800470 t->sent_local_settings = 0;
471
472 /* configure http2 the way we like it */
473 if (t->is_client) {
474 push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
475 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
476 }
ctiller493fbcc2014-12-07 15:09:10 -0800477 push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800478
479 if (channel_args) {
480 for (i = 0; i < channel_args->num_args; i++) {
481 if (0 ==
482 strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
483 if (t->is_client) {
484 gpr_log(GPR_ERROR, "%s: is ignored on the client",
485 GRPC_ARG_MAX_CONCURRENT_STREAMS);
486 } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
487 gpr_log(GPR_ERROR, "%s: must be an integer",
488 GRPC_ARG_MAX_CONCURRENT_STREAMS);
489 } else {
490 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
491 channel_args->args[i].value.integer);
492 }
493 }
494 }
495 }
496
497 gpr_mu_lock(&t->mu);
498 t->calling_back = 1;
499 ref_transport(t);
500 gpr_mu_unlock(&t->mu);
501
502 sr = setup(arg, &t->base, t->metadata_context);
503
504 lock(t);
505 t->cb = sr.callbacks;
506 t->cb_user_data = sr.user_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800507 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800508 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800509 unlock(t);
Craig Tillerdcf9c0e2015-02-11 16:12:41 -0800510
511 ref_transport(t);
512 recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
513
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800514 unref_transport(t);
515}
516
517static void destroy_transport(grpc_transport *gt) {
518 transport *t = (transport *)gt;
519
Craig Tiller748fe3f2015-03-02 07:48:50 -0800520 lock(t);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800521 t->destroying = 1;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000522 /* Wait for pending stuff to finish.
523 We need to be not calling back to ensure that closed() gets a chance to
524 trigger if needed during unlock() before we die.
525 We need to be not writing as cancellation finalization may produce some
526 callbacks that NEED to be made to close out some streams when t->writing
527 becomes 0. */
528 while (t->calling_back || t->writing) {
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800529 gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
530 }
Craig Tiller748fe3f2015-03-02 07:48:50 -0800531 drop_connection(t);
532 unlock(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800533
Craig Tillerb9eb1802015-03-02 16:41:32 +0000534 lock(t);
535 GPR_ASSERT(!t->cb);
536 unlock(t);
537
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800538 unref_transport(t);
539}
540
541static void close_transport(grpc_transport *gt) {
542 transport *t = (transport *)gt;
543 gpr_mu_lock(&t->mu);
Craig Tillerd75fe662015-02-21 07:30:49 -0800544 GPR_ASSERT(!t->closed);
545 t->closed = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800546 if (t->ep) {
547 grpc_endpoint_shutdown(t->ep);
548 }
549 gpr_mu_unlock(&t->mu);
550}
551
nnoble0c475f02014-12-05 15:37:39 -0800552static void goaway(grpc_transport *gt, grpc_status_code status,
553 gpr_slice debug_data) {
554 transport *t = (transport *)gt;
555 lock(t);
556 grpc_chttp2_goaway_append(t->last_incoming_stream_id,
557 grpc_chttp2_grpc_status_to_http2_error(status),
558 debug_data, &t->qbuf);
559 unlock(t);
560}
561
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800562static int init_stream(grpc_transport *gt, grpc_stream *gs,
563 const void *server_data) {
564 transport *t = (transport *)gt;
565 stream *s = (stream *)gs;
566
567 ref_transport(t);
568
569 if (!server_data) {
570 lock(t);
571 s->id = 0;
572 } else {
Craig Tillerd50e5652015-02-24 16:46:22 -0800573 s->id = (gpr_uint32)(gpr_uintptr) server_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800574 t->incoming_stream = s;
575 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
576 }
577
ctiller493fbcc2014-12-07 15:09:10 -0800578 s->outgoing_window =
579 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
580 s->incoming_window =
581 t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
ctiller00297df2015-01-12 11:23:09 -0800582 s->queued_write_closed = 0;
583 s->sending_write_closed = 0;
584 s->sent_write_closed = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800585 s->read_closed = 0;
586 s->cancelled = 0;
587 s->allow_window_updates = 0;
588 s->published_close = 0;
589 memset(&s->links, 0, sizeof(s->links));
590 memset(&s->included, 0, sizeof(s->included));
591 grpc_sopb_init(&s->outgoing_sopb);
ctiller00297df2015-01-12 11:23:09 -0800592 grpc_sopb_init(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800593 grpc_sopb_init(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800594 grpc_chttp2_data_parser_init(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800595
596 if (!server_data) {
597 unlock(t);
598 }
599
600 return 0;
601}
602
Craig Tillercb818ba2015-01-29 17:08:01 -0800603static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
604 grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
605 sopb->nops = 0;
606}
607
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800608static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
609 transport *t = (transport *)gt;
610 stream *s = (stream *)gs;
611 size_t i;
612
613 gpr_mu_lock(&t->mu);
614
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800615 /* stop parsing if we're currently parsing this stream */
616 if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
617 s->id != 0) {
618 become_skip_parser(t);
619 }
620
621 for (i = 0; i < STREAM_LIST_COUNT; i++) {
622 stream_list_remove(t, s, i);
623 }
624 remove_from_stream_map(t, s);
625
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800626 gpr_mu_unlock(&t->mu);
627
628 grpc_sopb_destroy(&s->outgoing_sopb);
ctiller00297df2015-01-12 11:23:09 -0800629 grpc_sopb_destroy(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800630 grpc_sopb_destroy(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800631 grpc_chttp2_data_parser_destroy(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800632
633 unref_transport(t);
634}
635
636/*
637 * LIST MANAGEMENT
638 */
639
ctiller00297df2015-01-12 11:23:09 -0800640static int stream_list_empty(transport *t, stream_list_id id) {
641 return t->lists[id].head == NULL;
642}
643
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800644static stream *stream_list_remove_head(transport *t, stream_list_id id) {
645 stream *s = t->lists[id].head;
646 if (s) {
647 stream *new_head = s->links[id].next;
648 GPR_ASSERT(s->included[id]);
649 if (new_head) {
650 t->lists[id].head = new_head;
651 new_head->links[id].prev = NULL;
652 } else {
653 t->lists[id].head = NULL;
654 t->lists[id].tail = NULL;
655 }
656 s->included[id] = 0;
657 }
658 return s;
659}
660
661static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
662 if (!s->included[id]) return;
663 s->included[id] = 0;
664 if (s->links[id].prev) {
665 s->links[id].prev->links[id].next = s->links[id].next;
666 } else {
667 GPR_ASSERT(t->lists[id].head == s);
668 t->lists[id].head = s->links[id].next;
669 }
670 if (s->links[id].next) {
671 s->links[id].next->links[id].prev = s->links[id].prev;
672 } else {
673 t->lists[id].tail = s->links[id].prev;
674 }
675}
676
677static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
678 stream *old_tail;
679 GPR_ASSERT(!s->included[id]);
680 old_tail = t->lists[id].tail;
681 s->links[id].next = NULL;
682 s->links[id].prev = old_tail;
683 if (old_tail) {
684 old_tail->links[id].next = s;
685 } else {
686 s->links[id].prev = NULL;
687 t->lists[id].head = s;
688 }
689 t->lists[id].tail = s;
690 s->included[id] = 1;
691}
692
693static void stream_list_join(transport *t, stream *s, stream_list_id id) {
Craig Tillerb9eb1802015-03-02 16:41:32 +0000694 if (id == PENDING_CALLBACKS) GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800695 if (s->included[id]) {
696 return;
697 }
698 stream_list_add_tail(t, s, id);
699}
700
701static void remove_from_stream_map(transport *t, stream *s) {
702 if (s->id == 0) return;
703 if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
704 maybe_start_some_streams(t);
705 }
706}
707
708/*
709 * LOCK MANAGEMENT
710 */
711
/* We take a transport-global lock in response to calls coming in from above,
   and in response to data being received from below. New data to be written
   is always queued, as are callbacks to process data. During unlock() we
   check our todo lists and initiate callbacks and flush writes. */
716
717static void lock(transport *t) { gpr_mu_lock(&t->mu); }
718
719static void unlock(transport *t) {
720 int start_write = 0;
721 int perform_callbacks = 0;
722 int call_closed = 0;
nnoble0c475f02014-12-05 15:37:39 -0800723 int num_goaways = 0;
724 int i;
725 pending_goaway *goaways = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800726 grpc_endpoint *ep = t->ep;
Craig Tillere3018e62015-02-13 17:05:19 -0800727 grpc_stream_op_buffer nuke_now;
Craig Tillerd1345de2015-02-24 21:55:20 -0800728 const grpc_transport_callbacks *cb = t->cb;
Craig Tiller06059952015-02-18 08:34:56 -0800729
Craig Tillere3018e62015-02-13 17:05:19 -0800730 grpc_sopb_init(&nuke_now);
731 if (t->nuke_later_sopb.nops) {
732 grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
Craig Tillercb818ba2015-01-29 17:08:01 -0800733 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800734
735 /* see if we need to trigger a write - and if so, get the data ready */
736 if (ep && !t->writing) {
737 t->writing = start_write = prepare_write(t);
738 if (start_write) {
739 ref_transport(t);
740 }
741 }
742
ctiller00297df2015-01-12 11:23:09 -0800743 if (!t->writing) {
744 finalize_cancellations(t);
745 }
746
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800747 /* gather any callbacks that need to be made */
Craig Tillerd1345de2015-02-24 21:55:20 -0800748 if (!t->calling_back && cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800749 perform_callbacks = prepare_callbacks(t);
750 if (perform_callbacks) {
751 t->calling_back = 1;
752 }
Craig Tillerb9eb1802015-03-02 16:41:32 +0000753 if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800754 call_closed = 1;
755 t->calling_back = 1;
Craig Tillerd1345de2015-02-24 21:55:20 -0800756 t->cb = NULL; /* no more callbacks */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800757 t->error_state = ERROR_STATE_NOTIFIED;
758 }
nnoble0c475f02014-12-05 15:37:39 -0800759 if (t->num_pending_goaways) {
760 goaways = t->pending_goaways;
761 num_goaways = t->num_pending_goaways;
762 t->pending_goaways = NULL;
763 t->num_pending_goaways = 0;
ctiller82e275f2014-12-12 08:43:28 -0800764 t->cap_pending_goaways = 0;
nnoble0c475f02014-12-05 15:37:39 -0800765 t->calling_back = 1;
766 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800767 }
768
nnoble0c475f02014-12-05 15:37:39 -0800769 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800770 ref_transport(t);
771 }
772
773 /* finally unlock */
774 gpr_mu_unlock(&t->mu);
775
776 /* perform some callbacks if necessary */
nnoble0c475f02014-12-05 15:37:39 -0800777 for (i = 0; i < num_goaways; i++) {
Craig Tillerd1345de2015-02-24 21:55:20 -0800778 cb->goaway(t->cb_user_data, &t->base, goaways[i].status,
779 goaways[i].debug);
nnoble0c475f02014-12-05 15:37:39 -0800780 }
781
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800782 if (perform_callbacks) {
Craig Tillerd1345de2015-02-24 21:55:20 -0800783 run_callbacks(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800784 }
785
786 if (call_closed) {
Craig Tiller748fe3f2015-03-02 07:48:50 -0800787 call_cb_closed(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800788 }
789
790 /* write some bytes if necessary */
ctiller00297df2015-01-12 11:23:09 -0800791 if (start_write) {
792 /* ultimately calls unref_transport(t); and clears t->writing */
793 perform_write(t, ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800794 }
795
nnoble0c475f02014-12-05 15:37:39 -0800796 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800797 lock(t);
798 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800799 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800800 unlock(t);
801 unref_transport(t);
802 }
nnoble0c475f02014-12-05 15:37:39 -0800803
Craig Tillere3018e62015-02-13 17:05:19 -0800804 grpc_sopb_destroy(&nuke_now);
Craig Tillercb818ba2015-01-29 17:08:01 -0800805
nnoble0c475f02014-12-05 15:37:39 -0800806 gpr_free(goaways);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800807}
808
809/*
810 * OUTPUT PROCESSING
811 */
812
813static void push_setting(transport *t, grpc_chttp2_setting_id id,
814 gpr_uint32 value) {
815 const grpc_chttp2_setting_parameters *sp =
816 &grpc_chttp2_settings_parameters[id];
817 gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
818 if (use_value != value) {
819 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
820 value, use_value);
821 }
822 if (use_value != t->settings[LOCAL_SETTINGS][id]) {
823 t->settings[LOCAL_SETTINGS][id] = use_value;
824 t->dirtied_local_settings = 1;
825 }
826}
827
/* Collect everything that is ready to be sent into t->outbuf.
   Returns non-zero when there is something to write: either t->outbuf
   already holds bytes, or at least one stream is on the WRITING list. */
static int prepare_write(transport *t) {
  stream *s;
  gpr_slice_buffer tempbuf;
  gpr_uint32 window_delta;

  /* simple writes are queued to qbuf, and flushed here */
  tempbuf = t->qbuf;
  t->qbuf = t->outbuf;
  t->outbuf = tempbuf;
  /* after the swap qbuf is the previous outbuf, which is expected to be
     empty */
  GPR_ASSERT(t->qbuf.count == 0);

  /* emit a settings frame when local settings changed and none has been
     sent yet */
  if (t->dirtied_local_settings && !t->sent_local_settings) {
    gpr_slice_buffer_add(
        &t->outbuf, grpc_chttp2_settings_create(
                        t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
                        t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
    t->force_send_settings = 0;
    t->dirtied_local_settings = 0;
    t->sent_local_settings = 1;
  }

  /* for each stream that's become writable, frame its data (according to
     available window sizes) and add to the output buffer */
  while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE))) {
    /* the amount encoded is limited by the smaller of the transport and
       stream windows; both are charged with what was consumed */
    window_delta = grpc_chttp2_preencode(
        s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
        GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
    t->outgoing_window -= window_delta;
    s->outgoing_window -= window_delta;

    /* the close is sent with this write only once every queued op has been
       moved out of outgoing_sopb */
    s->sending_write_closed =
        s->queued_write_closed && s->outgoing_sopb.nops == 0;
    if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
      stream_list_join(t, s, WRITING);
    }

    /* if there are still writes to do and the stream still has window
       available, then schedule a further write */
    if (s->outgoing_sopb.nops && s->outgoing_window) {
      /* the stream was not the limit, so the transport window must be the
         one that ran out */
      GPR_ASSERT(!t->outgoing_window);
      stream_list_add_tail(t, s, WRITABLE);
    }
  }

  /* for each stream that wants to update its window, add that window here */
  while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
    /* top the stream window back up to the locally configured initial size */
    window_delta =
        t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
        s->incoming_window;
    if (!s->read_closed && window_delta) {
      gpr_slice_buffer_add(
          &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
      s->incoming_window += window_delta;
    }
  }

  /* if the transport is ready to send a window update, do so here also */
  if (t->incoming_window < t->connection_window_target * 3 / 4) {
    /* stream id 0 addresses the connection as a whole */
    window_delta = t->connection_window_target - t->incoming_window;
    gpr_slice_buffer_add(&t->outbuf,
                         grpc_chttp2_window_update_create(0, window_delta));
    t->incoming_window += window_delta;
  }

  return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
}
894
895static void finalize_outbuf(transport *t) {
896 stream *s;
897
898 while ((s = stream_list_remove_head(t, WRITING))) {
899 grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
900 s->sending_write_closed, s->id, &t->hpack_compressor,
901 &t->outbuf);
902 s->writing_sopb.nops = 0;
903 if (s->sending_write_closed) {
904 stream_list_join(t, s, WRITTEN_CLOSED);
905 }
906 }
907}
908
909static void finish_write_common(transport *t, int success) {
910 stream *s;
911
912 lock(t);
913 if (!success) {
914 drop_connection(t);
915 }
916 while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
917 s->sent_write_closed = 1;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000918 if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
ctiller00297df2015-01-12 11:23:09 -0800919 }
920 t->outbuf.count = 0;
921 t->outbuf.length = 0;
922 /* leave the writing flag up on shutdown to prevent further writes in unlock()
923 from starting */
924 t->writing = 0;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000925 if (t->destroying) {
926 gpr_cv_signal(&t->cv);
927 }
ctiller00297df2015-01-12 11:23:09 -0800928 if (!t->reading) {
929 grpc_endpoint_destroy(t->ep);
930 t->ep = NULL;
ctiller00297df2015-01-12 11:23:09 -0800931 unref_transport(t); /* safe because we'll still have the ref for write */
932 }
933 unlock(t);
934
935 unref_transport(t);
936}
937
938static void finish_write(void *tp, grpc_endpoint_cb_status error) {
939 transport *t = tp;
940 finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
941}
942
943static void perform_write(transport *t, grpc_endpoint *ep) {
944 finalize_outbuf(t);
945
946 GPR_ASSERT(t->outbuf.count > 0);
947
948 switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
949 finish_write, t)) {
950 case GRPC_ENDPOINT_WRITE_DONE:
951 finish_write_common(t, 1);
952 break;
953 case GRPC_ENDPOINT_WRITE_ERROR:
954 finish_write_common(t, 0);
955 break;
956 case GRPC_ENDPOINT_WRITE_PENDING:
957 break;
958 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800959}
960
961static void maybe_start_some_streams(transport *t) {
962 while (
963 grpc_chttp2_stream_map_size(&t->stream_map) <
964 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
965 stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
966 if (!s) break;
967
968 GPR_ASSERT(s->id == 0);
969 s->id = t->next_stream_id;
970 t->next_stream_id += 2;
971 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
972 stream_list_join(t, s, WRITABLE);
973 }
974}
975
976static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
977 size_t ops_count, int is_last) {
978 transport *t = (transport *)gt;
979 stream *s = (stream *)gs;
980
981 lock(t);
982
983 if (is_last) {
ctiller00297df2015-01-12 11:23:09 -0800984 s->queued_write_closed = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800985 }
986 if (!s->cancelled) {
987 grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
ctiller00297df2015-01-12 11:23:09 -0800988 if (s->id == 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800989 stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
990 maybe_start_some_streams(t);
ctiller00297df2015-01-12 11:23:09 -0800991 } else {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800992 stream_list_join(t, s, WRITABLE);
993 }
994 } else {
Craig Tillercefb00e2015-02-03 11:42:37 -0800995 grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800996 }
997 if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) {
998 stream_list_join(t, s, PENDING_CALLBACKS);
999 }
1000
1001 unlock(t);
1002}
1003
1004static void abort_stream(grpc_transport *gt, grpc_stream *gs,
1005 grpc_status_code status) {
1006 transport *t = (transport *)gt;
1007 stream *s = (stream *)gs;
1008
1009 lock(t);
1010 cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
1011 1);
1012 unlock(t);
1013}
1014
1015static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
1016 void *user_data) {
1017 transport *t = (transport *)gt;
1018 outstanding_ping *p;
1019
1020 lock(t);
1021 if (t->ping_capacity == t->ping_count) {
1022 t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
1023 t->pings =
1024 gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
1025 }
1026 p = &t->pings[t->ping_count++];
nnoble8f4e42c2014-12-11 16:36:46 -08001027 p->id[0] = (t->ping_counter >> 56) & 0xff;
1028 p->id[1] = (t->ping_counter >> 48) & 0xff;
1029 p->id[2] = (t->ping_counter >> 40) & 0xff;
1030 p->id[3] = (t->ping_counter >> 32) & 0xff;
1031 p->id[4] = (t->ping_counter >> 24) & 0xff;
1032 p->id[5] = (t->ping_counter >> 16) & 0xff;
1033 p->id[6] = (t->ping_counter >> 8) & 0xff;
1034 p->id[7] = t->ping_counter & 0xff;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001035 p->cb = cb;
1036 p->user_data = user_data;
1037 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
1038 unlock(t);
1039}
1040
1041/*
1042 * INPUT PROCESSING
1043 */
1044
ctiller00297df2015-01-12 11:23:09 -08001045static void finalize_cancellations(transport *t) {
1046 stream *s;
1047
1048 while ((s = stream_list_remove_head(t, CANCELLED))) {
1049 s->read_closed = 1;
1050 s->sent_write_closed = 1;
1051 stream_list_join(t, s, PENDING_CALLBACKS);
1052 }
1053}
1054
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001055static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
1056 grpc_status_code local_status,
1057 grpc_chttp2_error_code error_code,
1058 int send_rst) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001059 int had_outgoing;
Craig Tiller8b433a22015-01-23 14:47:07 -08001060 char buffer[GPR_LTOA_MIN_BUFSIZE];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001061
1062 if (s) {
1063 /* clear out any unreported input & output: nobody cares anymore */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001064 had_outgoing = s->outgoing_sopb.nops != 0;
Craig Tillercb818ba2015-01-29 17:08:01 -08001065 schedule_nuke_sopb(t, &s->parser.incoming_sopb);
1066 schedule_nuke_sopb(t, &s->outgoing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001067 if (s->cancelled) {
1068 send_rst = 0;
ctiller00297df2015-01-12 11:23:09 -08001069 } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001070 s->cancelled = 1;
ctiller00297df2015-01-12 11:23:09 -08001071 stream_list_join(t, s, CANCELLED);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001072
Craig Tillera7ed5d92015-01-23 11:30:16 -08001073 gpr_ltoa(local_status, buffer);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001074 grpc_sopb_add_metadata(
1075 &s->parser.incoming_sopb,
1076 grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
1077
1078 stream_list_join(t, s, PENDING_CALLBACKS);
1079 }
1080 }
1081 if (!id) send_rst = 0;
1082 if (send_rst) {
1083 gpr_slice_buffer_add(&t->qbuf,
1084 grpc_chttp2_rst_stream_create(id, error_code));
1085 }
1086}
1087
1088static void cancel_stream_id(transport *t, gpr_uint32 id,
1089 grpc_status_code local_status,
1090 grpc_chttp2_error_code error_code, int send_rst) {
1091 cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
1092 send_rst);
1093}
1094
1095static void cancel_stream(transport *t, stream *s,
1096 grpc_status_code local_status,
1097 grpc_chttp2_error_code error_code, int send_rst) {
1098 cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
1099}
1100
1101static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
1102 cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
1103 GRPC_CHTTP2_INTERNAL_ERROR, 0);
1104}
1105
1106static void end_all_the_calls(transport *t) {
1107 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
1108}
1109
1110static void drop_connection(transport *t) {
1111 if (t->error_state == ERROR_STATE_NONE) {
1112 t->error_state = ERROR_STATE_SEEN;
1113 }
1114 end_all_the_calls(t);
1115}
1116
1117static void maybe_join_window_updates(transport *t, stream *s) {
ctiller493fbcc2014-12-07 15:09:10 -08001118 if (s->allow_window_updates &&
1119 s->incoming_window <
1120 t->settings[LOCAL_SETTINGS]
1121 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
1122 3 / 4) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001123 stream_list_join(t, s, WINDOW_UPDATE);
1124 }
1125}
1126
1127static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
1128 int allow) {
1129 transport *t = (transport *)tp;
1130 stream *s = (stream *)sp;
1131
1132 lock(t);
1133 s->allow_window_updates = allow;
1134 if (allow) {
1135 maybe_join_window_updates(t, s);
1136 } else {
1137 stream_list_remove(t, s, WINDOW_UPDATE);
1138 }
1139 unlock(t);
1140}
1141
1142static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
1143 if (t->incoming_frame_size > t->incoming_window) {
1144 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1145 t->incoming_frame_size, t->incoming_window);
1146 return GRPC_CHTTP2_CONNECTION_ERROR;
1147 }
1148
1149 if (t->incoming_frame_size > s->incoming_window) {
1150 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1151 t->incoming_frame_size, s->incoming_window);
1152 return GRPC_CHTTP2_CONNECTION_ERROR;
1153 }
1154
1155 t->incoming_window -= t->incoming_frame_size;
1156 s->incoming_window -= t->incoming_frame_size;
1157
1158 /* if the stream incoming window is getting low, schedule an update */
1159 maybe_join_window_updates(t, s);
1160
1161 return GRPC_CHTTP2_PARSE_OK;
1162}
1163
1164static stream *lookup_stream(transport *t, gpr_uint32 id) {
1165 return grpc_chttp2_stream_map_find(&t->stream_map, id);
1166}
1167
1168static grpc_chttp2_parse_error skip_parser(void *parser,
1169 grpc_chttp2_parse_state *st,
1170 gpr_slice slice, int is_last) {
1171 return GRPC_CHTTP2_PARSE_OK;
1172}
1173
1174static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
1175
1176static int init_skip_frame(transport *t, int is_header) {
1177 if (is_header) {
1178 int is_eoh = t->expect_continuation_stream_id != 0;
1179 t->parser = grpc_chttp2_header_parser_parse;
1180 t->parser_data = &t->hpack_parser;
1181 t->hpack_parser.on_header = skip_header;
1182 t->hpack_parser.on_header_user_data = NULL;
1183 t->hpack_parser.is_boundary = is_eoh;
1184 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1185 } else {
1186 t->parser = skip_parser;
1187 }
1188 return 1;
1189}
1190
1191static void become_skip_parser(transport *t) {
1192 init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
1193}
1194
1195static int init_data_frame_parser(transport *t) {
1196 stream *s = lookup_stream(t, t->incoming_stream_id);
1197 grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
1198 if (!s || s->read_closed) return init_skip_frame(t, 0);
1199 if (err == GRPC_CHTTP2_PARSE_OK) {
1200 err = update_incoming_window(t, s);
1201 }
1202 if (err == GRPC_CHTTP2_PARSE_OK) {
1203 err = grpc_chttp2_data_parser_begin_frame(&s->parser,
1204 t->incoming_frame_flags);
1205 }
1206 switch (err) {
1207 case GRPC_CHTTP2_PARSE_OK:
1208 t->incoming_stream = s;
1209 t->parser = grpc_chttp2_data_parser_parse;
1210 t->parser_data = &s->parser;
1211 return 1;
1212 case GRPC_CHTTP2_STREAM_ERROR:
1213 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1214 GRPC_CHTTP2_INTERNAL_ERROR),
1215 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1216 return init_skip_frame(t, 0);
1217 case GRPC_CHTTP2_CONNECTION_ERROR:
1218 drop_connection(t);
1219 return 0;
1220 }
1221 gpr_log(GPR_ERROR, "should never reach here");
1222 abort();
1223 return 0;
1224}
1225
/* Destructor for the parsed timeout cached as user data on a metadata
   element (see on_header). */
static void free_timeout(void *p) {
  gpr_free(p);
}
1227
1228static void on_header(void *tp, grpc_mdelem *md) {
1229 transport *t = tp;
1230 stream *s = t->incoming_stream;
1231
1232 GPR_ASSERT(s);
Craig Tillerd50e5652015-02-24 16:46:22 -08001233
1234 IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
1235 grpc_mdstr_as_c_string(md->key),
1236 grpc_mdstr_as_c_string(md->value)));
1237
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001238 stream_list_join(t, s, PENDING_CALLBACKS);
1239 if (md->key == t->str_grpc_timeout) {
1240 gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
1241 if (!cached_timeout) {
1242 /* not already parsed: parse it now, and store the result away */
1243 cached_timeout = gpr_malloc(sizeof(gpr_timespec));
1244 if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
1245 cached_timeout)) {
1246 gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
1247 grpc_mdstr_as_c_string(md->value));
1248 *cached_timeout = gpr_inf_future;
1249 }
1250 grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
1251 }
1252 grpc_sopb_add_deadline(&s->parser.incoming_sopb,
1253 gpr_time_add(gpr_now(), *cached_timeout));
1254 grpc_mdelem_unref(md);
1255 } else {
1256 grpc_sopb_add_metadata(&s->parser.incoming_sopb, md);
1257 }
1258}
1259
1260static int init_header_frame_parser(transport *t, int is_continuation) {
1261 int is_eoh =
1262 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
1263 stream *s;
1264
1265 if (is_eoh) {
1266 t->expect_continuation_stream_id = 0;
1267 } else {
1268 t->expect_continuation_stream_id = t->incoming_stream_id;
1269 }
1270
1271 if (!is_continuation) {
1272 t->header_eof =
1273 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
1274 }
1275
1276 /* could be a new stream or an existing stream */
1277 s = lookup_stream(t, t->incoming_stream_id);
1278 if (!s) {
1279 if (is_continuation) {
1280 gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
1281 return init_skip_frame(t, 1);
1282 }
1283 if (t->is_client) {
1284 if ((t->incoming_stream_id & 1) &&
1285 t->incoming_stream_id < t->next_stream_id) {
1286 /* this is an old (probably cancelled) stream */
1287 } else {
1288 gpr_log(GPR_ERROR, "ignoring new stream creation on client");
1289 }
1290 return init_skip_frame(t, 1);
nnoble0c475f02014-12-05 15:37:39 -08001291 } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
1292 gpr_log(GPR_ERROR,
1293 "ignoring out of order new stream request on server; last stream "
1294 "id=%d, new stream id=%d",
1295 t->last_incoming_stream_id, t->incoming_stream);
1296 return init_skip_frame(t, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001297 }
1298 t->incoming_stream = NULL;
1299 /* if stream is accepted, we set incoming_stream in init_stream */
1300 t->cb->accept_stream(t->cb_user_data, &t->base,
Craig Tillerd50e5652015-02-24 16:46:22 -08001301 (void *)(gpr_uintptr) t->incoming_stream_id);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001302 s = t->incoming_stream;
1303 if (!s) {
1304 gpr_log(GPR_ERROR, "stream not accepted");
1305 return init_skip_frame(t, 1);
1306 }
1307 } else {
1308 t->incoming_stream = s;
1309 }
1310 if (t->incoming_stream->read_closed) {
1311 gpr_log(GPR_ERROR, "skipping already closed stream header");
1312 t->incoming_stream = NULL;
1313 return init_skip_frame(t, 1);
1314 }
1315 t->parser = grpc_chttp2_header_parser_parse;
1316 t->parser_data = &t->hpack_parser;
1317 t->hpack_parser.on_header = on_header;
1318 t->hpack_parser.on_header_user_data = t;
1319 t->hpack_parser.is_boundary = is_eoh;
1320 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1321 if (!is_continuation &&
1322 (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
1323 grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
1324 }
1325 return 1;
1326}
1327
1328static int init_window_update_frame_parser(transport *t) {
1329 int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
1330 &t->simple_parsers.window_update,
1331 t->incoming_frame_size,
1332 t->incoming_frame_flags);
1333 if (!ok) {
1334 drop_connection(t);
1335 }
1336 t->parser = grpc_chttp2_window_update_parser_parse;
1337 t->parser_data = &t->simple_parsers.window_update;
1338 return ok;
1339}
1340
1341static int init_ping_parser(transport *t) {
1342 int ok = GRPC_CHTTP2_PARSE_OK ==
1343 grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
1344 t->incoming_frame_size,
1345 t->incoming_frame_flags);
1346 if (!ok) {
1347 drop_connection(t);
1348 }
1349 t->parser = grpc_chttp2_ping_parser_parse;
1350 t->parser_data = &t->simple_parsers.ping;
1351 return ok;
1352}
1353
nnoble0c475f02014-12-05 15:37:39 -08001354static int init_goaway_parser(transport *t) {
1355 int ok =
1356 GRPC_CHTTP2_PARSE_OK ==
1357 grpc_chttp2_goaway_parser_begin_frame(
1358 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
1359 if (!ok) {
1360 drop_connection(t);
1361 }
1362 t->parser = grpc_chttp2_goaway_parser_parse;
1363 t->parser_data = &t->goaway_parser;
1364 return ok;
1365}
1366
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001367static int init_settings_frame_parser(transport *t) {
1368 int ok = GRPC_CHTTP2_PARSE_OK ==
1369 grpc_chttp2_settings_parser_begin_frame(
1370 &t->simple_parsers.settings, t->incoming_frame_size,
1371 t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
1372 if (!ok) {
1373 drop_connection(t);
1374 }
1375 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
1376 memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
1377 GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
1378 }
1379 t->parser = grpc_chttp2_settings_parser_parse;
1380 t->parser_data = &t->simple_parsers.settings;
1381 return ok;
1382}
1383
1384static int init_frame_parser(transport *t) {
1385 if (t->expect_continuation_stream_id != 0) {
1386 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
1387 gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
1388 t->incoming_frame_type);
1389 return 0;
1390 }
1391 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
1392 gpr_log(GPR_ERROR,
1393 "Expected CONTINUATION frame for stream %08x, got stream %08x",
1394 t->expect_continuation_stream_id, t->incoming_stream_id);
1395 return 0;
1396 }
1397 return init_header_frame_parser(t, 1);
1398 }
1399 switch (t->incoming_frame_type) {
1400 case GRPC_CHTTP2_FRAME_DATA:
1401 return init_data_frame_parser(t);
1402 case GRPC_CHTTP2_FRAME_HEADER:
1403 return init_header_frame_parser(t, 0);
1404 case GRPC_CHTTP2_FRAME_CONTINUATION:
1405 gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
1406 return 0;
1407 case GRPC_CHTTP2_FRAME_RST_STREAM:
1408 /* TODO(ctiller): actually parse the reason */
1409 cancel_stream_id(
1410 t, t->incoming_stream_id,
1411 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
1412 GRPC_CHTTP2_CANCEL, 0);
1413 return init_skip_frame(t, 0);
1414 case GRPC_CHTTP2_FRAME_SETTINGS:
1415 return init_settings_frame_parser(t);
1416 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
1417 return init_window_update_frame_parser(t);
1418 case GRPC_CHTTP2_FRAME_PING:
1419 return init_ping_parser(t);
nnoble0c475f02014-12-05 15:37:39 -08001420 case GRPC_CHTTP2_FRAME_GOAWAY:
1421 return init_goaway_parser(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001422 default:
1423 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
1424 return init_skip_frame(t, 0);
1425 }
1426}
1427
1428static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) {
1429 return window_update < MAX_WINDOW - window;
1430}
1431
/* Feed one slice of the current frame's payload to the active parser
   (t->parser) and act on the requests the parser records in the parse state.
   `is_last` is non-zero for the final slice of the frame.
   Returns 1 when input processing may continue (including after a stream
   error, which cancels the stream and skips the rest of the frame) and 0
   after a connection error, in which case the connection has been dropped. */
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
  grpc_chttp2_parse_state st;
  size_t i;
  /* start from a clean state so that only flags set by this parse are seen */
  memset(&st, 0, sizeof(st));
  switch (t->parser(t->parser_data, &st, slice, is_last)) {
    case GRPC_CHTTP2_PARSE_OK:
      if (st.end_of_stream) {
        t->incoming_stream->read_closed = 1;
        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
      }
      if (st.need_flush_reads) {
        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
      }
      if (st.metadata_boundary) {
        grpc_sopb_add_metadata_boundary(
            &t->incoming_stream->parser.incoming_sopb);
        stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
      }
      if (st.ack_settings) {
        gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
        /* new peer settings may have changed the concurrent stream limit */
        maybe_start_some_streams(t);
      }
      if (st.send_ping_ack) {
        /* echo the peer's opaque payload back with the ack argument set */
        gpr_slice_buffer_add(
            &t->qbuf,
            grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
      }
      if (st.goaway) {
        /* record the goaway in the pending array, doubling it when full */
        if (t->num_pending_goaways == t->cap_pending_goaways) {
          t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
          t->pending_goaways =
              gpr_realloc(t->pending_goaways,
                          sizeof(pending_goaway) * t->cap_pending_goaways);
        }
        t->pending_goaways[t->num_pending_goaways].status =
            grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
        t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
        t->num_pending_goaways++;
      }
      if (st.process_ping_reply) {
        /* complete the first outstanding ping whose id matches the reply and
           close the gap it leaves in the array.
           NOTE(review): recv_data holds the transport lock around
           process_read, so the callback appears to run under that lock -
           confirm this is acceptable for callers. */
        for (i = 0; i < t->ping_count; i++) {
          if (0 ==
              memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
            t->pings[i].cb(t->pings[i].user_data);
            memmove(&t->pings[i], &t->pings[i + 1],
                    (t->ping_count - i - 1) * sizeof(outstanding_ping));
            t->ping_count--;
            break;
          }
        }
      }
      if (st.window_update) {
        if (t->incoming_stream_id) {
          /* if there was a stream id, this is for some stream */
          stream *s = lookup_stream(t, t->incoming_stream_id);
          if (s) {
            int was_window_empty = s->outgoing_window == 0;
            if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
              /* an illegal update only resets the offending stream */
              cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
                                      GRPC_CHTTP2_FLOW_CONTROL_ERROR),
                            GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
            } else {
              s->outgoing_window += st.window_update;
              /* if this window update makes outgoing ops writable again,
                 flag that */
              if (was_window_empty && s->outgoing_sopb.nops) {
                stream_list_join(t, s, WRITABLE);
              }
            }
          }
        } else {
          /* transport level window update */
          if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
            drop_connection(t);
          } else {
            t->outgoing_window += st.window_update;
          }
        }
      }
      return 1;
    case GRPC_CHTTP2_STREAM_ERROR:
      /* ignore the remainder of this frame and reset the stream */
      become_skip_parser(t);
      cancel_stream_id(
          t, t->incoming_stream_id,
          grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
          GRPC_CHTTP2_INTERNAL_ERROR, 1);
      return 1;
    case GRPC_CHTTP2_CONNECTION_ERROR:
      drop_connection(t);
      return 0;
  }
  gpr_log(GPR_ERROR, "should never reach here");
  abort();
  return 0;
}
1527
1528static int process_read(transport *t, gpr_slice slice) {
1529 gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
1530 gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
1531 gpr_uint8 *cur = beg;
1532
1533 if (cur == end) return 1;
1534
1535 switch (t->deframe_state) {
1536 case DTS_CLIENT_PREFIX_0:
1537 case DTS_CLIENT_PREFIX_1:
1538 case DTS_CLIENT_PREFIX_2:
1539 case DTS_CLIENT_PREFIX_3:
1540 case DTS_CLIENT_PREFIX_4:
1541 case DTS_CLIENT_PREFIX_5:
1542 case DTS_CLIENT_PREFIX_6:
1543 case DTS_CLIENT_PREFIX_7:
1544 case DTS_CLIENT_PREFIX_8:
1545 case DTS_CLIENT_PREFIX_9:
1546 case DTS_CLIENT_PREFIX_10:
1547 case DTS_CLIENT_PREFIX_11:
1548 case DTS_CLIENT_PREFIX_12:
1549 case DTS_CLIENT_PREFIX_13:
1550 case DTS_CLIENT_PREFIX_14:
1551 case DTS_CLIENT_PREFIX_15:
1552 case DTS_CLIENT_PREFIX_16:
1553 case DTS_CLIENT_PREFIX_17:
1554 case DTS_CLIENT_PREFIX_18:
1555 case DTS_CLIENT_PREFIX_19:
1556 case DTS_CLIENT_PREFIX_20:
1557 case DTS_CLIENT_PREFIX_21:
1558 case DTS_CLIENT_PREFIX_22:
1559 case DTS_CLIENT_PREFIX_23:
1560 while (cur != end && t->deframe_state != DTS_FH_0) {
1561 if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
1562 gpr_log(GPR_ERROR,
1563 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
1564 "at byte %d",
1565 CLIENT_CONNECT_STRING[t->deframe_state],
Craig Tillerd50e5652015-02-24 16:46:22 -08001566 (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state],
1567 *cur, (int)*cur, t->deframe_state);
Craig Tiller5246e7a2015-01-19 14:59:08 -08001568 drop_connection(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001569 return 0;
1570 }
1571 ++cur;
1572 ++t->deframe_state;
1573 }
1574 if (cur == end) {
1575 return 1;
1576 }
1577 /* fallthrough */
1578 dts_fh_0:
1579 case DTS_FH_0:
1580 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001581 t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001582 if (++cur == end) {
1583 t->deframe_state = DTS_FH_1;
1584 return 1;
1585 }
1586 /* fallthrough */
1587 case DTS_FH_1:
1588 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001589 t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001590 if (++cur == end) {
1591 t->deframe_state = DTS_FH_2;
1592 return 1;
1593 }
1594 /* fallthrough */
1595 case DTS_FH_2:
1596 GPR_ASSERT(cur < end);
1597 t->incoming_frame_size |= *cur;
1598 if (++cur == end) {
1599 t->deframe_state = DTS_FH_3;
1600 return 1;
1601 }
1602 /* fallthrough */
1603 case DTS_FH_3:
1604 GPR_ASSERT(cur < end);
1605 t->incoming_frame_type = *cur;
1606 if (++cur == end) {
1607 t->deframe_state = DTS_FH_4;
1608 return 1;
1609 }
1610 /* fallthrough */
1611 case DTS_FH_4:
1612 GPR_ASSERT(cur < end);
1613 t->incoming_frame_flags = *cur;
1614 if (++cur == end) {
1615 t->deframe_state = DTS_FH_5;
1616 return 1;
1617 }
1618 /* fallthrough */
1619 case DTS_FH_5:
1620 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001621 t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001622 if (++cur == end) {
1623 t->deframe_state = DTS_FH_6;
1624 return 1;
1625 }
1626 /* fallthrough */
1627 case DTS_FH_6:
1628 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001629 t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001630 if (++cur == end) {
1631 t->deframe_state = DTS_FH_7;
1632 return 1;
1633 }
1634 /* fallthrough */
1635 case DTS_FH_7:
1636 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001637 t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001638 if (++cur == end) {
1639 t->deframe_state = DTS_FH_8;
1640 return 1;
1641 }
1642 /* fallthrough */
1643 case DTS_FH_8:
1644 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001645 t->incoming_stream_id |= ((gpr_uint32)*cur);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001646 t->deframe_state = DTS_FRAME;
1647 if (!init_frame_parser(t)) {
1648 return 0;
1649 }
nnoble0c475f02014-12-05 15:37:39 -08001650 t->last_incoming_stream_id = t->incoming_stream_id;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001651 if (t->incoming_frame_size == 0) {
1652 if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
1653 return 0;
1654 }
1655 if (++cur == end) {
1656 t->deframe_state = DTS_FH_0;
1657 return 1;
1658 }
1659 goto dts_fh_0; /* loop */
1660 }
1661 if (++cur == end) {
1662 return 1;
1663 }
1664 /* fallthrough */
1665 case DTS_FRAME:
1666 GPR_ASSERT(cur < end);
Craig Tiller54f9a652015-02-19 21:41:20 -08001667 if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001668 if (!parse_frame_slice(
1669 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
1670 return 0;
1671 }
1672 t->deframe_state = DTS_FH_0;
1673 return 1;
Craig Tiller0c0b60c2015-01-21 15:49:28 -08001674 } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001675 if (!parse_frame_slice(
1676 t, gpr_slice_sub_no_ref(slice, cur - beg,
1677 cur + t->incoming_frame_size - beg),
1678 1)) {
1679 return 0;
1680 }
1681 cur += t->incoming_frame_size;
1682 goto dts_fh_0; /* loop */
1683 } else {
1684 if (!parse_frame_slice(
1685 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
1686 return 0;
1687 }
1688 t->incoming_frame_size -= (end - cur);
1689 return 1;
1690 }
1691 gpr_log(GPR_ERROR, "should never reach here");
1692 abort();
1693 }
1694
1695 gpr_log(GPR_ERROR, "should never reach here");
1696 abort();
1697}
1698
1699/* tcp read callback */
1700static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
1701 grpc_endpoint_cb_status error) {
1702 transport *t = tp;
1703 size_t i;
1704 int keep_reading = 0;
1705
1706 switch (error) {
1707 case GRPC_ENDPOINT_CB_SHUTDOWN:
1708 case GRPC_ENDPOINT_CB_EOF:
1709 case GRPC_ENDPOINT_CB_ERROR:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001710 lock(t);
1711 drop_connection(t);
1712 t->reading = 0;
1713 if (!t->writing && t->ep) {
1714 grpc_endpoint_destroy(t->ep);
1715 t->ep = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001716 unref_transport(t); /* safe as we still have a ref for read */
1717 }
1718 unlock(t);
1719 unref_transport(t);
1720 break;
1721 case GRPC_ENDPOINT_CB_OK:
1722 lock(t);
1723 for (i = 0; i < nslices && process_read(t, slices[i]); i++)
1724 ;
1725 unlock(t);
1726 keep_reading = 1;
1727 break;
1728 }
1729
1730 for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
1731
1732 if (keep_reading) {
ctiller58393c22015-01-07 14:03:30 -08001733 grpc_endpoint_notify_on_read(t->ep, recv_data, t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001734 }
1735}
1736
1737/*
1738 * CALLBACK LOOP
1739 */
1740
1741static grpc_stream_state compute_state(gpr_uint8 write_closed,
1742 gpr_uint8 read_closed) {
1743 if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
1744 if (write_closed) return GRPC_STREAM_SEND_CLOSED;
1745 if (read_closed) return GRPC_STREAM_RECV_CLOSED;
1746 return GRPC_STREAM_OPEN;
1747}
1748
1749static int prepare_callbacks(transport *t) {
1750 stream *s;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001751 int n = 0;
1752 while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
1753 int execute = 1;
Craig Tillere3018e62015-02-13 17:05:19 -08001754 grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001755
ctiller00297df2015-01-12 11:23:09 -08001756 s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001757 if (s->callback_state == GRPC_STREAM_CLOSED) {
1758 remove_from_stream_map(t, s);
1759 if (s->published_close) {
1760 execute = 0;
1761 }
1762 s->published_close = 1;
1763 }
1764
1765 if (execute) {
1766 stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
1767 n = 1;
1768 }
1769 }
1770 return n;
1771}
1772
Craig Tillerd1345de2015-02-24 21:55:20 -08001773static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001774 stream *s;
1775 while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
1776 size_t nops = s->callback_sopb.nops;
1777 s->callback_sopb.nops = 0;
Craig Tillerd1345de2015-02-24 21:55:20 -08001778 cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
1779 s->callback_sopb.ops, nops, s->callback_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001780 }
1781}
1782
Craig Tiller748fe3f2015-03-02 07:48:50 -08001783static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
1784 cb->closed(t->cb_user_data, &t->base);
1785}
1786
ctillerd79b4862014-12-17 16:36:59 -08001787static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
1788 transport *t = (transport *)gt;
1789 lock(t);
1790 if (t->ep) {
1791 grpc_endpoint_add_to_pollset(t->ep, pollset);
1792 }
1793 unlock(t);
1794}
1795
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001796/*
1797 * INTEGRATION GLUE
1798 */
1799
1800static const grpc_transport_vtable vtable = {
Craig Tillerd50e5652015-02-24 16:46:22 -08001801 sizeof(stream), init_stream, send_batch, set_allow_window_updates,
1802 add_to_pollset, destroy_stream, abort_stream, goaway,
1803 close_transport, send_ping, destroy_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001804
1805void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
1806 void *arg,
1807 const grpc_channel_args *channel_args,
1808 grpc_endpoint *ep, gpr_slice *slices,
1809 size_t nslices, grpc_mdctx *mdctx,
1810 int is_client) {
1811 transport *t = gpr_malloc(sizeof(transport));
Nicolas Noble5ea99bb2015-02-04 14:13:09 -08001812 init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
1813 is_client);
Craig Tiller190d3602015-02-18 09:23:38 -08001814}