/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/transport/chttp2_transport.h"

#include <math.h>
#include <stdio.h>
#include <string.h>

#include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/http2_errors.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
#include "src/core/transport/transport_impl.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>

#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu

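/* CLIENT_CONNECT_STRING is the fixed HTTP/2 client connection preface that
   every client-side connection must emit before any frames;
   CLIENT_CONNECT_STRLEN is its length in bytes and is checked against
   strlen() in init_transport below. */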
#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define CLIENT_CONNECT_STRLEN 24

int grpc_http_trace = 0;

typedef struct transport transport;
typedef struct stream stream;

#define IF_TRACING(stmt)  \
  if (!(grpc_http_trace)) \
    ;                     \
  else                    \
    stmt
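/* The odd if/;/else shape above makes IF_TRACING(stmt) expand to a single
   statement that still binds correctly when the macro is used inside an
   unbraced if/else, while only evaluating stmt when grpc_http_trace is set. */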

/* streams are kept in various linked lists depending on what things need to
   happen to them... this enum labels each list */
typedef enum {
  /* streams that have pending writes */
  WRITABLE = 0,
  /* streams that have been selected to be written */
  WRITING,
  /* streams that have just been written, and included a close */
  WRITTEN_CLOSED,
  /* streams that have been cancelled and have some pending state updates
     to perform */
  CANCELLED,
  /* streams that want to send window updates */
  WINDOW_UPDATE,
  /* streams that are waiting to start because there are too many concurrent
     streams on the connection */
  WAITING_FOR_CONCURRENCY,
  /* streams that have finished reading: we wait until unlock to coalesce
     all changes into one callback */
  FINISHED_READ_OP,
  STREAM_LIST_COUNT /* must be last */
} stream_list_id;

/* deframer state for the overall http2 stream of bytes */
typedef enum {
  /* prefix: one entry per http2 connection prefix byte */
  DTS_CLIENT_PREFIX_0 = 0,
  DTS_CLIENT_PREFIX_1,
  DTS_CLIENT_PREFIX_2,
  DTS_CLIENT_PREFIX_3,
  DTS_CLIENT_PREFIX_4,
  DTS_CLIENT_PREFIX_5,
  DTS_CLIENT_PREFIX_6,
  DTS_CLIENT_PREFIX_7,
  DTS_CLIENT_PREFIX_8,
  DTS_CLIENT_PREFIX_9,
  DTS_CLIENT_PREFIX_10,
  DTS_CLIENT_PREFIX_11,
  DTS_CLIENT_PREFIX_12,
  DTS_CLIENT_PREFIX_13,
  DTS_CLIENT_PREFIX_14,
  DTS_CLIENT_PREFIX_15,
  DTS_CLIENT_PREFIX_16,
  DTS_CLIENT_PREFIX_17,
  DTS_CLIENT_PREFIX_18,
  DTS_CLIENT_PREFIX_19,
  DTS_CLIENT_PREFIX_20,
  DTS_CLIENT_PREFIX_21,
  DTS_CLIENT_PREFIX_22,
  DTS_CLIENT_PREFIX_23,
  /* frame header byte 0... */
  /* must follow from the prefix states */
  DTS_FH_0,
  DTS_FH_1,
  DTS_FH_2,
  DTS_FH_3,
  DTS_FH_4,
  DTS_FH_5,
  DTS_FH_6,
  DTS_FH_7,
  /* ... frame header byte 8 */
  DTS_FH_8,
  /* inside a http2 frame */
  DTS_FRAME
} deframe_transport_state;
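/* The deframer advances through these states as bytes arrive: the 24
   DTS_CLIENT_PREFIX_* states consume the 24 byte client connection preface
   (servers start there; clients start at DTS_FH_0), DTS_FH_0..DTS_FH_8
   consume the 9 byte HTTP/2 frame header, and DTS_FRAME is held while the
   frame payload is fed to the currently active parser. */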

typedef enum {
  WRITE_STATE_OPEN,
  WRITE_STATE_QUEUED_CLOSE,
  WRITE_STATE_SENT_CLOSE
} WRITE_STATE;

typedef struct {
  stream *head;
  stream *tail;
} stream_list;

typedef struct {
  stream *next;
  stream *prev;
} stream_link;

typedef enum {
  ERROR_STATE_NONE,
  ERROR_STATE_SEEN,
  ERROR_STATE_NOTIFIED
} error_state;

/* We keep several sets of connection wide parameters */
typedef enum {
  /* The settings our peer has asked for (and we have acked) */
  PEER_SETTINGS = 0,
  /* The settings we'd like to have */
  LOCAL_SETTINGS,
  /* The settings we've published to our peer */
  SENT_SETTINGS,
  /* The settings the peer has acked */
  ACKED_SETTINGS,
  NUM_SETTING_SETS
} setting_set;
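/* Roughly: a locally requested change lands in LOCAL_SETTINGS (see
   push_setting and dirtied_local_settings), is reflected into SENT_SETTINGS
   when the SETTINGS frame is written, and becomes ACKED_SETTINGS once the
   peer acknowledges it; PEER_SETTINGS is the reverse direction, i.e. what the
   peer asked of us. */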

/* Outstanding ping request data */
typedef struct {
  gpr_uint8 id[8];
  void (*cb)(void *user_data);
  void *user_data;
} outstanding_ping;

typedef struct {
  grpc_status_code status;
  gpr_slice debug;
} pending_goaway;

typedef struct {
  void (*cb)(void *user_data, int success);
  void *user_data;
  int success;
} op_closure;

typedef struct {
  op_closure *callbacks;
  size_t count;
  size_t capacity;
} op_closure_array;
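/* op_closure/op_closure_array buffer completion callbacks (see schedule_cb)
   while the transport lock is held; unlock() later drains them outside the
   lock via prepare_callbacks()/run_callbacks(), using the pending/executing
   pair on the transport below. */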

struct transport {
  grpc_transport base; /* must be first */
  const grpc_transport_callbacks *cb;
  void *cb_user_data;
  grpc_endpoint *ep;
  grpc_mdctx *metadata_context;
  gpr_refcount refs;
  gpr_uint8 is_client;

  gpr_mu mu;
  gpr_cv cv;

  /* basic state management - what are we doing at the moment? */
  gpr_uint8 reading;
  gpr_uint8 writing;
  gpr_uint8 calling_back;
  gpr_uint8 destroying;
  gpr_uint8 closed;
  error_state error_state;

  /* queued callbacks */
  op_closure_array pending_callbacks;
  op_closure_array executing_callbacks;

  /* stream indexing */
  gpr_uint32 next_stream_id;
  gpr_uint32 last_incoming_stream_id;

  /* settings */
  gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
  gpr_uint32 force_send_settings;   /* bitmask of setting indexes to send out */
  gpr_uint8 sent_local_settings;    /* have local settings been sent? */
  gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */

  /* window management */
  gpr_uint32 outgoing_window;
  gpr_uint32 incoming_window;
  gpr_uint32 connection_window_target;

  /* deframing */
  deframe_transport_state deframe_state;
  gpr_uint8 incoming_frame_type;
  gpr_uint8 incoming_frame_flags;
  gpr_uint8 header_eof;
  gpr_uint32 expect_continuation_stream_id;
  gpr_uint32 incoming_frame_size;
  gpr_uint32 incoming_stream_id;

  /* hpack encoding */
  grpc_chttp2_hpack_compressor hpack_compressor;

  /* various parsers */
  grpc_chttp2_hpack_parser hpack_parser;
  /* simple one shot parsers */
  union {
    grpc_chttp2_window_update_parser window_update;
    grpc_chttp2_settings_parser settings;
    grpc_chttp2_ping_parser ping;
  } simple_parsers;

  /* goaway */
  grpc_chttp2_goaway_parser goaway_parser;
  pending_goaway *pending_goaways;
  size_t num_pending_goaways;
  size_t cap_pending_goaways;

  /* state for a stream that's not yet been created */
  grpc_stream_op_buffer new_stream_sopb;

  /* stream ops that need to be destroyed, but outside of the lock */
  grpc_stream_op_buffer nuke_later_sopb;

  /* active parser */
  void *parser_data;
  stream *incoming_stream;
  grpc_chttp2_parse_error (*parser)(void *parser_user_data,
                                    grpc_chttp2_parse_state *state,
                                    gpr_slice slice, int is_last);

  gpr_slice_buffer outbuf;
  gpr_slice_buffer qbuf;

  stream_list lists[STREAM_LIST_COUNT];
  grpc_chttp2_stream_map stream_map;

  /* metadata object cache */
  grpc_mdstr *str_grpc_timeout;

  /* pings */
  outstanding_ping *pings;
  size_t ping_count;
  size_t ping_capacity;
  gpr_int64 ping_counter;
};

struct stream {
  gpr_uint32 id;

  gpr_uint32 incoming_window;
  gpr_int64 outgoing_window;
  /* when the application requests writes be closed, the write_closed is
     'queued'; when the close is flow controlled into the send path, we are
     'sending' it; when the write has been performed it is 'sent' */
  WRITE_STATE write_state;
  gpr_uint8 send_closed;
  gpr_uint8 read_closed;
  gpr_uint8 cancelled;

  op_closure send_done_closure;
  op_closure recv_done_closure;

  stream_link links[STREAM_LIST_COUNT];
  gpr_uint8 included[STREAM_LIST_COUNT];

  /* incoming metadata */
  grpc_linked_mdelem *incoming_metadata;
  size_t incoming_metadata_count;
  size_t incoming_metadata_capacity;
  gpr_timespec incoming_deadline;

  /* sops from application */
  grpc_stream_op_buffer *outgoing_sopb;
  grpc_stream_op_buffer *incoming_sopb;
  grpc_stream_state *publish_state;
  grpc_stream_state published_state;
  /* sops that have passed flow control to be written */
  grpc_stream_op_buffer writing_sopb;

  grpc_chttp2_data_parser parser;

  grpc_stream_state callback_state;
  grpc_stream_op_buffer callback_sopb;
};

static const grpc_transport_vtable vtable;

static void push_setting(transport *t, grpc_chttp2_setting_id id,
                         gpr_uint32 value);

static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);

static int prepare_write(transport *t);
static void perform_write(transport *t, grpc_endpoint *ep);

static void lock(transport *t);
static void unlock(transport *t);

static void drop_connection(transport *t);
static void end_all_the_calls(transport *t);

static stream *stream_list_remove_head(transport *t, stream_list_id id);
static void stream_list_remove(transport *t, stream *s, stream_list_id id);
static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
static void stream_list_join(transport *t, stream *s, stream_list_id id);

static void cancel_stream_id(transport *t, gpr_uint32 id,
                             grpc_status_code local_status,
                             grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
                          grpc_status_code local_status,
                          grpc_chttp2_error_code error_code,
                          grpc_mdstr *optional_message, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
static void maybe_start_some_streams(transport *t);

static void become_skip_parser(transport *t);

static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
                      grpc_endpoint_cb_status error);

static void schedule_cb(transport *t, op_closure closure, int success);
static void maybe_finish_read(transport *t, stream *s);
static void maybe_join_window_updates(transport *t, stream *s);
static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
static void add_metadata_batch(transport *t, stream *s);

/*
 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
 */

static void destruct_transport(transport *t) {
  size_t i;

  gpr_mu_lock(&t->mu);

  GPR_ASSERT(t->ep == NULL);

  gpr_slice_buffer_destroy(&t->outbuf);
  gpr_slice_buffer_destroy(&t->qbuf);
  grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
  grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
  grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);

  grpc_mdstr_unref(t->str_grpc_timeout);

  for (i = 0; i < STREAM_LIST_COUNT; i++) {
    GPR_ASSERT(t->lists[i].head == NULL);
    GPR_ASSERT(t->lists[i].tail == NULL);
  }

  GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);

  grpc_chttp2_stream_map_destroy(&t->stream_map);

  gpr_mu_unlock(&t->mu);
  gpr_mu_destroy(&t->mu);
  gpr_cv_destroy(&t->cv);

  /* callback remaining pings: they're not allowed to call into the transport,
     and maybe they hold resources that need to be freed */
  for (i = 0; i < t->ping_count; i++) {
    t->pings[i].cb(t->pings[i].user_data);
  }
  gpr_free(t->pings);

  gpr_free(t->pending_callbacks.callbacks);
  gpr_free(t->executing_callbacks.callbacks);

  for (i = 0; i < t->num_pending_goaways; i++) {
    gpr_slice_unref(t->pending_goaways[i].debug);
  }
  gpr_free(t->pending_goaways);

  grpc_sopb_destroy(&t->nuke_later_sopb);

  grpc_mdctx_unref(t->metadata_context);

  gpr_free(t);
}

static void unref_transport(transport *t) {
  if (!gpr_unref(&t->refs)) return;
  destruct_transport(t);
}

static void ref_transport(transport *t) { gpr_ref(&t->refs); }

static void init_transport(transport *t, grpc_transport_setup_callback setup,
                           void *arg, const grpc_channel_args *channel_args,
                           grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
                           grpc_mdctx *mdctx, int is_client) {
  size_t i;
  int j;
  grpc_transport_setup_result sr;

  GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);

  memset(t, 0, sizeof(*t));

  t->base.vtable = &vtable;
  t->ep = ep;
  /* one ref is for destroy, the other for when ep becomes NULL */
  gpr_ref_init(&t->refs, 2);
  gpr_mu_init(&t->mu);
  gpr_cv_init(&t->cv);
  grpc_mdctx_ref(mdctx);
  t->metadata_context = mdctx;
  t->str_grpc_timeout =
      grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
  t->reading = 1;
  t->error_state = ERROR_STATE_NONE;
  t->next_stream_id = is_client ? 1 : 2;
  t->is_client = is_client;
  t->outgoing_window = DEFAULT_WINDOW;
  t->incoming_window = DEFAULT_WINDOW;
  t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
  t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
  t->ping_counter = gpr_now().tv_nsec;
  grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
  grpc_chttp2_goaway_parser_init(&t->goaway_parser);
  gpr_slice_buffer_init(&t->outbuf);
  gpr_slice_buffer_init(&t->qbuf);
  grpc_sopb_init(&t->nuke_later_sopb);
  grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
  if (is_client) {
    gpr_slice_buffer_add(&t->qbuf,
                         gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
  }
  /* 8 is a random stab in the dark as to a good initial size: it's small
     enough that it shouldn't waste memory for infrequently used connections,
     yet large enough that the exponential growth should happen nicely when
     it's needed.
     TODO(ctiller): tune this */
  grpc_chttp2_stream_map_init(&t->stream_map, 8);

  /* copy in initial settings to all setting sets */
  for (i = 0; i < NUM_SETTING_SETS; i++) {
    for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
      t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
    }
  }
  t->dirtied_local_settings = 1;
  /* Hack: it's common for implementations to assume 65536 bytes initial send
     window -- this should by rights be 0 */
  t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
  t->sent_local_settings = 0;

  /* configure http2 the way we like it */
  if (t->is_client) {
    push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
    push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
  }
  push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);

  if (channel_args) {
    for (i = 0; i < channel_args->num_args; i++) {
      if (0 ==
          strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
        if (t->is_client) {
          gpr_log(GPR_ERROR, "%s: is ignored on the client",
                  GRPC_ARG_MAX_CONCURRENT_STREAMS);
        } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
          gpr_log(GPR_ERROR, "%s: must be an integer",
                  GRPC_ARG_MAX_CONCURRENT_STREAMS);
        } else {
          push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
                       channel_args->args[i].value.integer);
        }
      }
    }
  }

  gpr_mu_lock(&t->mu);
  t->calling_back = 1;
  ref_transport(t); /* matches unref at end of this function */
  gpr_mu_unlock(&t->mu);

  sr = setup(arg, &t->base, t->metadata_context);

  lock(t);
  t->cb = sr.callbacks;
  t->cb_user_data = sr.user_data;
  t->calling_back = 0;
  if (t->destroying) gpr_cv_signal(&t->cv);
  unlock(t);

  ref_transport(t); /* matches unref inside recv_data */
  recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);

  unref_transport(t);
}

static void destroy_transport(grpc_transport *gt) {
  transport *t = (transport *)gt;

  lock(t);
  t->destroying = 1;
  /* Wait for pending stuff to finish.
     We need to be not calling back to ensure that closed() gets a chance to
     trigger if needed during unlock() before we die.
     We need to be not writing as cancellation finalization may produce some
     callbacks that NEED to be made to close out some streams when t->writing
     becomes 0. */
  while (t->calling_back || t->writing) {
    gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
  }
  drop_connection(t);
  unlock(t);

  /* The drop_connection() above puts the transport into an error state, and
     the follow-up unlock should then (as part of the cleanup work it does)
     ensure that cb is NULL, and therefore not call back anything further.
     This check validates this very subtle behavior.
     It's the shutdown path, so I don't believe an extra lock pair is going to
     be problematic for performance. */
  lock(t);
  GPR_ASSERT(!t->cb);
  unlock(t);

  unref_transport(t);
}

static void close_transport(grpc_transport *gt) {
  transport *t = (transport *)gt;
  gpr_mu_lock(&t->mu);
  GPR_ASSERT(!t->closed);
  t->closed = 1;
  if (t->ep) {
    grpc_endpoint_shutdown(t->ep);
  }
  gpr_mu_unlock(&t->mu);
}

static void goaway(grpc_transport *gt, grpc_status_code status,
                   gpr_slice debug_data) {
  transport *t = (transport *)gt;
  lock(t);
  grpc_chttp2_goaway_append(t->last_incoming_stream_id,
                            grpc_chttp2_grpc_status_to_http2_error(status),
                            debug_data, &t->qbuf);
  unlock(t);
}

static int init_stream(grpc_transport *gt, grpc_stream *gs,
                       const void *server_data, grpc_transport_op *initial_op) {
  transport *t = (transport *)gt;
  stream *s = (stream *)gs;

  memset(s, 0, sizeof(*s));

  ref_transport(t);

  if (!server_data) {
    lock(t);
    s->id = 0;
  } else {
    /* already locked */
    s->id = (gpr_uint32)(gpr_uintptr)server_data;
    t->incoming_stream = s;
    grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
  }

  s->outgoing_window =
      t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
  s->incoming_window =
      t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
  s->incoming_deadline = gpr_inf_future;
  grpc_sopb_init(&s->writing_sopb);
  grpc_sopb_init(&s->callback_sopb);
  grpc_chttp2_data_parser_init(&s->parser);

  if (initial_op) perform_op_locked(t, s, initial_op);

  if (!server_data) {
    unlock(t);
  }

  return 0;
}

static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
  grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
  sopb->nops = 0;
}

static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
  transport *t = (transport *)gt;
  stream *s = (stream *)gs;
  size_t i;

  gpr_mu_lock(&t->mu);

  /* stop parsing if we're currently parsing this stream */
  if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
      s->id != 0) {
    become_skip_parser(t);
  }

  for (i = 0; i < STREAM_LIST_COUNT; i++) {
    stream_list_remove(t, s, i);
  }
  remove_from_stream_map(t, s);

  gpr_mu_unlock(&t->mu);

  GPR_ASSERT(s->outgoing_sopb == NULL);
  GPR_ASSERT(s->incoming_sopb == NULL);
  grpc_sopb_destroy(&s->writing_sopb);
  grpc_sopb_destroy(&s->callback_sopb);
  grpc_chttp2_data_parser_destroy(&s->parser);
  GPR_ASSERT(s->incoming_metadata_count == 0);
  gpr_free(s->incoming_metadata);

  unref_transport(t);
}

/*
 * LIST MANAGEMENT
 */

static int stream_list_empty(transport *t, stream_list_id id) {
  return t->lists[id].head == NULL;
}

static stream *stream_list_remove_head(transport *t, stream_list_id id) {
  stream *s = t->lists[id].head;
  if (s) {
    stream *new_head = s->links[id].next;
    GPR_ASSERT(s->included[id]);
    if (new_head) {
      t->lists[id].head = new_head;
      new_head->links[id].prev = NULL;
    } else {
      t->lists[id].head = NULL;
      t->lists[id].tail = NULL;
    }
    s->included[id] = 0;
  }
  return s;
}

static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
  if (!s->included[id]) return;
  s->included[id] = 0;
  if (s->links[id].prev) {
    s->links[id].prev->links[id].next = s->links[id].next;
  } else {
    GPR_ASSERT(t->lists[id].head == s);
    t->lists[id].head = s->links[id].next;
  }
  if (s->links[id].next) {
    s->links[id].next->links[id].prev = s->links[id].prev;
  } else {
    t->lists[id].tail = s->links[id].prev;
  }
}

static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
  stream *old_tail;
  GPR_ASSERT(!s->included[id]);
  old_tail = t->lists[id].tail;
  s->links[id].next = NULL;
  s->links[id].prev = old_tail;
  if (old_tail) {
    old_tail->links[id].next = s;
  } else {
    s->links[id].prev = NULL;
    t->lists[id].head = s;
  }
  t->lists[id].tail = s;
  s->included[id] = 1;
}

static void stream_list_join(transport *t, stream *s, stream_list_id id) {
  if (s->included[id]) {
    return;
  }
  stream_list_add_tail(t, s, id);
}

static void remove_from_stream_map(transport *t, stream *s) {
  if (s->id == 0) return;
  IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
                     t->is_client ? "CLI" : "SVR", s->id));
  if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
    maybe_start_some_streams(t);
  }
}

/*
 * LOCK MANAGEMENT
 */

/* We take a transport-global lock in response to calls coming in from above,
   and in response to data being received from below. New data to be written
   is always queued, as are callbacks to process data. During unlock() we
   check our todo lists and initiate callbacks and flush writes. */
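/* Concretely, unlock() below: swaps any sopbs queued for destruction out of
   nuke_later_sopb, kicks off a write (prepare_write/perform_write) if no
   write is already in flight, finalizes cancellations when not writing,
   publishes finished reads, and then fires queued op callbacks, goaways and
   the closed() notification outside the lock. */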

static void lock(transport *t) { gpr_mu_lock(&t->mu); }

static void unlock(transport *t) {
  int start_write = 0;
  int perform_callbacks = 0;
  int call_closed = 0;
  int num_goaways = 0;
  int i;
  pending_goaway *goaways = NULL;
  grpc_endpoint *ep = t->ep;
  grpc_stream_op_buffer nuke_now;
  const grpc_transport_callbacks *cb = t->cb;

  grpc_sopb_init(&nuke_now);
  if (t->nuke_later_sopb.nops) {
    grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
  }

  /* see if we need to trigger a write - and if so, get the data ready */
  if (ep && !t->writing) {
    t->writing = start_write = prepare_write(t);
    if (start_write) {
      ref_transport(t);
    }
  }

  if (!t->writing) {
    finalize_cancellations(t);
  }

  finish_reads(t);

  /* gather any callbacks that need to be made */
  if (!t->calling_back && cb) {
    perform_callbacks = prepare_callbacks(t);
    if (perform_callbacks) {
      t->calling_back = 1;
    }
    if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
      call_closed = 1;
      t->calling_back = 1;
      t->cb = NULL; /* no more callbacks */
      t->error_state = ERROR_STATE_NOTIFIED;
    }
    if (t->num_pending_goaways) {
      goaways = t->pending_goaways;
      num_goaways = t->num_pending_goaways;
      t->pending_goaways = NULL;
      t->num_pending_goaways = 0;
      t->cap_pending_goaways = 0;
      t->calling_back = 1;
    }
  }

  if (perform_callbacks || call_closed || num_goaways) {
    ref_transport(t);
  }

  /* finally unlock */
  gpr_mu_unlock(&t->mu);

  /* perform some callbacks if necessary */
  for (i = 0; i < num_goaways; i++) {
    cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
  }

  if (perform_callbacks) {
    run_callbacks(t, cb);
  }

  if (call_closed) {
    call_cb_closed(t, cb);
  }

  /* write some bytes if necessary */
  if (start_write) {
    /* ultimately calls unref_transport(t); and clears t->writing */
    perform_write(t, ep);
  }

  if (perform_callbacks || call_closed || num_goaways) {
    lock(t);
    t->calling_back = 0;
    if (t->destroying) gpr_cv_signal(&t->cv);
    unlock(t);
    unref_transport(t);
  }

  grpc_sopb_destroy(&nuke_now);

  gpr_free(goaways);
}

/*
 * OUTPUT PROCESSING
 */

static void push_setting(transport *t, grpc_chttp2_setting_id id,
                         gpr_uint32 value) {
  const grpc_chttp2_setting_parameters *sp =
      &grpc_chttp2_settings_parameters[id];
  gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
  if (use_value != value) {
    gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
            value, use_value);
  }
  if (use_value != t->settings[LOCAL_SETTINGS][id]) {
    t->settings[LOCAL_SETTINGS][id] = use_value;
    t->dirtied_local_settings = 1;
  }
}

static int prepare_write(transport *t) {
  stream *s;
  gpr_uint32 window_delta;

  /* simple writes are queued to qbuf, and flushed here */
  gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
  GPR_ASSERT(t->qbuf.count == 0);

  if (t->dirtied_local_settings && !t->sent_local_settings) {
    gpr_slice_buffer_add(
        &t->outbuf, grpc_chttp2_settings_create(
                        t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
                        t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
    t->force_send_settings = 0;
    t->dirtied_local_settings = 0;
    t->sent_local_settings = 1;
  }

  /* for each stream that's become writable, frame its data (according to
     available window sizes) and add to the output buffer */
  while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
         s->outgoing_window > 0) {
    window_delta = grpc_chttp2_preencode(
        s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
        GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
    t->outgoing_window -= window_delta;
    s->outgoing_window -= window_delta;

    if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
        s->outgoing_sopb->nops == 0) {
      s->send_closed = 1;
    }
    if (s->writing_sopb.nops > 0 || s->send_closed) {
      stream_list_join(t, s, WRITING);
    }

    /* we should either exhaust window or have no ops left, but not both */
    if (s->outgoing_sopb->nops == 0) {
      s->outgoing_sopb = NULL;
      schedule_cb(t, s->send_done_closure, 1);
    } else if (s->outgoing_window) {
      stream_list_add_tail(t, s, WRITABLE);
    }
  }

  /* for each stream that wants to update its window, add that window here */
  while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
    window_delta =
        t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
        s->incoming_window;
    if (!s->read_closed && window_delta) {
      gpr_slice_buffer_add(
          &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
      s->incoming_window += window_delta;
    }
  }

  /* if the transport is ready to send a window update, do so here also */
  if (t->incoming_window < t->connection_window_target * 3 / 4) {
    window_delta = t->connection_window_target - t->incoming_window;
    gpr_slice_buffer_add(&t->outbuf,
                         grpc_chttp2_window_update_create(0, window_delta));
    t->incoming_window += window_delta;
  }

  return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
}

static void finalize_outbuf(transport *t) {
  stream *s;

  while ((s = stream_list_remove_head(t, WRITING))) {
    grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
                       s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
    s->writing_sopb.nops = 0;
    if (s->send_closed) {
      stream_list_join(t, s, WRITTEN_CLOSED);
    }
  }
}

static void finish_write_common(transport *t, int success) {
  stream *s;

  lock(t);
  if (!success) {
    drop_connection(t);
  }
  while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
    s->write_state = WRITE_STATE_SENT_CLOSE;
    if (!s->cancelled) {
      maybe_finish_read(t, s);
    }
  }
  t->outbuf.count = 0;
  t->outbuf.length = 0;
  /* leave the writing flag up on shutdown to prevent further writes in
     unlock() from starting */
  t->writing = 0;
  if (t->destroying) {
    gpr_cv_signal(&t->cv);
  }
  if (!t->reading) {
    grpc_endpoint_destroy(t->ep);
    t->ep = NULL;
    unref_transport(t); /* safe because we'll still have the ref for write */
  }
  unlock(t);

  unref_transport(t);
}

static void finish_write(void *tp, grpc_endpoint_cb_status error) {
  transport *t = tp;
  finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
}

static void perform_write(transport *t, grpc_endpoint *ep) {
  finalize_outbuf(t);

  GPR_ASSERT(t->outbuf.count > 0);

  switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
                              finish_write, t)) {
    case GRPC_ENDPOINT_WRITE_DONE:
      finish_write_common(t, 1);
      break;
    case GRPC_ENDPOINT_WRITE_ERROR:
      finish_write_common(t, 0);
      break;
    case GRPC_ENDPOINT_WRITE_PENDING:
      break;
  }
}

static void maybe_start_some_streams(transport *t) {
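  /* Streams parked in WAITING_FOR_CONCURRENCY are promoted here while the
     peer's MAX_CONCURRENT_STREAMS setting still has room; ids come from
     next_stream_id, which advances by 2 so a client only ever allocates odd
     ids and a server even ids (see init_transport). */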
  while (
      grpc_chttp2_stream_map_size(&t->stream_map) <
      t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
    stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
    if (!s) break;

    IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
                       t->is_client ? "CLI" : "SVR", s, t->next_stream_id));

    GPR_ASSERT(s->id == 0);
    s->id = t->next_stream_id;
    t->next_stream_id += 2;
    grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
    stream_list_join(t, s, WRITABLE);
  }
}

static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
  if (op->cancel_with_status != GRPC_STATUS_OK) {
    cancel_stream(
        t, s, op->cancel_with_status,
        grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
        op->cancel_message, 1);
  }

  if (op->send_ops) {
    GPR_ASSERT(s->outgoing_sopb == NULL);
    s->send_done_closure.cb = op->on_done_send;
    s->send_done_closure.user_data = op->send_user_data;
    if (!s->cancelled) {
      s->outgoing_sopb = op->send_ops;
      if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
        s->write_state = WRITE_STATE_QUEUED_CLOSE;
      }
      if (s->id == 0) {
        IF_TRACING(gpr_log(GPR_DEBUG,
                           "HTTP:%s: New stream %p waiting for concurrency",
                           t->is_client ? "CLI" : "SVR", s));
        stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
        maybe_start_some_streams(t);
      } else if (s->outgoing_window > 0) {
        stream_list_join(t, s, WRITABLE);
      }
    } else {
      schedule_nuke_sopb(t, op->send_ops);
      schedule_cb(t, s->send_done_closure, 0);
    }
  }

  if (op->recv_ops) {
    GPR_ASSERT(s->incoming_sopb == NULL);
    s->recv_done_closure.cb = op->on_done_recv;
    s->recv_done_closure.user_data = op->recv_user_data;
    s->incoming_sopb = op->recv_ops;
    s->incoming_sopb->nops = 0;
    s->publish_state = op->recv_state;
    maybe_finish_read(t, s);
    maybe_join_window_updates(t, s);
  }

  if (op->bind_pollset) {
    add_to_pollset_locked(t, op->bind_pollset);
  }
}

static void perform_op(grpc_transport *gt, grpc_stream *gs,
                       grpc_transport_op *op) {
  transport *t = (transport *)gt;
  stream *s = (stream *)gs;

  lock(t);
  perform_op_locked(t, s, op);
  unlock(t);
}

static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
                      void *user_data) {
  transport *t = (transport *)gt;
  outstanding_ping *p;

  lock(t);
  if (t->ping_capacity == t->ping_count) {
    t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
    t->pings =
        gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
  }
  p = &t->pings[t->ping_count++];
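  /* Encode the 64 bit ping counter big-endian into the 8 byte opaque ping
     payload, so the peer's echoed PING ack can be matched back to this
     outstanding_ping entry. */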
  p->id[0] = (t->ping_counter >> 56) & 0xff;
  p->id[1] = (t->ping_counter >> 48) & 0xff;
  p->id[2] = (t->ping_counter >> 40) & 0xff;
  p->id[3] = (t->ping_counter >> 32) & 0xff;
  p->id[4] = (t->ping_counter >> 24) & 0xff;
  p->id[5] = (t->ping_counter >> 16) & 0xff;
  p->id[6] = (t->ping_counter >> 8) & 0xff;
  p->id[7] = t->ping_counter & 0xff;
  p->cb = cb;
  p->user_data = user_data;
  gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
  unlock(t);
}

/*
 * INPUT PROCESSING
 */

static void finalize_cancellations(transport *t) {
  stream *s;

  while ((s = stream_list_remove_head(t, CANCELLED))) {
    s->read_closed = 1;
    s->write_state = WRITE_STATE_SENT_CLOSE;
    maybe_finish_read(t, s);
  }
}

static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
  if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
    s->incoming_metadata_capacity =
        GPR_MAX(8, 2 * s->incoming_metadata_capacity);
    s->incoming_metadata =
        gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
                                              s->incoming_metadata_capacity);
  }
  s->incoming_metadata[s->incoming_metadata_count++].md = elem;
}

static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
                                grpc_status_code local_status,
                                grpc_chttp2_error_code error_code,
                                grpc_mdstr *optional_message, int send_rst) {
  int had_outgoing;
  char buffer[GPR_LTOA_MIN_BUFSIZE];

  if (s) {
    /* clear out any unreported input & output: nobody cares anymore */
    had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
    schedule_nuke_sopb(t, &s->parser.incoming_sopb);
    if (s->outgoing_sopb) {
      schedule_nuke_sopb(t, s->outgoing_sopb);
      s->outgoing_sopb = NULL;
      stream_list_remove(t, s, WRITABLE);
      schedule_cb(t, s->send_done_closure, 0);
    }
    if (s->cancelled) {
      send_rst = 0;
    } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
               had_outgoing) {
      s->cancelled = 1;
      stream_list_join(t, s, CANCELLED);

      gpr_ltoa(local_status, buffer);
      add_incoming_metadata(
          t, s,
          grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
      if (!optional_message) {
        switch (local_status) {
          case GRPC_STATUS_CANCELLED:
            add_incoming_metadata(
                t, s, grpc_mdelem_from_strings(t->metadata_context,
                                               "grpc-message", "Cancelled"));
            break;
          default:
            break;
        }
      } else {
        add_incoming_metadata(
            t, s,
            grpc_mdelem_from_metadata_strings(
                t->metadata_context,
                grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
                grpc_mdstr_ref(optional_message)));
      }
      add_metadata_batch(t, s);
      maybe_finish_read(t, s);
    }
  }
  if (!id) send_rst = 0;
  if (send_rst) {
    gpr_slice_buffer_add(&t->qbuf,
                         grpc_chttp2_rst_stream_create(id, error_code));
  }
  if (optional_message) {
    grpc_mdstr_unref(optional_message);
  }
}

static void cancel_stream_id(transport *t, gpr_uint32 id,
                             grpc_status_code local_status,
                             grpc_chttp2_error_code error_code, int send_rst) {
  cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
                      NULL, send_rst);
}

static void cancel_stream(transport *t, stream *s,
                          grpc_status_code local_status,
                          grpc_chttp2_error_code error_code,
                          grpc_mdstr *optional_message, int send_rst) {
  cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
                      send_rst);
}

static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
  cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
                GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
}

static void end_all_the_calls(transport *t) {
  grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
}

static void drop_connection(transport *t) {
  if (t->error_state == ERROR_STATE_NONE) {
    t->error_state = ERROR_STATE_SEEN;
  }
  end_all_the_calls(t);
}

static void maybe_finish_read(transport *t, stream *s) {
  if (s->incoming_sopb) {
    stream_list_join(t, s, FINISHED_READ_OP);
  }
}

static void maybe_join_window_updates(transport *t, stream *s) {
  if (s->incoming_sopb != NULL &&
      s->incoming_window <
          t->settings[LOCAL_SETTINGS]
                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
              3 / 4) {
    stream_list_join(t, s, WINDOW_UPDATE);
  }
}

static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
  if (t->incoming_frame_size > t->incoming_window) {
    gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
            t->incoming_frame_size, t->incoming_window);
    return GRPC_CHTTP2_CONNECTION_ERROR;
  }

  if (t->incoming_frame_size > s->incoming_window) {
    gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
            t->incoming_frame_size, s->incoming_window);
    return GRPC_CHTTP2_CONNECTION_ERROR;
  }

  t->incoming_window -= t->incoming_frame_size;
  s->incoming_window -= t->incoming_frame_size;

  /* if the stream incoming window is getting low, schedule an update */
  maybe_join_window_updates(t, s);

  return GRPC_CHTTP2_PARSE_OK;
}

static stream *lookup_stream(transport *t, gpr_uint32 id) {
  return grpc_chttp2_stream_map_find(&t->stream_map, id);
}

static grpc_chttp2_parse_error skip_parser(void *parser,
                                           grpc_chttp2_parse_state *st,
                                           gpr_slice slice, int is_last) {
  return GRPC_CHTTP2_PARSE_OK;
}

static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }

static int init_skip_frame(transport *t, int is_header) {
  if (is_header) {
    int is_eoh = t->expect_continuation_stream_id != 0;
    t->parser = grpc_chttp2_header_parser_parse;
    t->parser_data = &t->hpack_parser;
    t->hpack_parser.on_header = skip_header;
    t->hpack_parser.on_header_user_data = NULL;
    t->hpack_parser.is_boundary = is_eoh;
    t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
  } else {
    t->parser = skip_parser;
  }
  return 1;
}

static void become_skip_parser(transport *t) {
  init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
}

static int init_data_frame_parser(transport *t) {
  stream *s = lookup_stream(t, t->incoming_stream_id);
  grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
  if (!s || s->read_closed) return init_skip_frame(t, 0);
  if (err == GRPC_CHTTP2_PARSE_OK) {
    err = update_incoming_window(t, s);
  }
  if (err == GRPC_CHTTP2_PARSE_OK) {
    err = grpc_chttp2_data_parser_begin_frame(&s->parser,
                                              t->incoming_frame_flags);
  }
  switch (err) {
    case GRPC_CHTTP2_PARSE_OK:
      t->incoming_stream = s;
      t->parser = grpc_chttp2_data_parser_parse;
      t->parser_data = &s->parser;
      return 1;
    case GRPC_CHTTP2_STREAM_ERROR:
      cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
                              GRPC_CHTTP2_INTERNAL_ERROR),
                    GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
      return init_skip_frame(t, 0);
    case GRPC_CHTTP2_CONNECTION_ERROR:
      drop_connection(t);
      return 0;
  }
  gpr_log(GPR_ERROR, "should never reach here");
  abort();
  return 0;
}

static void free_timeout(void *p) { gpr_free(p); }
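/* on_header (below) parses "grpc-timeout" values once per metadata element
   and caches the resulting gpr_timespec on the mdelem as user data (freed via
   free_timeout above), so an interned timeout string that is seen repeatedly
   is not re-parsed on every request that carries it. */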
1322
1323static void on_header(void *tp, grpc_mdelem *md) {
1324 transport *t = tp;
1325 stream *s = t->incoming_stream;
1326
1327 GPR_ASSERT(s);
Craig Tillerd50e5652015-02-24 16:46:22 -08001328
Craig Tiller1a727fd2015-04-24 13:21:22 -07001329 IF_TRACING(gpr_log(
1330 GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
1331 grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
Craig Tillerd50e5652015-02-24 16:46:22 -08001332
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001333 if (md->key == t->str_grpc_timeout) {
1334 gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
1335 if (!cached_timeout) {
1336 /* not already parsed: parse it now, and store the result away */
1337 cached_timeout = gpr_malloc(sizeof(gpr_timespec));
1338 if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
1339 cached_timeout)) {
1340 gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
1341 grpc_mdstr_as_c_string(md->value));
1342 *cached_timeout = gpr_inf_future;
1343 }
1344 grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
1345 }
Craig Tiller9c1043e2015-04-16 16:20:38 -07001346 s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001347 grpc_mdelem_unref(md);
1348 } else {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001349 add_incoming_metadata(t, s, md);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001350 }
Craig Tillerc079c112015-04-22 15:23:39 -07001351 maybe_finish_read(t, s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001352}
1353
1354static int init_header_frame_parser(transport *t, int is_continuation) {
1355 int is_eoh =
1356 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
1357 stream *s;
1358
1359 if (is_eoh) {
1360 t->expect_continuation_stream_id = 0;
1361 } else {
1362 t->expect_continuation_stream_id = t->incoming_stream_id;
1363 }
1364
1365 if (!is_continuation) {
1366 t->header_eof =
1367 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
1368 }
1369
1370 /* could be a new stream or an existing stream */
1371 s = lookup_stream(t, t->incoming_stream_id);
1372 if (!s) {
1373 if (is_continuation) {
1374 gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
1375 return init_skip_frame(t, 1);
1376 }
1377 if (t->is_client) {
1378 if ((t->incoming_stream_id & 1) &&
1379 t->incoming_stream_id < t->next_stream_id) {
1380 /* this is an old (probably cancelled) stream */
1381 } else {
1382 gpr_log(GPR_ERROR, "ignoring new stream creation on client");
1383 }
1384 return init_skip_frame(t, 1);
nnoble0c475f02014-12-05 15:37:39 -08001385 } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
1386 gpr_log(GPR_ERROR,
1387 "ignoring out of order new stream request on server; last stream "
1388 "id=%d, new stream id=%d",
1389              t->last_incoming_stream_id, t->incoming_stream_id);
1390 return init_skip_frame(t, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001391 }
1392 t->incoming_stream = NULL;
1393 /* if stream is accepted, we set incoming_stream in init_stream */
1394 t->cb->accept_stream(t->cb_user_data, &t->base,
Craig Tiller5c019ae2015-04-17 16:46:53 -07001395 (void *)(gpr_uintptr)t->incoming_stream_id);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001396 s = t->incoming_stream;
1397 if (!s) {
1398 gpr_log(GPR_ERROR, "stream not accepted");
1399 return init_skip_frame(t, 1);
1400 }
1401 } else {
1402 t->incoming_stream = s;
1403 }
1404 if (t->incoming_stream->read_closed) {
1405 gpr_log(GPR_ERROR, "skipping already closed stream header");
1406 t->incoming_stream = NULL;
1407 return init_skip_frame(t, 1);
1408 }
1409 t->parser = grpc_chttp2_header_parser_parse;
1410 t->parser_data = &t->hpack_parser;
1411 t->hpack_parser.on_header = on_header;
1412 t->hpack_parser.on_header_user_data = t;
1413 t->hpack_parser.is_boundary = is_eoh;
1414 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1415 if (!is_continuation &&
1416 (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
1417 grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
1418 }
1419 return 1;
1420}
1421
1422static int init_window_update_frame_parser(transport *t) {
1423 int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
1424 &t->simple_parsers.window_update,
1425 t->incoming_frame_size,
1426 t->incoming_frame_flags);
1427 if (!ok) {
1428 drop_connection(t);
1429 }
1430 t->parser = grpc_chttp2_window_update_parser_parse;
1431 t->parser_data = &t->simple_parsers.window_update;
1432 return ok;
1433}
1434
1435static int init_ping_parser(transport *t) {
1436 int ok = GRPC_CHTTP2_PARSE_OK ==
1437 grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
1438 t->incoming_frame_size,
1439 t->incoming_frame_flags);
1440 if (!ok) {
1441 drop_connection(t);
1442 }
1443 t->parser = grpc_chttp2_ping_parser_parse;
1444 t->parser_data = &t->simple_parsers.ping;
1445 return ok;
1446}
1447
nnoble0c475f02014-12-05 15:37:39 -08001448static int init_goaway_parser(transport *t) {
1449 int ok =
1450 GRPC_CHTTP2_PARSE_OK ==
1451 grpc_chttp2_goaway_parser_begin_frame(
1452 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
1453 if (!ok) {
1454 drop_connection(t);
1455 }
1456 t->parser = grpc_chttp2_goaway_parser_parse;
1457 t->parser_data = &t->goaway_parser;
1458 return ok;
1459}
1460
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001461static int init_settings_frame_parser(transport *t) {
1462 int ok = GRPC_CHTTP2_PARSE_OK ==
1463 grpc_chttp2_settings_parser_begin_frame(
1464 &t->simple_parsers.settings, t->incoming_frame_size,
1465 t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
1466 if (!ok) {
1467 drop_connection(t);
1468 }
1469 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
1470 memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
1471 GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
1472 }
1473 t->parser = grpc_chttp2_settings_parser_parse;
1474 t->parser_data = &t->simple_parsers.settings;
1475 return ok;
1476}
1477
1478static int init_frame_parser(transport *t) {
1479 if (t->expect_continuation_stream_id != 0) {
1480 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
1481 gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
1482 t->incoming_frame_type);
1483 return 0;
1484 }
1485 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
1486 gpr_log(GPR_ERROR,
1487 "Expected CONTINUATION frame for stream %08x, got stream %08x",
1488 t->expect_continuation_stream_id, t->incoming_stream_id);
1489 return 0;
1490 }
1491 return init_header_frame_parser(t, 1);
1492 }
1493 switch (t->incoming_frame_type) {
1494 case GRPC_CHTTP2_FRAME_DATA:
1495 return init_data_frame_parser(t);
1496 case GRPC_CHTTP2_FRAME_HEADER:
1497 return init_header_frame_parser(t, 0);
1498 case GRPC_CHTTP2_FRAME_CONTINUATION:
1499 gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
1500 return 0;
1501 case GRPC_CHTTP2_FRAME_RST_STREAM:
1502 /* TODO(ctiller): actually parse the reason */
1503 cancel_stream_id(
1504 t, t->incoming_stream_id,
1505 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
1506 GRPC_CHTTP2_CANCEL, 0);
1507 return init_skip_frame(t, 0);
1508 case GRPC_CHTTP2_FRAME_SETTINGS:
1509 return init_settings_frame_parser(t);
1510 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
1511 return init_window_update_frame_parser(t);
1512 case GRPC_CHTTP2_FRAME_PING:
1513 return init_ping_parser(t);
nnoble0c475f02014-12-05 15:37:39 -08001514 case GRPC_CHTTP2_FRAME_GOAWAY:
1515 return init_goaway_parser(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001516 default:
1517 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
1518 return init_skip_frame(t, 0);
1519 }
1520}
1521
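/* Flow-control windows are 31-bit quantities; the legality check is done in
   64-bit signed arithmetic so the addition itself cannot overflow. */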
Craig Tiller84b88842015-04-20 08:47:52 -07001522static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
1523 return window + window_update < MAX_WINDOW;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001524}
1525
Craig Tillerbd222712015-04-17 16:09:40 -07001526static void add_metadata_batch(transport *t, stream *s) {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001527 grpc_metadata_batch b;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001528
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001529 b.list.head = NULL;
1530 /* Store away the last element of the list, so that in patch_metadata_ops
1531 we can reconstitute the list.
1532 We can't do list building here as later incoming metadata may reallocate
1533 the underlying array. */
1534 b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001535 b.garbage.head = b.garbage.tail = NULL;
1536 b.deadline = s->incoming_deadline;
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001537 s->incoming_deadline = gpr_inf_future;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001538
1539 grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
Craig Tiller9c1043e2015-04-16 16:20:38 -07001540}
1541
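/* Feed one slice of the current frame to the installed parser and act on
   whatever it recorded in grpc_chttp2_parse_state: end-of-stream, metadata
   boundaries, SETTINGS acks, PING acks and replies, GOAWAY bookkeeping, and
   stream- or transport-level window updates. */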
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001542static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
1543 grpc_chttp2_parse_state st;
1544 size_t i;
1545 memset(&st, 0, sizeof(st));
1546 switch (t->parser(t->parser_data, &st, slice, is_last)) {
1547 case GRPC_CHTTP2_PARSE_OK:
1548 if (st.end_of_stream) {
1549 t->incoming_stream->read_closed = 1;
Craig Tillerc079c112015-04-22 15:23:39 -07001550 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001551 }
1552 if (st.need_flush_reads) {
Craig Tillerc079c112015-04-22 15:23:39 -07001553 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001554 }
1555 if (st.metadata_boundary) {
Craig Tillerbd222712015-04-17 16:09:40 -07001556 add_metadata_batch(t, t->incoming_stream);
Craig Tillerc079c112015-04-22 15:23:39 -07001557 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001558 }
1559 if (st.ack_settings) {
1560 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
1561 maybe_start_some_streams(t);
1562 }
1563 if (st.send_ping_ack) {
1564 gpr_slice_buffer_add(
1565 &t->qbuf,
1566 grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
1567 }
nnoble0c475f02014-12-05 15:37:39 -08001568 if (st.goaway) {
1569 if (t->num_pending_goaways == t->cap_pending_goaways) {
1570 t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
1571 t->pending_goaways =
1572 gpr_realloc(t->pending_goaways,
1573 sizeof(pending_goaway) * t->cap_pending_goaways);
1574 }
1575 t->pending_goaways[t->num_pending_goaways].status =
1576 grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
1577 t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
1578 t->num_pending_goaways++;
1579 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001580 if (st.process_ping_reply) {
1581 for (i = 0; i < t->ping_count; i++) {
1582 if (0 ==
1583 memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
1584 t->pings[i].cb(t->pings[i].user_data);
1585 memmove(&t->pings[i], &t->pings[i + 1],
1586 (t->ping_count - i - 1) * sizeof(outstanding_ping));
1587 t->ping_count--;
1588 break;
1589 }
1590 }
1591 }
Yang Gaof1021032015-04-18 00:10:29 -07001592 if (st.initial_window_update) {
1593 for (i = 0; i < t->stream_map.count; i++) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001594 stream *s = (stream *)(t->stream_map.values[i]);
Craig Tiller84b88842015-04-20 08:47:52 -07001595 int was_window_empty = s->outgoing_window <= 0;
1596 s->outgoing_window += st.initial_window_update;
Craig Tiller06aeea72015-04-23 10:54:45 -07001597 if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
1598 s->outgoing_sopb->nops > 0) {
Craig Tiller84b88842015-04-20 08:47:52 -07001599 stream_list_join(t, s, WRITABLE);
Yang Gaof1021032015-04-18 00:10:29 -07001600 }
1601 }
1602 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001603 if (st.window_update) {
1604 if (t->incoming_stream_id) {
1605 /* if there was a stream id, this is for some stream */
1606 stream *s = lookup_stream(t, t->incoming_stream_id);
1607 if (s) {
Craig Tiller84b88842015-04-20 08:47:52 -07001608 int was_window_empty = s->outgoing_window <= 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001609 if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
1610 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1611 GRPC_CHTTP2_FLOW_CONTROL_ERROR),
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001612 GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001613 } else {
1614 s->outgoing_window += st.window_update;
1615 /* if this window update makes outgoing ops writable again,
1616 flag that */
Craig Tiller06aeea72015-04-23 10:54:45 -07001617 if (was_window_empty && s->outgoing_sopb &&
1618 s->outgoing_sopb->nops > 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001619 stream_list_join(t, s, WRITABLE);
1620 }
1621 }
1622 }
1623 } else {
1624 /* transport level window update */
1625 if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
1626 drop_connection(t);
1627 } else {
1628 t->outgoing_window += st.window_update;
1629 }
1630 }
1631 }
1632 return 1;
1633 case GRPC_CHTTP2_STREAM_ERROR:
1634 become_skip_parser(t);
1635 cancel_stream_id(
1636 t, t->incoming_stream_id,
1637 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
1638 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1639 return 1;
1640 case GRPC_CHTTP2_CONNECTION_ERROR:
1641 drop_connection(t);
1642 return 0;
1643 }
1644 gpr_log(GPR_ERROR, "should never reach here");
1645 abort();
1646 return 0;
1647}
1648
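/* Deframing state machine. When acting as a server the 24-byte client
   connection preface is verified first; after that each frame begins with
   the 9-octet HTTP/2 frame header (3-byte length, 1-byte type, 1-byte
   flags, 4-byte stream id with its reserved high bit masked off), consumed
   one byte per DTS_FH_* state so a header split across slices resumes
   cleanly, before the payload is handed to the parser chosen by
   init_frame_parser. */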
1649static int process_read(transport *t, gpr_slice slice) {
1650 gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
1651 gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
1652 gpr_uint8 *cur = beg;
1653
1654 if (cur == end) return 1;
1655
1656 switch (t->deframe_state) {
1657 case DTS_CLIENT_PREFIX_0:
1658 case DTS_CLIENT_PREFIX_1:
1659 case DTS_CLIENT_PREFIX_2:
1660 case DTS_CLIENT_PREFIX_3:
1661 case DTS_CLIENT_PREFIX_4:
1662 case DTS_CLIENT_PREFIX_5:
1663 case DTS_CLIENT_PREFIX_6:
1664 case DTS_CLIENT_PREFIX_7:
1665 case DTS_CLIENT_PREFIX_8:
1666 case DTS_CLIENT_PREFIX_9:
1667 case DTS_CLIENT_PREFIX_10:
1668 case DTS_CLIENT_PREFIX_11:
1669 case DTS_CLIENT_PREFIX_12:
1670 case DTS_CLIENT_PREFIX_13:
1671 case DTS_CLIENT_PREFIX_14:
1672 case DTS_CLIENT_PREFIX_15:
1673 case DTS_CLIENT_PREFIX_16:
1674 case DTS_CLIENT_PREFIX_17:
1675 case DTS_CLIENT_PREFIX_18:
1676 case DTS_CLIENT_PREFIX_19:
1677 case DTS_CLIENT_PREFIX_20:
1678 case DTS_CLIENT_PREFIX_21:
1679 case DTS_CLIENT_PREFIX_22:
1680 case DTS_CLIENT_PREFIX_23:
1681 while (cur != end && t->deframe_state != DTS_FH_0) {
1682 if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
1683 gpr_log(GPR_ERROR,
1684 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
1685 "at byte %d",
1686 CLIENT_CONNECT_STRING[t->deframe_state],
Craig Tiller5c019ae2015-04-17 16:46:53 -07001687 (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
1688 (int)*cur, t->deframe_state);
Craig Tiller5246e7a2015-01-19 14:59:08 -08001689 drop_connection(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001690 return 0;
1691 }
1692 ++cur;
1693 ++t->deframe_state;
1694 }
1695 if (cur == end) {
1696 return 1;
1697 }
1698 /* fallthrough */
1699 dts_fh_0:
1700 case DTS_FH_0:
1701 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001702 t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001703 if (++cur == end) {
1704 t->deframe_state = DTS_FH_1;
1705 return 1;
1706 }
1707 /* fallthrough */
1708 case DTS_FH_1:
1709 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001710 t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001711 if (++cur == end) {
1712 t->deframe_state = DTS_FH_2;
1713 return 1;
1714 }
1715 /* fallthrough */
1716 case DTS_FH_2:
1717 GPR_ASSERT(cur < end);
1718 t->incoming_frame_size |= *cur;
1719 if (++cur == end) {
1720 t->deframe_state = DTS_FH_3;
1721 return 1;
1722 }
1723 /* fallthrough */
1724 case DTS_FH_3:
1725 GPR_ASSERT(cur < end);
1726 t->incoming_frame_type = *cur;
1727 if (++cur == end) {
1728 t->deframe_state = DTS_FH_4;
1729 return 1;
1730 }
1731 /* fallthrough */
1732 case DTS_FH_4:
1733 GPR_ASSERT(cur < end);
1734 t->incoming_frame_flags = *cur;
1735 if (++cur == end) {
1736 t->deframe_state = DTS_FH_5;
1737 return 1;
1738 }
1739 /* fallthrough */
1740 case DTS_FH_5:
1741 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001742      t->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001743 if (++cur == end) {
1744 t->deframe_state = DTS_FH_6;
1745 return 1;
1746 }
1747 /* fallthrough */
1748 case DTS_FH_6:
1749 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001750 t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001751 if (++cur == end) {
1752 t->deframe_state = DTS_FH_7;
1753 return 1;
1754 }
1755 /* fallthrough */
1756 case DTS_FH_7:
1757 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001758 t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001759 if (++cur == end) {
1760 t->deframe_state = DTS_FH_8;
1761 return 1;
1762 }
1763 /* fallthrough */
1764 case DTS_FH_8:
1765 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001766 t->incoming_stream_id |= ((gpr_uint32)*cur);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001767 t->deframe_state = DTS_FRAME;
1768 if (!init_frame_parser(t)) {
1769 return 0;
1770 }
Tatsuhiro Tsujikawa1cbf8d72015-03-13 23:59:40 +09001771      /* t->last_incoming_stream_id is used as the last-stream-id when
1772         sending a GOAWAY frame.
1773         https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
1774         says that last-stream-id is the highest peer-initiated stream id,
1775         so, since we don't have server-pushed streams, a client should
1776         send GOAWAY with last-stream-id=0 in this case. */
Tatsuhiro Tsujikawad11f6102015-03-12 22:57:22 +09001777 if (!t->is_client) {
1778 t->last_incoming_stream_id = t->incoming_stream_id;
1779 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001780 if (t->incoming_frame_size == 0) {
1781 if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
1782 return 0;
1783 }
1784 if (++cur == end) {
1785 t->deframe_state = DTS_FH_0;
1786 return 1;
1787 }
1788 goto dts_fh_0; /* loop */
1789 }
1790 if (++cur == end) {
1791 return 1;
1792 }
1793 /* fallthrough */
1794 case DTS_FRAME:
1795 GPR_ASSERT(cur < end);
Craig Tiller54f9a652015-02-19 21:41:20 -08001796 if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001797 if (!parse_frame_slice(
1798 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
1799 return 0;
1800 }
1801 t->deframe_state = DTS_FH_0;
1802 return 1;
Craig Tiller0c0b60c2015-01-21 15:49:28 -08001803 } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001804 if (!parse_frame_slice(
1805 t, gpr_slice_sub_no_ref(slice, cur - beg,
1806 cur + t->incoming_frame_size - beg),
1807 1)) {
1808 return 0;
1809 }
1810 cur += t->incoming_frame_size;
1811 goto dts_fh_0; /* loop */
1812 } else {
1813 if (!parse_frame_slice(
1814 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
1815 return 0;
1816 }
1817 t->incoming_frame_size -= (end - cur);
1818 return 1;
1819 }
1820 gpr_log(GPR_ERROR, "should never reach here");
1821 abort();
1822 }
1823
1824 gpr_log(GPR_ERROR, "should never reach here");
1825 abort();
Nicolas "Pixel" Noble7f13eb22015-04-01 20:57:33 -07001826
1827 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001828}
1829
1830/* tcp read callback */
1831static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
1832 grpc_endpoint_cb_status error) {
1833 transport *t = tp;
1834 size_t i;
1835 int keep_reading = 0;
1836
1837 switch (error) {
1838 case GRPC_ENDPOINT_CB_SHUTDOWN:
1839 case GRPC_ENDPOINT_CB_EOF:
1840 case GRPC_ENDPOINT_CB_ERROR:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001841 lock(t);
1842 drop_connection(t);
1843 t->reading = 0;
1844 if (!t->writing && t->ep) {
1845 grpc_endpoint_destroy(t->ep);
1846 t->ep = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001847 unref_transport(t); /* safe as we still have a ref for read */
1848 }
1849 unlock(t);
1850 unref_transport(t);
1851 break;
1852 case GRPC_ENDPOINT_CB_OK:
1853 lock(t);
1854 for (i = 0; i < nslices && process_read(t, slices[i]); i++)
1855 ;
1856 unlock(t);
1857 keep_reading = 1;
1858 break;
1859 }
1860
1861 for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
1862
1863 if (keep_reading) {
ctiller58393c22015-01-07 14:03:30 -08001864 grpc_endpoint_notify_on_read(t->ep, recv_data, t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001865 }
1866}
1867
1868/*
1869 * CALLBACK LOOP
1870 */
1871
1872static grpc_stream_state compute_state(gpr_uint8 write_closed,
1873 gpr_uint8 read_closed) {
1874 if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
1875 if (write_closed) return GRPC_STREAM_SEND_CLOSED;
1876 if (read_closed) return GRPC_STREAM_RECV_CLOSED;
1877 return GRPC_STREAM_OPEN;
1878}
1879
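/* Reconstitute the metadata lists that add_metadata_batch deferred:
   list.tail temporarily holds an element count, and the elements live
   contiguously in s->incoming_metadata, so the prev/next links can now be
   wired up in place before the ops are handed to the application. */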
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001880static void patch_metadata_ops(stream *s) {
1881 grpc_stream_op *ops = s->incoming_sopb->ops;
1882 size_t nops = s->incoming_sopb->nops;
1883 size_t i;
1884 size_t j;
1885 size_t mdidx = 0;
1886 size_t last_mdidx;
1887
1888 for (i = 0; i < nops; i++) {
1889 grpc_stream_op *op = &ops[i];
1890 if (op->type != GRPC_OP_METADATA) continue;
1891 last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
1892 GPR_ASSERT(last_mdidx > mdidx);
1893 GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
1894 op->data.metadata.list.head = &s->incoming_metadata[mdidx];
1895 op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
1896 for (j = mdidx + 1; j < last_mdidx; j++) {
1897 s->incoming_metadata[j].prev = &s->incoming_metadata[j-1];
1898 s->incoming_metadata[j-1].next = &s->incoming_metadata[j];
1899 }
1900 s->incoming_metadata[mdidx].prev = NULL;
1901 s->incoming_metadata[last_mdidx-1].next = NULL;
1902 mdidx = last_mdidx;
1903 }
1904 GPR_ASSERT(mdidx == s->incoming_metadata_count);
1905
1906 s->incoming_metadata_count = 0;
1907}
1908
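/* Publish completed reads: for each stream on the FINISHED_READ_OP list,
   recompute the externally visible stream state, swap any parsed ops into
   the application's sopb, patch metadata lists, and schedule the recv_done
   callback. Streams that reach GRPC_STREAM_CLOSED are removed from the
   stream map here. */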
Craig Tillerc079c112015-04-22 15:23:39 -07001909static void finish_reads(transport *t) {
1910 stream *s;
1911
1912 while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
1913 int publish = 0;
1914 GPR_ASSERT(s->incoming_sopb);
Craig Tiller06aeea72015-04-23 10:54:45 -07001915 *s->publish_state =
1916 compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
Craig Tillerc079c112015-04-22 15:23:39 -07001917 if (*s->publish_state != s->published_state) {
1918 s->published_state = *s->publish_state;
1919 publish = 1;
Craig Tillerc1f75602015-04-24 11:44:53 -07001920 if (s->published_state == GRPC_STREAM_CLOSED) {
1921 remove_from_stream_map(t, s);
1922 }
Craig Tillerc079c112015-04-22 15:23:39 -07001923 }
1924 if (s->parser.incoming_sopb.nops > 0) {
1925 grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
1926 publish = 1;
1927 }
1928 if (publish) {
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001929 if (s->incoming_metadata_count > 0) {
1930 patch_metadata_ops(s);
1931 }
Craig Tiller7e8489a2015-04-23 12:41:16 -07001932 s->incoming_sopb = NULL;
Craig Tillerc079c112015-04-22 15:23:39 -07001933 schedule_cb(t, s->recv_done_closure, 1);
1934 }
1935 }
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001936
Craig Tillerc079c112015-04-22 15:23:39 -07001937}
1938
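/* Completion callbacks are queued here (growing the array as needed) and
   invoked later: prepare_callbacks swaps the pending array into
   executing_callbacks and run_callbacks then calls each closure in order. */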
1939static void schedule_cb(transport *t, op_closure closure, int success) {
1940 if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001941 t->pending_callbacks.capacity =
1942 GPR_MAX(t->pending_callbacks.capacity * 2, 8);
1943 t->pending_callbacks.callbacks =
1944 gpr_realloc(t->pending_callbacks.callbacks,
1945 t->pending_callbacks.capacity *
1946 sizeof(*t->pending_callbacks.callbacks));
Craig Tillerc079c112015-04-22 15:23:39 -07001947 }
1948 closure.success = success;
1949 t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
1950}
1951
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001952static int prepare_callbacks(transport *t) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001953 op_closure_array temp = t->pending_callbacks;
1954 t->pending_callbacks = t->executing_callbacks;
1955 t->executing_callbacks = temp;
1956 return t->executing_callbacks.count > 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001957}
1958
Craig Tillerd1345de2015-02-24 21:55:20 -08001959static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001960 size_t i;
1961 for (i = 0; i < t->executing_callbacks.count; i++) {
1962 op_closure c = t->executing_callbacks.callbacks[i];
Craig Tillerc079c112015-04-22 15:23:39 -07001963 c.cb(c.user_data, c.success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001964 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001965 t->executing_callbacks.count = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001966}
1967
Craig Tiller748fe3f2015-03-02 07:48:50 -08001968static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
1969 cb->closed(t->cb_user_data, &t->base);
1970}
1971
Craig Tillerc079c112015-04-22 15:23:39 -07001972/*
1973 * POLLSET STUFF
1974 */
1975
1976static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
ctillerd79b4862014-12-17 16:36:59 -08001977 if (t->ep) {
1978 grpc_endpoint_add_to_pollset(t->ep, pollset);
1979 }
Craig Tillerc079c112015-04-22 15:23:39 -07001980}
1981
1982static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
1983 transport *t = (transport *)gt;
1984 lock(t);
1985 add_to_pollset_locked(t, pollset);
ctillerd79b4862014-12-17 16:36:59 -08001986 unlock(t);
1987}
1988
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001989/*
1990 * INTEGRATION GLUE
1991 */
1992
1993static const grpc_transport_vtable vtable = {
Craig Tiller06aeea72015-04-23 10:54:45 -07001994 sizeof(stream), init_stream, perform_op,
1995 add_to_pollset, destroy_stream, goaway,
1996 close_transport, send_ping, destroy_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001997
1998void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
1999 void *arg,
2000 const grpc_channel_args *channel_args,
2001 grpc_endpoint *ep, gpr_slice *slices,
2002 size_t nslices, grpc_mdctx *mdctx,
2003 int is_client) {
2004 transport *t = gpr_malloc(sizeof(transport));
Nicolas Noble5ea99bb2015-02-04 14:13:09 -08002005 init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
2006 is_client);
Craig Tiller190d3602015-02-18 09:23:38 -08002007}