blob: e32ee284e093f886577aa97d3301de00d70f1776 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/transport/chttp2_transport.h"
35
36#include <math.h>
37#include <stdio.h>
38#include <string.h>
39
Craig Tiller485d7762015-01-23 12:54:05 -080040#include "src/core/support/string.h"
nnoble0c475f02014-12-05 15:37:39 -080041#include "src/core/transport/chttp2/frame_data.h"
42#include "src/core/transport/chttp2/frame_goaway.h"
43#include "src/core/transport/chttp2/frame_ping.h"
44#include "src/core/transport/chttp2/frame_rst_stream.h"
45#include "src/core/transport/chttp2/frame_settings.h"
46#include "src/core/transport/chttp2/frame_window_update.h"
47#include "src/core/transport/chttp2/hpack_parser.h"
48#include "src/core/transport/chttp2/http2_errors.h"
49#include "src/core/transport/chttp2/status_conversion.h"
50#include "src/core/transport/chttp2/stream_encoder.h"
51#include "src/core/transport/chttp2/stream_map.h"
52#include "src/core/transport/chttp2/timeout_encoding.h"
53#include "src/core/transport/transport_impl.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080054#include <grpc/support/alloc.h>
55#include <grpc/support/log.h>
56#include <grpc/support/slice_buffer.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080057#include <grpc/support/useful.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080058
ctiller493fbcc2014-12-07 15:09:10 -080059#define DEFAULT_WINDOW 65535
60#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080061#define MAX_WINDOW 0x7fffffffu
62
63#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
64#define CLIENT_CONNECT_STRLEN 24
65
Craig Tillerfaa84802015-03-01 21:56:38 -080066int grpc_http_trace = 0;
67
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080068typedef struct transport transport;
69typedef struct stream stream;
70
Craig Tiller5c019ae2015-04-17 16:46:53 -070071#define IF_TRACING(stmt) \
72 if (!(grpc_http_trace)) \
73 ; \
74 else \
Craig Tillerd50e5652015-02-24 16:46:22 -080075 stmt
76
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080077/* streams are kept in various linked lists depending on what things need to
78 happen to them... this enum labels each list */
79typedef enum {
80 /* streams that have pending writes */
81 WRITABLE = 0,
ctiller00297df2015-01-12 11:23:09 -080082 /* streams that have been selected to be written */
83 WRITING,
84 /* streams that have just been written, and included a close */
85 WRITTEN_CLOSED,
86 /* streams that have been cancelled and have some pending state updates
87 to perform */
88 CANCELLED,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080089 /* streams that want to send window updates */
90 WINDOW_UPDATE,
91 /* streams that are waiting to start because there are too many concurrent
92 streams on the connection */
93 WAITING_FOR_CONCURRENCY,
94 /* streams that want to callback the application */
95 PENDING_CALLBACKS,
96 /* streams that *ARE* calling back to the application */
97 EXECUTING_CALLBACKS,
98 STREAM_LIST_COUNT /* must be last */
99} stream_list_id;
100
101/* deframer state for the overall http2 stream of bytes */
102typedef enum {
103 /* prefix: one entry per http2 connection prefix byte */
104 DTS_CLIENT_PREFIX_0 = 0,
105 DTS_CLIENT_PREFIX_1,
106 DTS_CLIENT_PREFIX_2,
107 DTS_CLIENT_PREFIX_3,
108 DTS_CLIENT_PREFIX_4,
109 DTS_CLIENT_PREFIX_5,
110 DTS_CLIENT_PREFIX_6,
111 DTS_CLIENT_PREFIX_7,
112 DTS_CLIENT_PREFIX_8,
113 DTS_CLIENT_PREFIX_9,
114 DTS_CLIENT_PREFIX_10,
115 DTS_CLIENT_PREFIX_11,
116 DTS_CLIENT_PREFIX_12,
117 DTS_CLIENT_PREFIX_13,
118 DTS_CLIENT_PREFIX_14,
119 DTS_CLIENT_PREFIX_15,
120 DTS_CLIENT_PREFIX_16,
121 DTS_CLIENT_PREFIX_17,
122 DTS_CLIENT_PREFIX_18,
123 DTS_CLIENT_PREFIX_19,
124 DTS_CLIENT_PREFIX_20,
125 DTS_CLIENT_PREFIX_21,
126 DTS_CLIENT_PREFIX_22,
127 DTS_CLIENT_PREFIX_23,
128 /* frame header byte 0... */
129 /* must follow from the prefix states */
130 DTS_FH_0,
131 DTS_FH_1,
132 DTS_FH_2,
133 DTS_FH_3,
134 DTS_FH_4,
135 DTS_FH_5,
136 DTS_FH_6,
137 DTS_FH_7,
138 /* ... frame header byte 8 */
139 DTS_FH_8,
140 /* inside a http2 frame */
141 DTS_FRAME
142} deframe_transport_state;
143
144typedef struct {
145 stream *head;
146 stream *tail;
147} stream_list;
148
149typedef struct {
150 stream *next;
151 stream *prev;
152} stream_link;
153
154typedef enum {
155 ERROR_STATE_NONE,
156 ERROR_STATE_SEEN,
157 ERROR_STATE_NOTIFIED
158} error_state;
159
160/* We keep several sets of connection wide parameters */
161typedef enum {
162 /* The settings our peer has asked for (and we have acked) */
163 PEER_SETTINGS = 0,
164 /* The settings we'd like to have */
165 LOCAL_SETTINGS,
166 /* The settings we've published to our peer */
167 SENT_SETTINGS,
168 /* The settings the peer has acked */
169 ACKED_SETTINGS,
170 NUM_SETTING_SETS
171} setting_set;
172
173/* Outstanding ping request data */
174typedef struct {
175 gpr_uint8 id[8];
176 void (*cb)(void *user_data);
177 void *user_data;
178} outstanding_ping;
179
nnoble0c475f02014-12-05 15:37:39 -0800180typedef struct {
181 grpc_status_code status;
182 gpr_slice debug;
183} pending_goaway;
184
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800185struct transport {
186 grpc_transport base; /* must be first */
187 const grpc_transport_callbacks *cb;
188 void *cb_user_data;
189 grpc_endpoint *ep;
190 grpc_mdctx *metadata_context;
191 gpr_refcount refs;
192 gpr_uint8 is_client;
193
194 gpr_mu mu;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800195 gpr_cv cv;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800196
197 /* basic state management - what are we doing at the moment? */
198 gpr_uint8 reading;
199 gpr_uint8 writing;
200 gpr_uint8 calling_back;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800201 gpr_uint8 destroying;
Craig Tillerd75fe662015-02-21 07:30:49 -0800202 gpr_uint8 closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800203 error_state error_state;
204
205 /* stream indexing */
206 gpr_uint32 next_stream_id;
nnoble0c475f02014-12-05 15:37:39 -0800207 gpr_uint32 last_incoming_stream_id;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800208
209 /* settings */
210 gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
ctiller493fbcc2014-12-07 15:09:10 -0800211 gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */
212 gpr_uint8 sent_local_settings; /* have local settings been sent? */
213 gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800214
215 /* window management */
216 gpr_uint32 outgoing_window;
217 gpr_uint32 incoming_window;
ctiller493fbcc2014-12-07 15:09:10 -0800218 gpr_uint32 connection_window_target;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800219
220 /* deframing */
221 deframe_transport_state deframe_state;
222 gpr_uint8 incoming_frame_type;
223 gpr_uint8 incoming_frame_flags;
224 gpr_uint8 header_eof;
225 gpr_uint32 expect_continuation_stream_id;
226 gpr_uint32 incoming_frame_size;
227 gpr_uint32 incoming_stream_id;
228
229 /* hpack encoding */
230 grpc_chttp2_hpack_compressor hpack_compressor;
231
232 /* various parsers */
233 grpc_chttp2_hpack_parser hpack_parser;
234 /* simple one shot parsers */
235 union {
236 grpc_chttp2_window_update_parser window_update;
237 grpc_chttp2_settings_parser settings;
238 grpc_chttp2_ping_parser ping;
239 } simple_parsers;
240
nnoble0c475f02014-12-05 15:37:39 -0800241 /* goaway */
242 grpc_chttp2_goaway_parser goaway_parser;
243 pending_goaway *pending_goaways;
244 size_t num_pending_goaways;
245 size_t cap_pending_goaways;
246
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800247 /* state for a stream that's not yet been created */
248 grpc_stream_op_buffer new_stream_sopb;
249
Craig Tillercb818ba2015-01-29 17:08:01 -0800250 /* stream ops that need to be destroyed, but outside of the lock */
251 grpc_stream_op_buffer nuke_later_sopb;
252
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800253 /* active parser */
254 void *parser_data;
255 stream *incoming_stream;
256 grpc_chttp2_parse_error (*parser)(void *parser_user_data,
257 grpc_chttp2_parse_state *state,
258 gpr_slice slice, int is_last);
259
260 gpr_slice_buffer outbuf;
261 gpr_slice_buffer qbuf;
262
263 stream_list lists[STREAM_LIST_COUNT];
264 grpc_chttp2_stream_map stream_map;
265
266 /* metadata object cache */
267 grpc_mdstr *str_grpc_timeout;
268
269 /* pings */
270 outstanding_ping *pings;
271 size_t ping_count;
272 size_t ping_capacity;
273 gpr_int64 ping_counter;
274};
275
276struct stream {
277 gpr_uint32 id;
278
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800279 gpr_uint32 incoming_window;
Craig Tiller84b88842015-04-20 08:47:52 -0700280 gpr_int64 outgoing_window;
ctiller00297df2015-01-12 11:23:09 -0800281 /* when the application requests writes be closed, the write_closed is
282 'queued'; when the close is flow controlled into the send path, we are
283 'sending' it; when the write has been performed it is 'sent' */
284 gpr_uint8 queued_write_closed;
285 gpr_uint8 sending_write_closed;
286 gpr_uint8 sent_write_closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800287 gpr_uint8 read_closed;
288 gpr_uint8 cancelled;
289 gpr_uint8 allow_window_updates;
290 gpr_uint8 published_close;
291
292 stream_link links[STREAM_LIST_COUNT];
293 gpr_uint8 included[STREAM_LIST_COUNT];
294
Craig Tiller9c1043e2015-04-16 16:20:38 -0700295 /* incoming metadata */
296 grpc_linked_mdelem *incoming_metadata;
297 size_t incoming_metadata_count;
298 size_t incoming_metadata_capacity;
299 gpr_timespec incoming_deadline;
300
ctiller00297df2015-01-12 11:23:09 -0800301 /* sops from application */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800302 grpc_stream_op_buffer outgoing_sopb;
ctiller00297df2015-01-12 11:23:09 -0800303 /* sops that have passed flow control to be written */
304 grpc_stream_op_buffer writing_sopb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800305
306 grpc_chttp2_data_parser parser;
307
308 grpc_stream_state callback_state;
309 grpc_stream_op_buffer callback_sopb;
310};
311
312static const grpc_transport_vtable vtable;
313
314static void push_setting(transport *t, grpc_chttp2_setting_id id,
315 gpr_uint32 value);
316
317static int prepare_callbacks(transport *t);
Craig Tillerd1345de2015-02-24 21:55:20 -0800318static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
Craig Tiller748fe3f2015-03-02 07:48:50 -0800319static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800320
321static int prepare_write(transport *t);
ctiller00297df2015-01-12 11:23:09 -0800322static void perform_write(transport *t, grpc_endpoint *ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800323
324static void lock(transport *t);
325static void unlock(transport *t);
326
327static void drop_connection(transport *t);
328static void end_all_the_calls(transport *t);
329
330static stream *stream_list_remove_head(transport *t, stream_list_id id);
331static void stream_list_remove(transport *t, stream *s, stream_list_id id);
332static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
333static void stream_list_join(transport *t, stream *s, stream_list_id id);
334
335static void cancel_stream_id(transport *t, gpr_uint32 id,
336 grpc_status_code local_status,
337 grpc_chttp2_error_code error_code, int send_rst);
338static void cancel_stream(transport *t, stream *s,
339 grpc_status_code local_status,
340 grpc_chttp2_error_code error_code, int send_rst);
ctiller00297df2015-01-12 11:23:09 -0800341static void finalize_cancellations(transport *t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800342static stream *lookup_stream(transport *t, gpr_uint32 id);
343static void remove_from_stream_map(transport *t, stream *s);
344static void maybe_start_some_streams(transport *t);
345
346static void become_skip_parser(transport *t);
347
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800348static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
349 grpc_endpoint_cb_status error);
350
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800351/*
352 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
353 */
354
Craig Tiller9be83ee2015-02-18 14:16:15 -0800355static void destruct_transport(transport *t) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800356 size_t i;
357
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800358 gpr_mu_lock(&t->mu);
359
360 GPR_ASSERT(t->ep == NULL);
361
362 gpr_slice_buffer_destroy(&t->outbuf);
363 gpr_slice_buffer_destroy(&t->qbuf);
364 grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
365 grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
nnoble0c475f02014-12-05 15:37:39 -0800366 grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367
368 grpc_mdstr_unref(t->str_grpc_timeout);
369
370 for (i = 0; i < STREAM_LIST_COUNT; i++) {
371 GPR_ASSERT(t->lists[i].head == NULL);
372 GPR_ASSERT(t->lists[i].tail == NULL);
373 }
374
375 GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
376
377 grpc_chttp2_stream_map_destroy(&t->stream_map);
378
379 gpr_mu_unlock(&t->mu);
380 gpr_mu_destroy(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800381 gpr_cv_destroy(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800382
383 /* callback remaining pings: they're not allowed to call into the transpot,
384 and maybe they hold resources that need to be freed */
385 for (i = 0; i < t->ping_count; i++) {
386 t->pings[i].cb(t->pings[i].user_data);
387 }
388 gpr_free(t->pings);
389
nnoble0c475f02014-12-05 15:37:39 -0800390 for (i = 0; i < t->num_pending_goaways; i++) {
391 gpr_slice_unref(t->pending_goaways[i].debug);
392 }
393 gpr_free(t->pending_goaways);
394
Craig Tiller8ed35ea2015-01-30 11:27:43 -0800395 grpc_sopb_destroy(&t->nuke_later_sopb);
396
Craig Tiller9be83ee2015-02-18 14:16:15 -0800397 grpc_mdctx_unref(t->metadata_context);
398
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800399 gpr_free(t);
400}
401
Craig Tiller9be83ee2015-02-18 14:16:15 -0800402static void unref_transport(transport *t) {
403 if (!gpr_unref(&t->refs)) return;
404 destruct_transport(t);
405}
406
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800407static void ref_transport(transport *t) { gpr_ref(&t->refs); }
408
409static void init_transport(transport *t, grpc_transport_setup_callback setup,
410 void *arg, const grpc_channel_args *channel_args,
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800411 grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
412 grpc_mdctx *mdctx, int is_client) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413 size_t i;
414 int j;
415 grpc_transport_setup_result sr;
416
417 GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
418
419 t->base.vtable = &vtable;
420 t->ep = ep;
421 /* one ref is for destroy, the other for when ep becomes NULL */
422 gpr_ref_init(&t->refs, 2);
423 gpr_mu_init(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800424 gpr_cv_init(&t->cv);
Craig Tiller9be83ee2015-02-18 14:16:15 -0800425 grpc_mdctx_ref(mdctx);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800426 t->metadata_context = mdctx;
427 t->str_grpc_timeout =
428 grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
429 t->reading = 1;
430 t->writing = 0;
431 t->error_state = ERROR_STATE_NONE;
432 t->next_stream_id = is_client ? 1 : 2;
nnoble0c475f02014-12-05 15:37:39 -0800433 t->last_incoming_stream_id = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800434 t->destroying = 0;
Craig Tillerd75fe662015-02-21 07:30:49 -0800435 t->closed = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800436 t->is_client = is_client;
437 t->outgoing_window = DEFAULT_WINDOW;
438 t->incoming_window = DEFAULT_WINDOW;
ctiller493fbcc2014-12-07 15:09:10 -0800439 t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800440 t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
441 t->expect_continuation_stream_id = 0;
442 t->pings = NULL;
443 t->ping_count = 0;
444 t->ping_capacity = 0;
445 t->ping_counter = gpr_now().tv_nsec;
446 grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
nnoble0c475f02014-12-05 15:37:39 -0800447 grpc_chttp2_goaway_parser_init(&t->goaway_parser);
448 t->pending_goaways = NULL;
449 t->num_pending_goaways = 0;
450 t->cap_pending_goaways = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800451 gpr_slice_buffer_init(&t->outbuf);
452 gpr_slice_buffer_init(&t->qbuf);
Craig Tillercb818ba2015-01-29 17:08:01 -0800453 grpc_sopb_init(&t->nuke_later_sopb);
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800454 grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800455 if (is_client) {
456 gpr_slice_buffer_add(&t->qbuf,
457 gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
458 }
459 /* 8 is a random stab in the dark as to a good initial size: it's small enough
460 that it shouldn't waste memory for infrequently used connections, yet
461 large enough that the exponential growth should happen nicely when it's
462 needed.
463 TODO(ctiller): tune this */
464 grpc_chttp2_stream_map_init(&t->stream_map, 8);
465 memset(&t->lists, 0, sizeof(t->lists));
466
467 /* copy in initial settings to all setting sets */
468 for (i = 0; i < NUM_SETTING_SETS; i++) {
469 for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
470 t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
471 }
472 }
473 t->dirtied_local_settings = 1;
ctiller493fbcc2014-12-07 15:09:10 -0800474 /* Hack: it's common for implementations to assume 65536 bytes initial send
475 window -- this should by rights be 0 */
476 t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800477 t->sent_local_settings = 0;
478
479 /* configure http2 the way we like it */
480 if (t->is_client) {
481 push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
482 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
483 }
ctiller493fbcc2014-12-07 15:09:10 -0800484 push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800485
486 if (channel_args) {
487 for (i = 0; i < channel_args->num_args; i++) {
488 if (0 ==
489 strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
490 if (t->is_client) {
491 gpr_log(GPR_ERROR, "%s: is ignored on the client",
492 GRPC_ARG_MAX_CONCURRENT_STREAMS);
493 } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
494 gpr_log(GPR_ERROR, "%s: must be an integer",
495 GRPC_ARG_MAX_CONCURRENT_STREAMS);
496 } else {
497 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
498 channel_args->args[i].value.integer);
499 }
500 }
501 }
502 }
503
504 gpr_mu_lock(&t->mu);
505 t->calling_back = 1;
506 ref_transport(t);
507 gpr_mu_unlock(&t->mu);
508
509 sr = setup(arg, &t->base, t->metadata_context);
510
511 lock(t);
512 t->cb = sr.callbacks;
513 t->cb_user_data = sr.user_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800514 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800515 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800516 unlock(t);
Craig Tillerdcf9c0e2015-02-11 16:12:41 -0800517
518 ref_transport(t);
519 recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
520
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800521 unref_transport(t);
522}
523
524static void destroy_transport(grpc_transport *gt) {
525 transport *t = (transport *)gt;
526
Craig Tiller748fe3f2015-03-02 07:48:50 -0800527 lock(t);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800528 t->destroying = 1;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000529 /* Wait for pending stuff to finish.
530 We need to be not calling back to ensure that closed() gets a chance to
531 trigger if needed during unlock() before we die.
532 We need to be not writing as cancellation finalization may produce some
533 callbacks that NEED to be made to close out some streams when t->writing
534 becomes 0. */
535 while (t->calling_back || t->writing) {
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800536 gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
537 }
Craig Tiller748fe3f2015-03-02 07:48:50 -0800538 drop_connection(t);
539 unlock(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800540
Craig Tillerbb88a042015-03-02 10:56:33 -0800541 /* The drop_connection() above puts the transport into an error state, and
542 the follow-up unlock should then (as part of the cleanup work it does)
543 ensure that cb is NULL, and therefore not call back anything further.
544 This check validates this very subtle behavior.
545 It's shutdown path, so I don't believe an extra lock pair is going to be
546 problematic for performance. */
Craig Tillerb9eb1802015-03-02 16:41:32 +0000547 lock(t);
548 GPR_ASSERT(!t->cb);
549 unlock(t);
550
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800551 unref_transport(t);
552}
553
554static void close_transport(grpc_transport *gt) {
555 transport *t = (transport *)gt;
556 gpr_mu_lock(&t->mu);
Craig Tillerd75fe662015-02-21 07:30:49 -0800557 GPR_ASSERT(!t->closed);
558 t->closed = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800559 if (t->ep) {
560 grpc_endpoint_shutdown(t->ep);
561 }
562 gpr_mu_unlock(&t->mu);
563}
564
nnoble0c475f02014-12-05 15:37:39 -0800565static void goaway(grpc_transport *gt, grpc_status_code status,
566 gpr_slice debug_data) {
567 transport *t = (transport *)gt;
568 lock(t);
569 grpc_chttp2_goaway_append(t->last_incoming_stream_id,
570 grpc_chttp2_grpc_status_to_http2_error(status),
571 debug_data, &t->qbuf);
572 unlock(t);
573}
574
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800575static int init_stream(grpc_transport *gt, grpc_stream *gs,
576 const void *server_data) {
577 transport *t = (transport *)gt;
578 stream *s = (stream *)gs;
579
580 ref_transport(t);
581
582 if (!server_data) {
583 lock(t);
584 s->id = 0;
585 } else {
Craig Tiller5c019ae2015-04-17 16:46:53 -0700586 s->id = (gpr_uint32)(gpr_uintptr)server_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800587 t->incoming_stream = s;
588 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
589 }
590
ctiller493fbcc2014-12-07 15:09:10 -0800591 s->outgoing_window =
592 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
593 s->incoming_window =
594 t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
ctiller00297df2015-01-12 11:23:09 -0800595 s->queued_write_closed = 0;
596 s->sending_write_closed = 0;
597 s->sent_write_closed = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800598 s->read_closed = 0;
599 s->cancelled = 0;
600 s->allow_window_updates = 0;
601 s->published_close = 0;
Craig Tiller9c1043e2015-04-16 16:20:38 -0700602 s->incoming_metadata_count = 0;
603 s->incoming_metadata_capacity = 0;
604 s->incoming_metadata = NULL;
605 s->incoming_deadline = gpr_inf_future;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800606 memset(&s->links, 0, sizeof(s->links));
607 memset(&s->included, 0, sizeof(s->included));
608 grpc_sopb_init(&s->outgoing_sopb);
ctiller00297df2015-01-12 11:23:09 -0800609 grpc_sopb_init(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800610 grpc_sopb_init(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800611 grpc_chttp2_data_parser_init(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800612
613 if (!server_data) {
614 unlock(t);
615 }
616
617 return 0;
618}
619
Craig Tillercb818ba2015-01-29 17:08:01 -0800620static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
621 grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
622 sopb->nops = 0;
623}
624
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800625static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
626 transport *t = (transport *)gt;
627 stream *s = (stream *)gs;
628 size_t i;
629
630 gpr_mu_lock(&t->mu);
631
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800632 /* stop parsing if we're currently parsing this stream */
633 if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
634 s->id != 0) {
635 become_skip_parser(t);
636 }
637
638 for (i = 0; i < STREAM_LIST_COUNT; i++) {
639 stream_list_remove(t, s, i);
640 }
641 remove_from_stream_map(t, s);
642
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800643 gpr_mu_unlock(&t->mu);
644
645 grpc_sopb_destroy(&s->outgoing_sopb);
ctiller00297df2015-01-12 11:23:09 -0800646 grpc_sopb_destroy(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800647 grpc_sopb_destroy(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800648 grpc_chttp2_data_parser_destroy(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800649
650 unref_transport(t);
651}
652
653/*
654 * LIST MANAGEMENT
655 */
656
ctiller00297df2015-01-12 11:23:09 -0800657static int stream_list_empty(transport *t, stream_list_id id) {
658 return t->lists[id].head == NULL;
659}
660
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800661static stream *stream_list_remove_head(transport *t, stream_list_id id) {
662 stream *s = t->lists[id].head;
663 if (s) {
664 stream *new_head = s->links[id].next;
665 GPR_ASSERT(s->included[id]);
666 if (new_head) {
667 t->lists[id].head = new_head;
668 new_head->links[id].prev = NULL;
669 } else {
670 t->lists[id].head = NULL;
671 t->lists[id].tail = NULL;
672 }
673 s->included[id] = 0;
674 }
675 return s;
676}
677
678static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
679 if (!s->included[id]) return;
680 s->included[id] = 0;
681 if (s->links[id].prev) {
682 s->links[id].prev->links[id].next = s->links[id].next;
683 } else {
684 GPR_ASSERT(t->lists[id].head == s);
685 t->lists[id].head = s->links[id].next;
686 }
687 if (s->links[id].next) {
688 s->links[id].next->links[id].prev = s->links[id].prev;
689 } else {
690 t->lists[id].tail = s->links[id].prev;
691 }
692}
693
694static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
695 stream *old_tail;
696 GPR_ASSERT(!s->included[id]);
697 old_tail = t->lists[id].tail;
698 s->links[id].next = NULL;
699 s->links[id].prev = old_tail;
700 if (old_tail) {
701 old_tail->links[id].next = s;
702 } else {
703 s->links[id].prev = NULL;
704 t->lists[id].head = s;
705 }
706 t->lists[id].tail = s;
707 s->included[id] = 1;
708}
709
710static void stream_list_join(transport *t, stream *s, stream_list_id id) {
Craig Tiller5c019ae2015-04-17 16:46:53 -0700711 if (id == PENDING_CALLBACKS)
712 GPR_ASSERT(t->cb != NULL || t->error_state == ERROR_STATE_NONE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800713 if (s->included[id]) {
714 return;
715 }
716 stream_list_add_tail(t, s, id);
717}
718
719static void remove_from_stream_map(transport *t, stream *s) {
720 if (s->id == 0) return;
721 if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
722 maybe_start_some_streams(t);
723 }
724}
725
726/*
727 * LOCK MANAGEMENT
728 */
729
730/* We take a transport-global lock in response to calls coming in from above,
731 and in response to data being received from below. New data to be written
732 is always queued, as are callbacks to process data. During unlock() we
733 check our todo lists and initiate callbacks and flush writes. */
734
735static void lock(transport *t) { gpr_mu_lock(&t->mu); }
736
737static void unlock(transport *t) {
738 int start_write = 0;
739 int perform_callbacks = 0;
740 int call_closed = 0;
nnoble0c475f02014-12-05 15:37:39 -0800741 int num_goaways = 0;
742 int i;
743 pending_goaway *goaways = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800744 grpc_endpoint *ep = t->ep;
Craig Tillere3018e62015-02-13 17:05:19 -0800745 grpc_stream_op_buffer nuke_now;
Craig Tillerd1345de2015-02-24 21:55:20 -0800746 const grpc_transport_callbacks *cb = t->cb;
Craig Tiller06059952015-02-18 08:34:56 -0800747
Craig Tillere3018e62015-02-13 17:05:19 -0800748 grpc_sopb_init(&nuke_now);
749 if (t->nuke_later_sopb.nops) {
750 grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
Craig Tillercb818ba2015-01-29 17:08:01 -0800751 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800752
753 /* see if we need to trigger a write - and if so, get the data ready */
754 if (ep && !t->writing) {
755 t->writing = start_write = prepare_write(t);
756 if (start_write) {
757 ref_transport(t);
758 }
759 }
760
ctiller00297df2015-01-12 11:23:09 -0800761 if (!t->writing) {
762 finalize_cancellations(t);
763 }
764
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800765 /* gather any callbacks that need to be made */
Craig Tillerd1345de2015-02-24 21:55:20 -0800766 if (!t->calling_back && cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800767 perform_callbacks = prepare_callbacks(t);
768 if (perform_callbacks) {
769 t->calling_back = 1;
770 }
Craig Tillerb9eb1802015-03-02 16:41:32 +0000771 if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800772 call_closed = 1;
773 t->calling_back = 1;
Craig Tiller5c019ae2015-04-17 16:46:53 -0700774 t->cb = NULL; /* no more callbacks */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800775 t->error_state = ERROR_STATE_NOTIFIED;
776 }
nnoble0c475f02014-12-05 15:37:39 -0800777 if (t->num_pending_goaways) {
778 goaways = t->pending_goaways;
779 num_goaways = t->num_pending_goaways;
780 t->pending_goaways = NULL;
781 t->num_pending_goaways = 0;
ctiller82e275f2014-12-12 08:43:28 -0800782 t->cap_pending_goaways = 0;
nnoble0c475f02014-12-05 15:37:39 -0800783 t->calling_back = 1;
784 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800785 }
786
nnoble0c475f02014-12-05 15:37:39 -0800787 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800788 ref_transport(t);
789 }
790
791 /* finally unlock */
792 gpr_mu_unlock(&t->mu);
793
794 /* perform some callbacks if necessary */
nnoble0c475f02014-12-05 15:37:39 -0800795 for (i = 0; i < num_goaways; i++) {
Craig Tiller5c019ae2015-04-17 16:46:53 -0700796 cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
nnoble0c475f02014-12-05 15:37:39 -0800797 }
798
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799 if (perform_callbacks) {
Craig Tillerd1345de2015-02-24 21:55:20 -0800800 run_callbacks(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800801 }
802
803 if (call_closed) {
Craig Tiller748fe3f2015-03-02 07:48:50 -0800804 call_cb_closed(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800805 }
806
807 /* write some bytes if necessary */
ctiller00297df2015-01-12 11:23:09 -0800808 if (start_write) {
809 /* ultimately calls unref_transport(t); and clears t->writing */
810 perform_write(t, ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800811 }
812
nnoble0c475f02014-12-05 15:37:39 -0800813 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800814 lock(t);
815 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800816 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800817 unlock(t);
818 unref_transport(t);
819 }
nnoble0c475f02014-12-05 15:37:39 -0800820
Craig Tillere3018e62015-02-13 17:05:19 -0800821 grpc_sopb_destroy(&nuke_now);
Craig Tillercb818ba2015-01-29 17:08:01 -0800822
nnoble0c475f02014-12-05 15:37:39 -0800823 gpr_free(goaways);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800824}
825
826/*
827 * OUTPUT PROCESSING
828 */
829
830static void push_setting(transport *t, grpc_chttp2_setting_id id,
831 gpr_uint32 value) {
832 const grpc_chttp2_setting_parameters *sp =
833 &grpc_chttp2_settings_parameters[id];
834 gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
835 if (use_value != value) {
836 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
837 value, use_value);
838 }
839 if (use_value != t->settings[LOCAL_SETTINGS][id]) {
840 t->settings[LOCAL_SETTINGS][id] = use_value;
841 t->dirtied_local_settings = 1;
842 }
843}
844
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800845static int prepare_write(transport *t) {
846 stream *s;
ctiller00297df2015-01-12 11:23:09 -0800847 gpr_uint32 window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800848
849 /* simple writes are queued to qbuf, and flushed here */
Craig Tiller721f3622015-04-13 16:14:28 -0700850 gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800851 GPR_ASSERT(t->qbuf.count == 0);
852
853 if (t->dirtied_local_settings && !t->sent_local_settings) {
854 gpr_slice_buffer_add(
ctiller493fbcc2014-12-07 15:09:10 -0800855 &t->outbuf, grpc_chttp2_settings_create(
856 t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
857 t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
858 t->force_send_settings = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800859 t->dirtied_local_settings = 0;
860 t->sent_local_settings = 1;
861 }
862
863 /* for each stream that's become writable, frame it's data (according to
864 available window sizes) and add to the output buffer */
Craig Tiller84b88842015-04-20 08:47:52 -0700865 while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
866 s->outgoing_window > 0) {
ctiller00297df2015-01-12 11:23:09 -0800867 window_delta = grpc_chttp2_preencode(
868 s->outgoing_sopb.ops, &s->outgoing_sopb.nops,
869 GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
870 t->outgoing_window -= window_delta;
871 s->outgoing_window -= window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800872
ctiller00297df2015-01-12 11:23:09 -0800873 s->sending_write_closed =
874 s->queued_write_closed && s->outgoing_sopb.nops == 0;
875 if (s->writing_sopb.nops > 0 || s->sending_write_closed) {
876 stream_list_join(t, s, WRITING);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800877 }
878
879 /* if there are still writes to do and the stream still has window
880 available, then schedule a further write */
Yang Gao42c15a32015-04-20 14:34:28 -0700881 if (s->outgoing_sopb.nops > 0 && s->outgoing_window > 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800882 GPR_ASSERT(!t->outgoing_window);
883 stream_list_add_tail(t, s, WRITABLE);
884 }
885 }
886
887 /* for each stream that wants to update its window, add that window here */
888 while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
ctiller00297df2015-01-12 11:23:09 -0800889 window_delta =
ctiller493fbcc2014-12-07 15:09:10 -0800890 t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
891 s->incoming_window;
ctiller00297df2015-01-12 11:23:09 -0800892 if (!s->read_closed && window_delta) {
893 gpr_slice_buffer_add(
894 &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
895 s->incoming_window += window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800896 }
897 }
898
899 /* if the transport is ready to send a window update, do so here also */
ctiller493fbcc2014-12-07 15:09:10 -0800900 if (t->incoming_window < t->connection_window_target * 3 / 4) {
ctiller00297df2015-01-12 11:23:09 -0800901 window_delta = t->connection_window_target - t->incoming_window;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800902 gpr_slice_buffer_add(&t->outbuf,
ctiller00297df2015-01-12 11:23:09 -0800903 grpc_chttp2_window_update_create(0, window_delta));
904 t->incoming_window += window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800905 }
906
ctiller00297df2015-01-12 11:23:09 -0800907 return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
908}
909
910static void finalize_outbuf(transport *t) {
911 stream *s;
912
913 while ((s = stream_list_remove_head(t, WRITING))) {
914 grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
915 s->sending_write_closed, s->id, &t->hpack_compressor,
916 &t->outbuf);
917 s->writing_sopb.nops = 0;
918 if (s->sending_write_closed) {
919 stream_list_join(t, s, WRITTEN_CLOSED);
920 }
921 }
922}
923
924static void finish_write_common(transport *t, int success) {
925 stream *s;
926
927 lock(t);
928 if (!success) {
929 drop_connection(t);
930 }
931 while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
932 s->sent_write_closed = 1;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000933 if (!s->cancelled) stream_list_join(t, s, PENDING_CALLBACKS);
ctiller00297df2015-01-12 11:23:09 -0800934 }
935 t->outbuf.count = 0;
936 t->outbuf.length = 0;
937 /* leave the writing flag up on shutdown to prevent further writes in unlock()
938 from starting */
939 t->writing = 0;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000940 if (t->destroying) {
941 gpr_cv_signal(&t->cv);
942 }
ctiller00297df2015-01-12 11:23:09 -0800943 if (!t->reading) {
944 grpc_endpoint_destroy(t->ep);
945 t->ep = NULL;
ctiller00297df2015-01-12 11:23:09 -0800946 unref_transport(t); /* safe because we'll still have the ref for write */
947 }
948 unlock(t);
949
950 unref_transport(t);
951}
952
953static void finish_write(void *tp, grpc_endpoint_cb_status error) {
954 transport *t = tp;
955 finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
956}
957
958static void perform_write(transport *t, grpc_endpoint *ep) {
959 finalize_outbuf(t);
960
961 GPR_ASSERT(t->outbuf.count > 0);
962
963 switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
964 finish_write, t)) {
965 case GRPC_ENDPOINT_WRITE_DONE:
966 finish_write_common(t, 1);
967 break;
968 case GRPC_ENDPOINT_WRITE_ERROR:
969 finish_write_common(t, 0);
970 break;
971 case GRPC_ENDPOINT_WRITE_PENDING:
972 break;
973 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800974}
975
976static void maybe_start_some_streams(transport *t) {
977 while (
978 grpc_chttp2_stream_map_size(&t->stream_map) <
979 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
980 stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
981 if (!s) break;
982
983 GPR_ASSERT(s->id == 0);
984 s->id = t->next_stream_id;
985 t->next_stream_id += 2;
986 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
987 stream_list_join(t, s, WRITABLE);
988 }
989}
990
991static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
992 size_t ops_count, int is_last) {
993 transport *t = (transport *)gt;
994 stream *s = (stream *)gs;
995
996 lock(t);
997
998 if (is_last) {
ctiller00297df2015-01-12 11:23:09 -0800999 s->queued_write_closed = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001000 }
1001 if (!s->cancelled) {
1002 grpc_sopb_append(&s->outgoing_sopb, ops, ops_count);
ctiller00297df2015-01-12 11:23:09 -08001003 if (s->id == 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001004 stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
1005 maybe_start_some_streams(t);
ctiller00297df2015-01-12 11:23:09 -08001006 } else {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001007 stream_list_join(t, s, WRITABLE);
1008 }
1009 } else {
Craig Tillercefb00e2015-02-03 11:42:37 -08001010 grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001011 }
Craig Tillerfa9b1a42015-03-02 11:37:40 -08001012 if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed &&
1013 !s->published_close) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001014 stream_list_join(t, s, PENDING_CALLBACKS);
1015 }
1016
1017 unlock(t);
1018}
1019
1020static void abort_stream(grpc_transport *gt, grpc_stream *gs,
1021 grpc_status_code status) {
1022 transport *t = (transport *)gt;
1023 stream *s = (stream *)gs;
1024
1025 lock(t);
1026 cancel_stream(t, s, status, grpc_chttp2_grpc_status_to_http2_error(status),
1027 1);
1028 unlock(t);
1029}
1030
1031static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
1032 void *user_data) {
1033 transport *t = (transport *)gt;
1034 outstanding_ping *p;
1035
1036 lock(t);
1037 if (t->ping_capacity == t->ping_count) {
1038 t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
1039 t->pings =
1040 gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
1041 }
1042 p = &t->pings[t->ping_count++];
nnoble8f4e42c2014-12-11 16:36:46 -08001043 p->id[0] = (t->ping_counter >> 56) & 0xff;
1044 p->id[1] = (t->ping_counter >> 48) & 0xff;
1045 p->id[2] = (t->ping_counter >> 40) & 0xff;
1046 p->id[3] = (t->ping_counter >> 32) & 0xff;
1047 p->id[4] = (t->ping_counter >> 24) & 0xff;
1048 p->id[5] = (t->ping_counter >> 16) & 0xff;
1049 p->id[6] = (t->ping_counter >> 8) & 0xff;
1050 p->id[7] = t->ping_counter & 0xff;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001051 p->cb = cb;
1052 p->user_data = user_data;
1053 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
1054 unlock(t);
1055}
1056
1057/*
1058 * INPUT PROCESSING
1059 */
1060
ctiller00297df2015-01-12 11:23:09 -08001061static void finalize_cancellations(transport *t) {
1062 stream *s;
1063
1064 while ((s = stream_list_remove_head(t, CANCELLED))) {
1065 s->read_closed = 1;
1066 s->sent_write_closed = 1;
1067 stream_list_join(t, s, PENDING_CALLBACKS);
1068 }
1069}
1070
Craig Tiller9c1043e2015-04-16 16:20:38 -07001071static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
1072 if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
Craig Tiller5c019ae2015-04-17 16:46:53 -07001073 s->incoming_metadata_capacity =
1074 GPR_MAX(8, 2 * s->incoming_metadata_capacity);
1075 s->incoming_metadata =
1076 gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
1077 s->incoming_metadata_capacity);
Craig Tiller9c1043e2015-04-16 16:20:38 -07001078 }
1079 s->incoming_metadata[s->incoming_metadata_count++].md = elem;
1080}
1081
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001082static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
1083 grpc_status_code local_status,
1084 grpc_chttp2_error_code error_code,
1085 int send_rst) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001086 int had_outgoing;
Craig Tiller8b433a22015-01-23 14:47:07 -08001087 char buffer[GPR_LTOA_MIN_BUFSIZE];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001088
1089 if (s) {
1090 /* clear out any unreported input & output: nobody cares anymore */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001091 had_outgoing = s->outgoing_sopb.nops != 0;
Craig Tillercb818ba2015-01-29 17:08:01 -08001092 schedule_nuke_sopb(t, &s->parser.incoming_sopb);
1093 schedule_nuke_sopb(t, &s->outgoing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001094 if (s->cancelled) {
1095 send_rst = 0;
ctiller00297df2015-01-12 11:23:09 -08001096 } else if (!s->read_closed || !s->sent_write_closed || had_outgoing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001097 s->cancelled = 1;
ctiller00297df2015-01-12 11:23:09 -08001098 stream_list_join(t, s, CANCELLED);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001099
Craig Tillera7ed5d92015-01-23 11:30:16 -08001100 gpr_ltoa(local_status, buffer);
Craig Tiller5c019ae2015-04-17 16:46:53 -07001101 add_incoming_metadata(
1102 t, s,
1103 grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
Craig Tillerbd222712015-04-17 16:09:40 -07001104 switch (local_status) {
1105 case GRPC_STATUS_CANCELLED:
Craig Tiller5c019ae2015-04-17 16:46:53 -07001106 add_incoming_metadata(
1107 t, s, grpc_mdelem_from_strings(t->metadata_context,
1108 "grpc-message", "Cancelled"));
Craig Tillerbd222712015-04-17 16:09:40 -07001109 break;
1110 default:
1111 break;
1112 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001113
1114 stream_list_join(t, s, PENDING_CALLBACKS);
1115 }
1116 }
1117 if (!id) send_rst = 0;
1118 if (send_rst) {
1119 gpr_slice_buffer_add(&t->qbuf,
1120 grpc_chttp2_rst_stream_create(id, error_code));
1121 }
1122}
1123
1124static void cancel_stream_id(transport *t, gpr_uint32 id,
1125 grpc_status_code local_status,
1126 grpc_chttp2_error_code error_code, int send_rst) {
1127 cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
1128 send_rst);
1129}
1130
1131static void cancel_stream(transport *t, stream *s,
1132 grpc_status_code local_status,
1133 grpc_chttp2_error_code error_code, int send_rst) {
1134 cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
1135}
1136
1137static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
1138 cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
1139 GRPC_CHTTP2_INTERNAL_ERROR, 0);
1140}
1141
1142static void end_all_the_calls(transport *t) {
1143 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
1144}
1145
1146static void drop_connection(transport *t) {
1147 if (t->error_state == ERROR_STATE_NONE) {
1148 t->error_state = ERROR_STATE_SEEN;
1149 }
1150 end_all_the_calls(t);
1151}
1152
1153static void maybe_join_window_updates(transport *t, stream *s) {
ctiller493fbcc2014-12-07 15:09:10 -08001154 if (s->allow_window_updates &&
1155 s->incoming_window <
1156 t->settings[LOCAL_SETTINGS]
1157 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
1158 3 / 4) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001159 stream_list_join(t, s, WINDOW_UPDATE);
1160 }
1161}
1162
1163static void set_allow_window_updates(grpc_transport *tp, grpc_stream *sp,
1164 int allow) {
1165 transport *t = (transport *)tp;
1166 stream *s = (stream *)sp;
1167
1168 lock(t);
1169 s->allow_window_updates = allow;
1170 if (allow) {
1171 maybe_join_window_updates(t, s);
1172 } else {
1173 stream_list_remove(t, s, WINDOW_UPDATE);
1174 }
1175 unlock(t);
1176}
1177
1178static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
1179 if (t->incoming_frame_size > t->incoming_window) {
1180 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1181 t->incoming_frame_size, t->incoming_window);
1182 return GRPC_CHTTP2_CONNECTION_ERROR;
1183 }
1184
1185 if (t->incoming_frame_size > s->incoming_window) {
1186 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1187 t->incoming_frame_size, s->incoming_window);
1188 return GRPC_CHTTP2_CONNECTION_ERROR;
1189 }
1190
1191 t->incoming_window -= t->incoming_frame_size;
1192 s->incoming_window -= t->incoming_frame_size;
1193
1194 /* if the stream incoming window is getting low, schedule an update */
1195 maybe_join_window_updates(t, s);
1196
1197 return GRPC_CHTTP2_PARSE_OK;
1198}
1199
1200static stream *lookup_stream(transport *t, gpr_uint32 id) {
1201 return grpc_chttp2_stream_map_find(&t->stream_map, id);
1202}
1203
1204static grpc_chttp2_parse_error skip_parser(void *parser,
1205 grpc_chttp2_parse_state *st,
1206 gpr_slice slice, int is_last) {
1207 return GRPC_CHTTP2_PARSE_OK;
1208}
1209
1210static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
1211
1212static int init_skip_frame(transport *t, int is_header) {
1213 if (is_header) {
1214 int is_eoh = t->expect_continuation_stream_id != 0;
1215 t->parser = grpc_chttp2_header_parser_parse;
1216 t->parser_data = &t->hpack_parser;
1217 t->hpack_parser.on_header = skip_header;
1218 t->hpack_parser.on_header_user_data = NULL;
1219 t->hpack_parser.is_boundary = is_eoh;
1220 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1221 } else {
1222 t->parser = skip_parser;
1223 }
1224 return 1;
1225}
1226
1227static void become_skip_parser(transport *t) {
1228 init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
1229}
1230
1231static int init_data_frame_parser(transport *t) {
1232 stream *s = lookup_stream(t, t->incoming_stream_id);
1233 grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
1234 if (!s || s->read_closed) return init_skip_frame(t, 0);
1235 if (err == GRPC_CHTTP2_PARSE_OK) {
1236 err = update_incoming_window(t, s);
1237 }
1238 if (err == GRPC_CHTTP2_PARSE_OK) {
1239 err = grpc_chttp2_data_parser_begin_frame(&s->parser,
1240 t->incoming_frame_flags);
1241 }
1242 switch (err) {
1243 case GRPC_CHTTP2_PARSE_OK:
1244 t->incoming_stream = s;
1245 t->parser = grpc_chttp2_data_parser_parse;
1246 t->parser_data = &s->parser;
1247 return 1;
1248 case GRPC_CHTTP2_STREAM_ERROR:
1249 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1250 GRPC_CHTTP2_INTERNAL_ERROR),
1251 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1252 return init_skip_frame(t, 0);
1253 case GRPC_CHTTP2_CONNECTION_ERROR:
1254 drop_connection(t);
1255 return 0;
1256 }
1257 gpr_log(GPR_ERROR, "should never reach here");
1258 abort();
1259 return 0;
1260}
1261
1262static void free_timeout(void *p) { gpr_free(p); }
1263
1264static void on_header(void *tp, grpc_mdelem *md) {
1265 transport *t = tp;
1266 stream *s = t->incoming_stream;
1267
1268 GPR_ASSERT(s);
Craig Tillerd50e5652015-02-24 16:46:22 -08001269
1270 IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
1271 grpc_mdstr_as_c_string(md->key),
1272 grpc_mdstr_as_c_string(md->value)));
1273
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001274 stream_list_join(t, s, PENDING_CALLBACKS);
1275 if (md->key == t->str_grpc_timeout) {
1276 gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
1277 if (!cached_timeout) {
1278 /* not already parsed: parse it now, and store the result away */
1279 cached_timeout = gpr_malloc(sizeof(gpr_timespec));
1280 if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
1281 cached_timeout)) {
1282 gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
1283 grpc_mdstr_as_c_string(md->value));
1284 *cached_timeout = gpr_inf_future;
1285 }
1286 grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
1287 }
Craig Tiller9c1043e2015-04-16 16:20:38 -07001288 s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001289 grpc_mdelem_unref(md);
1290 } else {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001291 add_incoming_metadata(t, s, md);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001292 }
1293}
1294
1295static int init_header_frame_parser(transport *t, int is_continuation) {
1296 int is_eoh =
1297 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
1298 stream *s;
1299
1300 if (is_eoh) {
1301 t->expect_continuation_stream_id = 0;
1302 } else {
1303 t->expect_continuation_stream_id = t->incoming_stream_id;
1304 }
1305
1306 if (!is_continuation) {
1307 t->header_eof =
1308 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
1309 }
1310
1311 /* could be a new stream or an existing stream */
1312 s = lookup_stream(t, t->incoming_stream_id);
1313 if (!s) {
1314 if (is_continuation) {
1315 gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
1316 return init_skip_frame(t, 1);
1317 }
1318 if (t->is_client) {
1319 if ((t->incoming_stream_id & 1) &&
1320 t->incoming_stream_id < t->next_stream_id) {
1321 /* this is an old (probably cancelled) stream */
1322 } else {
1323 gpr_log(GPR_ERROR, "ignoring new stream creation on client");
1324 }
1325 return init_skip_frame(t, 1);
nnoble0c475f02014-12-05 15:37:39 -08001326 } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
1327 gpr_log(GPR_ERROR,
1328 "ignoring out of order new stream request on server; last stream "
1329 "id=%d, new stream id=%d",
1330 t->last_incoming_stream_id, t->incoming_stream);
1331 return init_skip_frame(t, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001332 }
1333 t->incoming_stream = NULL;
1334 /* if stream is accepted, we set incoming_stream in init_stream */
1335 t->cb->accept_stream(t->cb_user_data, &t->base,
Craig Tiller5c019ae2015-04-17 16:46:53 -07001336 (void *)(gpr_uintptr)t->incoming_stream_id);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001337 s = t->incoming_stream;
1338 if (!s) {
1339 gpr_log(GPR_ERROR, "stream not accepted");
1340 return init_skip_frame(t, 1);
1341 }
1342 } else {
1343 t->incoming_stream = s;
1344 }
1345 if (t->incoming_stream->read_closed) {
1346 gpr_log(GPR_ERROR, "skipping already closed stream header");
1347 t->incoming_stream = NULL;
1348 return init_skip_frame(t, 1);
1349 }
1350 t->parser = grpc_chttp2_header_parser_parse;
1351 t->parser_data = &t->hpack_parser;
1352 t->hpack_parser.on_header = on_header;
1353 t->hpack_parser.on_header_user_data = t;
1354 t->hpack_parser.is_boundary = is_eoh;
1355 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1356 if (!is_continuation &&
1357 (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
1358 grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
1359 }
1360 return 1;
1361}
1362
1363static int init_window_update_frame_parser(transport *t) {
1364 int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
1365 &t->simple_parsers.window_update,
1366 t->incoming_frame_size,
1367 t->incoming_frame_flags);
1368 if (!ok) {
1369 drop_connection(t);
1370 }
1371 t->parser = grpc_chttp2_window_update_parser_parse;
1372 t->parser_data = &t->simple_parsers.window_update;
1373 return ok;
1374}
1375
1376static int init_ping_parser(transport *t) {
1377 int ok = GRPC_CHTTP2_PARSE_OK ==
1378 grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
1379 t->incoming_frame_size,
1380 t->incoming_frame_flags);
1381 if (!ok) {
1382 drop_connection(t);
1383 }
1384 t->parser = grpc_chttp2_ping_parser_parse;
1385 t->parser_data = &t->simple_parsers.ping;
1386 return ok;
1387}
1388
nnoble0c475f02014-12-05 15:37:39 -08001389static int init_goaway_parser(transport *t) {
1390 int ok =
1391 GRPC_CHTTP2_PARSE_OK ==
1392 grpc_chttp2_goaway_parser_begin_frame(
1393 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
1394 if (!ok) {
1395 drop_connection(t);
1396 }
1397 t->parser = grpc_chttp2_goaway_parser_parse;
1398 t->parser_data = &t->goaway_parser;
1399 return ok;
1400}
1401
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001402static int init_settings_frame_parser(transport *t) {
1403 int ok = GRPC_CHTTP2_PARSE_OK ==
1404 grpc_chttp2_settings_parser_begin_frame(
1405 &t->simple_parsers.settings, t->incoming_frame_size,
1406 t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
1407 if (!ok) {
1408 drop_connection(t);
1409 }
1410 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
1411 memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
1412 GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
1413 }
1414 t->parser = grpc_chttp2_settings_parser_parse;
1415 t->parser_data = &t->simple_parsers.settings;
1416 return ok;
1417}
1418
1419static int init_frame_parser(transport *t) {
1420 if (t->expect_continuation_stream_id != 0) {
1421 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
1422 gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
1423 t->incoming_frame_type);
1424 return 0;
1425 }
1426 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
1427 gpr_log(GPR_ERROR,
1428 "Expected CONTINUATION frame for stream %08x, got stream %08x",
1429 t->expect_continuation_stream_id, t->incoming_stream_id);
1430 return 0;
1431 }
1432 return init_header_frame_parser(t, 1);
1433 }
1434 switch (t->incoming_frame_type) {
1435 case GRPC_CHTTP2_FRAME_DATA:
1436 return init_data_frame_parser(t);
1437 case GRPC_CHTTP2_FRAME_HEADER:
1438 return init_header_frame_parser(t, 0);
1439 case GRPC_CHTTP2_FRAME_CONTINUATION:
1440 gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
1441 return 0;
1442 case GRPC_CHTTP2_FRAME_RST_STREAM:
1443 /* TODO(ctiller): actually parse the reason */
1444 cancel_stream_id(
1445 t, t->incoming_stream_id,
1446 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
1447 GRPC_CHTTP2_CANCEL, 0);
1448 return init_skip_frame(t, 0);
1449 case GRPC_CHTTP2_FRAME_SETTINGS:
1450 return init_settings_frame_parser(t);
1451 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
1452 return init_window_update_frame_parser(t);
1453 case GRPC_CHTTP2_FRAME_PING:
1454 return init_ping_parser(t);
nnoble0c475f02014-12-05 15:37:39 -08001455 case GRPC_CHTTP2_FRAME_GOAWAY:
1456 return init_goaway_parser(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001457 default:
1458 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
1459 return init_skip_frame(t, 0);
1460 }
1461}
1462
Craig Tiller84b88842015-04-20 08:47:52 -07001463static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
1464 return window + window_update < MAX_WINDOW;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001465}
1466
Craig Tiller5c019ae2015-04-17 16:46:53 -07001467static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
Craig Tiller9c1043e2015-04-16 16:20:38 -07001468
Craig Tillerbd222712015-04-17 16:09:40 -07001469static void add_metadata_batch(transport *t, stream *s) {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001470 grpc_metadata_batch b;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001471 size_t i;
1472
1473 b.list.head = &s->incoming_metadata[0];
1474 b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
1475 b.garbage.head = b.garbage.tail = NULL;
1476 b.deadline = s->incoming_deadline;
1477
1478 for (i = 1; i < s->incoming_metadata_count; i++) {
1479 s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
1480 s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
1481 }
1482 s->incoming_metadata[0].prev = NULL;
1483 s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
1484
1485 grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
Craig Tiller5c019ae2015-04-17 16:46:53 -07001486 grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
1487 s->incoming_metadata);
Craig Tiller9c1043e2015-04-16 16:20:38 -07001488
1489 /* reset */
1490 s->incoming_deadline = gpr_inf_future;
1491 s->incoming_metadata = NULL;
1492 s->incoming_metadata_count = 0;
1493 s->incoming_metadata_capacity = 0;
1494}
1495
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001496static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
1497 grpc_chttp2_parse_state st;
1498 size_t i;
1499 memset(&st, 0, sizeof(st));
1500 switch (t->parser(t->parser_data, &st, slice, is_last)) {
1501 case GRPC_CHTTP2_PARSE_OK:
1502 if (st.end_of_stream) {
1503 t->incoming_stream->read_closed = 1;
1504 stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
1505 }
1506 if (st.need_flush_reads) {
1507 stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
1508 }
1509 if (st.metadata_boundary) {
Craig Tillerbd222712015-04-17 16:09:40 -07001510 add_metadata_batch(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001511 stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
1512 }
1513 if (st.ack_settings) {
1514 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
1515 maybe_start_some_streams(t);
1516 }
1517 if (st.send_ping_ack) {
1518 gpr_slice_buffer_add(
1519 &t->qbuf,
1520 grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
1521 }
nnoble0c475f02014-12-05 15:37:39 -08001522 if (st.goaway) {
1523 if (t->num_pending_goaways == t->cap_pending_goaways) {
1524 t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
1525 t->pending_goaways =
1526 gpr_realloc(t->pending_goaways,
1527 sizeof(pending_goaway) * t->cap_pending_goaways);
1528 }
1529 t->pending_goaways[t->num_pending_goaways].status =
1530 grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
1531 t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
1532 t->num_pending_goaways++;
1533 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001534 if (st.process_ping_reply) {
1535 for (i = 0; i < t->ping_count; i++) {
1536 if (0 ==
1537 memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
1538 t->pings[i].cb(t->pings[i].user_data);
1539 memmove(&t->pings[i], &t->pings[i + 1],
1540 (t->ping_count - i - 1) * sizeof(outstanding_ping));
1541 t->ping_count--;
1542 break;
1543 }
1544 }
1545 }
Yang Gaof1021032015-04-18 00:10:29 -07001546 if (st.initial_window_update) {
1547 for (i = 0; i < t->stream_map.count; i++) {
1548 stream *s = (stream*)(t->stream_map.values[i]);
Craig Tiller84b88842015-04-20 08:47:52 -07001549 int was_window_empty = s->outgoing_window <= 0;
1550 s->outgoing_window += st.initial_window_update;
1551 if (was_window_empty && s->outgoing_window > 0 &&
1552 s->outgoing_sopb.nops > 0) {
1553 stream_list_join(t, s, WRITABLE);
Yang Gaof1021032015-04-18 00:10:29 -07001554 }
1555 }
1556 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001557 if (st.window_update) {
1558 if (t->incoming_stream_id) {
1559 /* if there was a stream id, this is for some stream */
1560 stream *s = lookup_stream(t, t->incoming_stream_id);
1561 if (s) {
Craig Tiller84b88842015-04-20 08:47:52 -07001562 int was_window_empty = s->outgoing_window <= 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001563 if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
1564 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1565 GRPC_CHTTP2_FLOW_CONTROL_ERROR),
1566 GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
1567 } else {
1568 s->outgoing_window += st.window_update;
1569 /* if this window update makes outgoing ops writable again,
1570 flag that */
1571 if (was_window_empty && s->outgoing_sopb.nops) {
1572 stream_list_join(t, s, WRITABLE);
1573 }
1574 }
1575 }
1576 } else {
1577 /* transport level window update */
1578 if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
1579 drop_connection(t);
1580 } else {
1581 t->outgoing_window += st.window_update;
1582 }
1583 }
1584 }
1585 return 1;
1586 case GRPC_CHTTP2_STREAM_ERROR:
1587 become_skip_parser(t);
1588 cancel_stream_id(
1589 t, t->incoming_stream_id,
1590 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
1591 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1592 return 1;
1593 case GRPC_CHTTP2_CONNECTION_ERROR:
1594 drop_connection(t);
1595 return 0;
1596 }
1597 gpr_log(GPR_ERROR, "should never reach here");
1598 abort();
1599 return 0;
1600}
1601
1602static int process_read(transport *t, gpr_slice slice) {
1603 gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
1604 gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
1605 gpr_uint8 *cur = beg;
1606
1607 if (cur == end) return 1;
1608
1609 switch (t->deframe_state) {
1610 case DTS_CLIENT_PREFIX_0:
1611 case DTS_CLIENT_PREFIX_1:
1612 case DTS_CLIENT_PREFIX_2:
1613 case DTS_CLIENT_PREFIX_3:
1614 case DTS_CLIENT_PREFIX_4:
1615 case DTS_CLIENT_PREFIX_5:
1616 case DTS_CLIENT_PREFIX_6:
1617 case DTS_CLIENT_PREFIX_7:
1618 case DTS_CLIENT_PREFIX_8:
1619 case DTS_CLIENT_PREFIX_9:
1620 case DTS_CLIENT_PREFIX_10:
1621 case DTS_CLIENT_PREFIX_11:
1622 case DTS_CLIENT_PREFIX_12:
1623 case DTS_CLIENT_PREFIX_13:
1624 case DTS_CLIENT_PREFIX_14:
1625 case DTS_CLIENT_PREFIX_15:
1626 case DTS_CLIENT_PREFIX_16:
1627 case DTS_CLIENT_PREFIX_17:
1628 case DTS_CLIENT_PREFIX_18:
1629 case DTS_CLIENT_PREFIX_19:
1630 case DTS_CLIENT_PREFIX_20:
1631 case DTS_CLIENT_PREFIX_21:
1632 case DTS_CLIENT_PREFIX_22:
1633 case DTS_CLIENT_PREFIX_23:
1634 while (cur != end && t->deframe_state != DTS_FH_0) {
1635 if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
1636 gpr_log(GPR_ERROR,
1637 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
1638 "at byte %d",
1639 CLIENT_CONNECT_STRING[t->deframe_state],
Craig Tiller5c019ae2015-04-17 16:46:53 -07001640 (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
1641 (int)*cur, t->deframe_state);
Craig Tiller5246e7a2015-01-19 14:59:08 -08001642 drop_connection(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001643 return 0;
1644 }
1645 ++cur;
1646 ++t->deframe_state;
1647 }
1648 if (cur == end) {
1649 return 1;
1650 }
1651 /* fallthrough */
1652 dts_fh_0:
1653 case DTS_FH_0:
1654 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001655 t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001656 if (++cur == end) {
1657 t->deframe_state = DTS_FH_1;
1658 return 1;
1659 }
1660 /* fallthrough */
1661 case DTS_FH_1:
1662 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001663 t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001664 if (++cur == end) {
1665 t->deframe_state = DTS_FH_2;
1666 return 1;
1667 }
1668 /* fallthrough */
1669 case DTS_FH_2:
1670 GPR_ASSERT(cur < end);
1671 t->incoming_frame_size |= *cur;
1672 if (++cur == end) {
1673 t->deframe_state = DTS_FH_3;
1674 return 1;
1675 }
1676 /* fallthrough */
1677 case DTS_FH_3:
1678 GPR_ASSERT(cur < end);
1679 t->incoming_frame_type = *cur;
1680 if (++cur == end) {
1681 t->deframe_state = DTS_FH_4;
1682 return 1;
1683 }
1684 /* fallthrough */
1685 case DTS_FH_4:
1686 GPR_ASSERT(cur < end);
1687 t->incoming_frame_flags = *cur;
1688 if (++cur == end) {
1689 t->deframe_state = DTS_FH_5;
1690 return 1;
1691 }
1692 /* fallthrough */
1693 case DTS_FH_5:
1694 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001695 t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001696 if (++cur == end) {
1697 t->deframe_state = DTS_FH_6;
1698 return 1;
1699 }
1700 /* fallthrough */
1701 case DTS_FH_6:
1702 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001703 t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001704 if (++cur == end) {
1705 t->deframe_state = DTS_FH_7;
1706 return 1;
1707 }
1708 /* fallthrough */
1709 case DTS_FH_7:
1710 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001711 t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001712 if (++cur == end) {
1713 t->deframe_state = DTS_FH_8;
1714 return 1;
1715 }
1716 /* fallthrough */
1717 case DTS_FH_8:
1718 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001719 t->incoming_stream_id |= ((gpr_uint32)*cur);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001720 t->deframe_state = DTS_FRAME;
1721 if (!init_frame_parser(t)) {
1722 return 0;
1723 }
Tatsuhiro Tsujikawa1cbf8d72015-03-13 23:59:40 +09001724 /* t->last_incoming_stream_id is used as last-stream-id when
1725 sending GOAWAY frame.
1726 https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
1727 says that last-stream-id is peer-initiated stream ID. So,
1728 since we don't have server pushed streams, client should send
1729 GOAWAY last-stream-id=0 in this case. */
Tatsuhiro Tsujikawad11f6102015-03-12 22:57:22 +09001730 if (!t->is_client) {
1731 t->last_incoming_stream_id = t->incoming_stream_id;
1732 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001733 if (t->incoming_frame_size == 0) {
1734 if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
1735 return 0;
1736 }
1737 if (++cur == end) {
1738 t->deframe_state = DTS_FH_0;
1739 return 1;
1740 }
1741 goto dts_fh_0; /* loop */
1742 }
1743 if (++cur == end) {
1744 return 1;
1745 }
1746 /* fallthrough */
1747 case DTS_FRAME:
1748 GPR_ASSERT(cur < end);
Craig Tiller54f9a652015-02-19 21:41:20 -08001749 if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001750 if (!parse_frame_slice(
1751 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
1752 return 0;
1753 }
1754 t->deframe_state = DTS_FH_0;
1755 return 1;
Craig Tiller0c0b60c2015-01-21 15:49:28 -08001756 } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001757 if (!parse_frame_slice(
1758 t, gpr_slice_sub_no_ref(slice, cur - beg,
1759 cur + t->incoming_frame_size - beg),
1760 1)) {
1761 return 0;
1762 }
1763 cur += t->incoming_frame_size;
1764 goto dts_fh_0; /* loop */
1765 } else {
1766 if (!parse_frame_slice(
1767 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
1768 return 0;
1769 }
1770 t->incoming_frame_size -= (end - cur);
1771 return 1;
1772 }
1773 gpr_log(GPR_ERROR, "should never reach here");
1774 abort();
1775 }
1776
1777 gpr_log(GPR_ERROR, "should never reach here");
1778 abort();
Nicolas "Pixel" Noble7f13eb22015-04-01 20:57:33 -07001779
1780 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001781}
1782
1783/* tcp read callback */
1784static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
1785 grpc_endpoint_cb_status error) {
1786 transport *t = tp;
1787 size_t i;
1788 int keep_reading = 0;
1789
1790 switch (error) {
1791 case GRPC_ENDPOINT_CB_SHUTDOWN:
1792 case GRPC_ENDPOINT_CB_EOF:
1793 case GRPC_ENDPOINT_CB_ERROR:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001794 lock(t);
1795 drop_connection(t);
1796 t->reading = 0;
1797 if (!t->writing && t->ep) {
1798 grpc_endpoint_destroy(t->ep);
1799 t->ep = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001800 unref_transport(t); /* safe as we still have a ref for read */
1801 }
1802 unlock(t);
1803 unref_transport(t);
1804 break;
1805 case GRPC_ENDPOINT_CB_OK:
1806 lock(t);
1807 for (i = 0; i < nslices && process_read(t, slices[i]); i++)
1808 ;
1809 unlock(t);
1810 keep_reading = 1;
1811 break;
1812 }
1813
1814 for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
1815
1816 if (keep_reading) {
ctiller58393c22015-01-07 14:03:30 -08001817 grpc_endpoint_notify_on_read(t->ep, recv_data, t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001818 }
1819}
1820
1821/*
1822 * CALLBACK LOOP
1823 */
1824
1825static grpc_stream_state compute_state(gpr_uint8 write_closed,
1826 gpr_uint8 read_closed) {
1827 if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
1828 if (write_closed) return GRPC_STREAM_SEND_CLOSED;
1829 if (read_closed) return GRPC_STREAM_RECV_CLOSED;
1830 return GRPC_STREAM_OPEN;
1831}
1832
1833static int prepare_callbacks(transport *t) {
1834 stream *s;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001835 int n = 0;
1836 while ((s = stream_list_remove_head(t, PENDING_CALLBACKS))) {
1837 int execute = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001838
ctiller00297df2015-01-12 11:23:09 -08001839 s->callback_state = compute_state(s->sent_write_closed, s->read_closed);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001840 if (s->callback_state == GRPC_STREAM_CLOSED) {
1841 remove_from_stream_map(t, s);
1842 if (s->published_close) {
1843 execute = 0;
Craig Tillerbd222712015-04-17 16:09:40 -07001844 } else if (s->incoming_metadata_count) {
1845 add_metadata_batch(t, s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001846 }
1847 s->published_close = 1;
1848 }
1849
Craig Tillerbd222712015-04-17 16:09:40 -07001850 grpc_sopb_swap(&s->parser.incoming_sopb, &s->callback_sopb);
1851
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001852 if (execute) {
1853 stream_list_add_tail(t, s, EXECUTING_CALLBACKS);
1854 n = 1;
1855 }
1856 }
1857 return n;
1858}
1859
Craig Tillerd1345de2015-02-24 21:55:20 -08001860static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001861 stream *s;
1862 while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) {
1863 size_t nops = s->callback_sopb.nops;
1864 s->callback_sopb.nops = 0;
Craig Tillerd1345de2015-02-24 21:55:20 -08001865 cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s,
1866 s->callback_sopb.ops, nops, s->callback_state);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001867 }
1868}
1869
Craig Tiller748fe3f2015-03-02 07:48:50 -08001870static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
1871 cb->closed(t->cb_user_data, &t->base);
1872}
1873
ctillerd79b4862014-12-17 16:36:59 -08001874static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
1875 transport *t = (transport *)gt;
1876 lock(t);
1877 if (t->ep) {
1878 grpc_endpoint_add_to_pollset(t->ep, pollset);
1879 }
1880 unlock(t);
1881}
1882
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001883/*
1884 * INTEGRATION GLUE
1885 */
1886
1887static const grpc_transport_vtable vtable = {
Craig Tiller5c019ae2015-04-17 16:46:53 -07001888 sizeof(stream), init_stream, send_batch, set_allow_window_updates,
1889 add_to_pollset, destroy_stream, abort_stream, goaway, close_transport,
1890 send_ping, destroy_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001891
1892void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
1893 void *arg,
1894 const grpc_channel_args *channel_args,
1895 grpc_endpoint *ep, gpr_slice *slices,
1896 size_t nslices, grpc_mdctx *mdctx,
1897 int is_client) {
1898 transport *t = gpr_malloc(sizeof(transport));
Nicolas Noble5ea99bb2015-02-04 14:13:09 -08001899 init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
1900 is_client);
Craig Tiller190d3602015-02-18 09:23:38 -08001901}