blob: 9f9005691b7d8bc9f21fce56505ab013922cb78a [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/transport/chttp2_transport.h"
35
36#include <math.h>
37#include <stdio.h>
38#include <string.h>
39
Craig Tiller485d7762015-01-23 12:54:05 -080040#include "src/core/support/string.h"
nnoble0c475f02014-12-05 15:37:39 -080041#include "src/core/transport/chttp2/frame_data.h"
42#include "src/core/transport/chttp2/frame_goaway.h"
43#include "src/core/transport/chttp2/frame_ping.h"
44#include "src/core/transport/chttp2/frame_rst_stream.h"
45#include "src/core/transport/chttp2/frame_settings.h"
46#include "src/core/transport/chttp2/frame_window_update.h"
47#include "src/core/transport/chttp2/hpack_parser.h"
48#include "src/core/transport/chttp2/http2_errors.h"
49#include "src/core/transport/chttp2/status_conversion.h"
50#include "src/core/transport/chttp2/stream_encoder.h"
51#include "src/core/transport/chttp2/stream_map.h"
52#include "src/core/transport/chttp2/timeout_encoding.h"
53#include "src/core/transport/transport_impl.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080054#include <grpc/support/alloc.h>
55#include <grpc/support/log.h>
56#include <grpc/support/slice_buffer.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080057#include <grpc/support/useful.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080058
ctiller493fbcc2014-12-07 15:09:10 -080059#define DEFAULT_WINDOW 65535
60#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080061#define MAX_WINDOW 0x7fffffffu
62
63#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
64#define CLIENT_CONNECT_STRLEN 24
65
Craig Tillerfaa84802015-03-01 21:56:38 -080066int grpc_http_trace = 0;
67
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080068typedef struct transport transport;
69typedef struct stream stream;
70
Craig Tiller5c019ae2015-04-17 16:46:53 -070071#define IF_TRACING(stmt) \
72 if (!(grpc_http_trace)) \
73 ; \
74 else \
Craig Tillerd50e5652015-02-24 16:46:22 -080075 stmt
76
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080077/* streams are kept in various linked lists depending on what things need to
78 happen to them... this enum labels each list */
79typedef enum {
80 /* streams that have pending writes */
81 WRITABLE = 0,
ctiller00297df2015-01-12 11:23:09 -080082 /* streams that have been selected to be written */
83 WRITING,
84 /* streams that have just been written, and included a close */
85 WRITTEN_CLOSED,
86 /* streams that have been cancelled and have some pending state updates
87 to perform */
88 CANCELLED,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080089 /* streams that want to send window updates */
90 WINDOW_UPDATE,
91 /* streams that are waiting to start because there are too many concurrent
92 streams on the connection */
93 WAITING_FOR_CONCURRENCY,
Craig Tillerc079c112015-04-22 15:23:39 -070094 /* streams that have finished reading: we wait until unlock to coalesce
95 all changes into one callback */
96 FINISHED_READ_OP,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080097 STREAM_LIST_COUNT /* must be last */
98} stream_list_id;
99
100/* deframer state for the overall http2 stream of bytes */
101typedef enum {
102 /* prefix: one entry per http2 connection prefix byte */
103 DTS_CLIENT_PREFIX_0 = 0,
104 DTS_CLIENT_PREFIX_1,
105 DTS_CLIENT_PREFIX_2,
106 DTS_CLIENT_PREFIX_3,
107 DTS_CLIENT_PREFIX_4,
108 DTS_CLIENT_PREFIX_5,
109 DTS_CLIENT_PREFIX_6,
110 DTS_CLIENT_PREFIX_7,
111 DTS_CLIENT_PREFIX_8,
112 DTS_CLIENT_PREFIX_9,
113 DTS_CLIENT_PREFIX_10,
114 DTS_CLIENT_PREFIX_11,
115 DTS_CLIENT_PREFIX_12,
116 DTS_CLIENT_PREFIX_13,
117 DTS_CLIENT_PREFIX_14,
118 DTS_CLIENT_PREFIX_15,
119 DTS_CLIENT_PREFIX_16,
120 DTS_CLIENT_PREFIX_17,
121 DTS_CLIENT_PREFIX_18,
122 DTS_CLIENT_PREFIX_19,
123 DTS_CLIENT_PREFIX_20,
124 DTS_CLIENT_PREFIX_21,
125 DTS_CLIENT_PREFIX_22,
126 DTS_CLIENT_PREFIX_23,
127 /* frame header byte 0... */
128 /* must follow from the prefix states */
129 DTS_FH_0,
130 DTS_FH_1,
131 DTS_FH_2,
132 DTS_FH_3,
133 DTS_FH_4,
134 DTS_FH_5,
135 DTS_FH_6,
136 DTS_FH_7,
137 /* ... frame header byte 8 */
138 DTS_FH_8,
139 /* inside a http2 frame */
140 DTS_FRAME
141} deframe_transport_state;
142
Craig Tillerc079c112015-04-22 15:23:39 -0700143typedef enum {
144 WRITE_STATE_OPEN,
145 WRITE_STATE_QUEUED_CLOSE,
146 WRITE_STATE_SENT_CLOSE
147} WRITE_STATE;
148
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800149typedef struct {
150 stream *head;
151 stream *tail;
152} stream_list;
153
154typedef struct {
155 stream *next;
156 stream *prev;
157} stream_link;
158
159typedef enum {
160 ERROR_STATE_NONE,
161 ERROR_STATE_SEEN,
162 ERROR_STATE_NOTIFIED
163} error_state;
164
165/* We keep several sets of connection wide parameters */
166typedef enum {
167 /* The settings our peer has asked for (and we have acked) */
168 PEER_SETTINGS = 0,
169 /* The settings we'd like to have */
170 LOCAL_SETTINGS,
171 /* The settings we've published to our peer */
172 SENT_SETTINGS,
173 /* The settings the peer has acked */
174 ACKED_SETTINGS,
175 NUM_SETTING_SETS
176} setting_set;
177
178/* Outstanding ping request data */
179typedef struct {
180 gpr_uint8 id[8];
181 void (*cb)(void *user_data);
182 void *user_data;
183} outstanding_ping;
184
nnoble0c475f02014-12-05 15:37:39 -0800185typedef struct {
186 grpc_status_code status;
187 gpr_slice debug;
188} pending_goaway;
189
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700190typedef struct {
191 void (*cb)(void *user_data, int success);
192 void *user_data;
Craig Tillerc079c112015-04-22 15:23:39 -0700193 int success;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700194} op_closure;
195
196typedef struct {
197 op_closure *callbacks;
198 size_t count;
199 size_t capacity;
200} op_closure_array;
201
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202struct transport {
203 grpc_transport base; /* must be first */
204 const grpc_transport_callbacks *cb;
205 void *cb_user_data;
206 grpc_endpoint *ep;
207 grpc_mdctx *metadata_context;
208 gpr_refcount refs;
209 gpr_uint8 is_client;
210
211 gpr_mu mu;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800212 gpr_cv cv;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213
214 /* basic state management - what are we doing at the moment? */
215 gpr_uint8 reading;
216 gpr_uint8 writing;
217 gpr_uint8 calling_back;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800218 gpr_uint8 destroying;
Craig Tillerd75fe662015-02-21 07:30:49 -0800219 gpr_uint8 closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800220 error_state error_state;
221
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700222 /* queued callbacks */
223 op_closure_array pending_callbacks;
224 op_closure_array executing_callbacks;
225
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800226 /* stream indexing */
227 gpr_uint32 next_stream_id;
nnoble0c475f02014-12-05 15:37:39 -0800228 gpr_uint32 last_incoming_stream_id;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800229
230 /* settings */
231 gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
ctiller493fbcc2014-12-07 15:09:10 -0800232 gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */
233 gpr_uint8 sent_local_settings; /* have local settings been sent? */
234 gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800235
236 /* window management */
237 gpr_uint32 outgoing_window;
238 gpr_uint32 incoming_window;
ctiller493fbcc2014-12-07 15:09:10 -0800239 gpr_uint32 connection_window_target;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800240
241 /* deframing */
242 deframe_transport_state deframe_state;
243 gpr_uint8 incoming_frame_type;
244 gpr_uint8 incoming_frame_flags;
245 gpr_uint8 header_eof;
246 gpr_uint32 expect_continuation_stream_id;
247 gpr_uint32 incoming_frame_size;
248 gpr_uint32 incoming_stream_id;
249
250 /* hpack encoding */
251 grpc_chttp2_hpack_compressor hpack_compressor;
252
253 /* various parsers */
254 grpc_chttp2_hpack_parser hpack_parser;
255 /* simple one shot parsers */
256 union {
257 grpc_chttp2_window_update_parser window_update;
258 grpc_chttp2_settings_parser settings;
259 grpc_chttp2_ping_parser ping;
260 } simple_parsers;
261
nnoble0c475f02014-12-05 15:37:39 -0800262 /* goaway */
263 grpc_chttp2_goaway_parser goaway_parser;
264 pending_goaway *pending_goaways;
265 size_t num_pending_goaways;
266 size_t cap_pending_goaways;
267
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800268 /* state for a stream that's not yet been created */
269 grpc_stream_op_buffer new_stream_sopb;
270
Craig Tillercb818ba2015-01-29 17:08:01 -0800271 /* stream ops that need to be destroyed, but outside of the lock */
272 grpc_stream_op_buffer nuke_later_sopb;
273
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800274 /* active parser */
275 void *parser_data;
276 stream *incoming_stream;
277 grpc_chttp2_parse_error (*parser)(void *parser_user_data,
278 grpc_chttp2_parse_state *state,
279 gpr_slice slice, int is_last);
280
281 gpr_slice_buffer outbuf;
282 gpr_slice_buffer qbuf;
283
284 stream_list lists[STREAM_LIST_COUNT];
285 grpc_chttp2_stream_map stream_map;
286
287 /* metadata object cache */
288 grpc_mdstr *str_grpc_timeout;
289
290 /* pings */
291 outstanding_ping *pings;
292 size_t ping_count;
293 size_t ping_capacity;
294 gpr_int64 ping_counter;
295};
296
297struct stream {
298 gpr_uint32 id;
299
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800300 gpr_uint32 incoming_window;
Craig Tiller84b88842015-04-20 08:47:52 -0700301 gpr_int64 outgoing_window;
ctiller00297df2015-01-12 11:23:09 -0800302 /* when the application requests writes be closed, the write_closed is
303 'queued'; when the close is flow controlled into the send path, we are
304 'sending' it; when the write has been performed it is 'sent' */
Craig Tillerc079c112015-04-22 15:23:39 -0700305 WRITE_STATE write_state;
306 gpr_uint8 send_closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800307 gpr_uint8 read_closed;
308 gpr_uint8 cancelled;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800309
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700310 op_closure send_done_closure;
311 op_closure recv_done_closure;
312
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313 stream_link links[STREAM_LIST_COUNT];
314 gpr_uint8 included[STREAM_LIST_COUNT];
315
Craig Tiller9c1043e2015-04-16 16:20:38 -0700316 /* incoming metadata */
317 grpc_linked_mdelem *incoming_metadata;
318 size_t incoming_metadata_count;
319 size_t incoming_metadata_capacity;
Craig Tiller2b0f7c52015-04-24 17:23:17 -0700320 grpc_linked_mdelem *old_incoming_metadata;
Craig Tiller9c1043e2015-04-16 16:20:38 -0700321 gpr_timespec incoming_deadline;
322
ctiller00297df2015-01-12 11:23:09 -0800323 /* sops from application */
Craig Tillerc079c112015-04-22 15:23:39 -0700324 grpc_stream_op_buffer *outgoing_sopb;
325 grpc_stream_op_buffer *incoming_sopb;
326 grpc_stream_state *publish_state;
327 grpc_stream_state published_state;
ctiller00297df2015-01-12 11:23:09 -0800328 /* sops that have passed flow control to be written */
329 grpc_stream_op_buffer writing_sopb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800330
331 grpc_chttp2_data_parser parser;
332
333 grpc_stream_state callback_state;
334 grpc_stream_op_buffer callback_sopb;
335};
336
337static const grpc_transport_vtable vtable;
338
339static void push_setting(transport *t, grpc_chttp2_setting_id id,
340 gpr_uint32 value);
341
342static int prepare_callbacks(transport *t);
Craig Tillerd1345de2015-02-24 21:55:20 -0800343static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
Craig Tiller748fe3f2015-03-02 07:48:50 -0800344static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800345
346static int prepare_write(transport *t);
ctiller00297df2015-01-12 11:23:09 -0800347static void perform_write(transport *t, grpc_endpoint *ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800348
349static void lock(transport *t);
350static void unlock(transport *t);
351
352static void drop_connection(transport *t);
353static void end_all_the_calls(transport *t);
354
355static stream *stream_list_remove_head(transport *t, stream_list_id id);
356static void stream_list_remove(transport *t, stream *s, stream_list_id id);
357static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
358static void stream_list_join(transport *t, stream *s, stream_list_id id);
359
360static void cancel_stream_id(transport *t, gpr_uint32 id,
361 grpc_status_code local_status,
362 grpc_chttp2_error_code error_code, int send_rst);
363static void cancel_stream(transport *t, stream *s,
364 grpc_status_code local_status,
Craig Tiller1a727fd2015-04-24 13:21:22 -0700365 grpc_chttp2_error_code error_code,
Craig Tiller2ea37fd2015-04-24 13:03:49 -0700366 grpc_mdstr *optional_message, int send_rst);
ctiller00297df2015-01-12 11:23:09 -0800367static void finalize_cancellations(transport *t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800368static stream *lookup_stream(transport *t, gpr_uint32 id);
369static void remove_from_stream_map(transport *t, stream *s);
370static void maybe_start_some_streams(transport *t);
371
372static void become_skip_parser(transport *t);
373
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800374static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
375 grpc_endpoint_cb_status error);
376
Craig Tillerc079c112015-04-22 15:23:39 -0700377static void schedule_cb(transport *t, op_closure closure, int success);
378static void maybe_finish_read(transport *t, stream *s);
379static void maybe_join_window_updates(transport *t, stream *s);
380static void finish_reads(transport *t);
381static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
Craig Tiller50d9db52015-04-23 10:52:14 -0700382static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
Craig Tiller7d4a96a2015-04-24 07:54:07 -0700383static void add_metadata_batch(transport *t, stream *s);
Craig Tillerc079c112015-04-22 15:23:39 -0700384
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800385/*
386 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
387 */
388
Craig Tiller9be83ee2015-02-18 14:16:15 -0800389static void destruct_transport(transport *t) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800390 size_t i;
391
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800392 gpr_mu_lock(&t->mu);
393
394 GPR_ASSERT(t->ep == NULL);
395
396 gpr_slice_buffer_destroy(&t->outbuf);
397 gpr_slice_buffer_destroy(&t->qbuf);
398 grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
399 grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
nnoble0c475f02014-12-05 15:37:39 -0800400 grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800401
402 grpc_mdstr_unref(t->str_grpc_timeout);
403
404 for (i = 0; i < STREAM_LIST_COUNT; i++) {
405 GPR_ASSERT(t->lists[i].head == NULL);
406 GPR_ASSERT(t->lists[i].tail == NULL);
407 }
408
409 GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
410
411 grpc_chttp2_stream_map_destroy(&t->stream_map);
412
413 gpr_mu_unlock(&t->mu);
414 gpr_mu_destroy(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800415 gpr_cv_destroy(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800416
417 /* callback remaining pings: they're not allowed to call into the transpot,
418 and maybe they hold resources that need to be freed */
419 for (i = 0; i < t->ping_count; i++) {
420 t->pings[i].cb(t->pings[i].user_data);
421 }
422 gpr_free(t->pings);
423
Craig Tiller4df5cae2015-04-24 13:46:12 -0700424 gpr_free(t->pending_callbacks.callbacks);
425 gpr_free(t->executing_callbacks.callbacks);
426
nnoble0c475f02014-12-05 15:37:39 -0800427 for (i = 0; i < t->num_pending_goaways; i++) {
428 gpr_slice_unref(t->pending_goaways[i].debug);
429 }
430 gpr_free(t->pending_goaways);
431
Craig Tiller8ed35ea2015-01-30 11:27:43 -0800432 grpc_sopb_destroy(&t->nuke_later_sopb);
433
Craig Tiller9be83ee2015-02-18 14:16:15 -0800434 grpc_mdctx_unref(t->metadata_context);
435
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800436 gpr_free(t);
437}
438
Craig Tiller9be83ee2015-02-18 14:16:15 -0800439static void unref_transport(transport *t) {
440 if (!gpr_unref(&t->refs)) return;
441 destruct_transport(t);
442}
443
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800444static void ref_transport(transport *t) { gpr_ref(&t->refs); }
445
446static void init_transport(transport *t, grpc_transport_setup_callback setup,
447 void *arg, const grpc_channel_args *channel_args,
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800448 grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
449 grpc_mdctx *mdctx, int is_client) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800450 size_t i;
451 int j;
452 grpc_transport_setup_result sr;
453
454 GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
455
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700456 memset(t, 0, sizeof(*t));
457
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800458 t->base.vtable = &vtable;
459 t->ep = ep;
460 /* one ref is for destroy, the other for when ep becomes NULL */
461 gpr_ref_init(&t->refs, 2);
462 gpr_mu_init(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800463 gpr_cv_init(&t->cv);
Craig Tiller9be83ee2015-02-18 14:16:15 -0800464 grpc_mdctx_ref(mdctx);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800465 t->metadata_context = mdctx;
466 t->str_grpc_timeout =
467 grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
468 t->reading = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800469 t->error_state = ERROR_STATE_NONE;
470 t->next_stream_id = is_client ? 1 : 2;
471 t->is_client = is_client;
472 t->outgoing_window = DEFAULT_WINDOW;
473 t->incoming_window = DEFAULT_WINDOW;
ctiller493fbcc2014-12-07 15:09:10 -0800474 t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800475 t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800476 t->ping_counter = gpr_now().tv_nsec;
477 grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
nnoble0c475f02014-12-05 15:37:39 -0800478 grpc_chttp2_goaway_parser_init(&t->goaway_parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800479 gpr_slice_buffer_init(&t->outbuf);
480 gpr_slice_buffer_init(&t->qbuf);
Craig Tillercb818ba2015-01-29 17:08:01 -0800481 grpc_sopb_init(&t->nuke_later_sopb);
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800482 grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800483 if (is_client) {
484 gpr_slice_buffer_add(&t->qbuf,
485 gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
486 }
487 /* 8 is a random stab in the dark as to a good initial size: it's small enough
488 that it shouldn't waste memory for infrequently used connections, yet
489 large enough that the exponential growth should happen nicely when it's
490 needed.
491 TODO(ctiller): tune this */
492 grpc_chttp2_stream_map_init(&t->stream_map, 8);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800493
494 /* copy in initial settings to all setting sets */
495 for (i = 0; i < NUM_SETTING_SETS; i++) {
496 for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
497 t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
498 }
499 }
500 t->dirtied_local_settings = 1;
ctiller493fbcc2014-12-07 15:09:10 -0800501 /* Hack: it's common for implementations to assume 65536 bytes initial send
502 window -- this should by rights be 0 */
503 t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800504 t->sent_local_settings = 0;
505
506 /* configure http2 the way we like it */
507 if (t->is_client) {
508 push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
509 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
510 }
ctiller493fbcc2014-12-07 15:09:10 -0800511 push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800512
513 if (channel_args) {
514 for (i = 0; i < channel_args->num_args; i++) {
515 if (0 ==
516 strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
517 if (t->is_client) {
518 gpr_log(GPR_ERROR, "%s: is ignored on the client",
519 GRPC_ARG_MAX_CONCURRENT_STREAMS);
520 } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
521 gpr_log(GPR_ERROR, "%s: must be an integer",
522 GRPC_ARG_MAX_CONCURRENT_STREAMS);
523 } else {
524 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
525 channel_args->args[i].value.integer);
526 }
527 }
528 }
529 }
530
531 gpr_mu_lock(&t->mu);
532 t->calling_back = 1;
Craig Tiller06aeea72015-04-23 10:54:45 -0700533 ref_transport(t); /* matches unref at end of this function */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800534 gpr_mu_unlock(&t->mu);
535
536 sr = setup(arg, &t->base, t->metadata_context);
537
538 lock(t);
539 t->cb = sr.callbacks;
540 t->cb_user_data = sr.user_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800541 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800542 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800543 unlock(t);
Craig Tillerdcf9c0e2015-02-11 16:12:41 -0800544
Craig Tiller06aeea72015-04-23 10:54:45 -0700545 ref_transport(t); /* matches unref inside recv_data */
Craig Tillerdcf9c0e2015-02-11 16:12:41 -0800546 recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
547
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800548 unref_transport(t);
549}
550
551static void destroy_transport(grpc_transport *gt) {
552 transport *t = (transport *)gt;
553
Craig Tiller748fe3f2015-03-02 07:48:50 -0800554 lock(t);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800555 t->destroying = 1;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000556 /* Wait for pending stuff to finish.
557 We need to be not calling back to ensure that closed() gets a chance to
558 trigger if needed during unlock() before we die.
559 We need to be not writing as cancellation finalization may produce some
560 callbacks that NEED to be made to close out some streams when t->writing
561 becomes 0. */
562 while (t->calling_back || t->writing) {
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800563 gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
564 }
Craig Tiller748fe3f2015-03-02 07:48:50 -0800565 drop_connection(t);
566 unlock(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800567
Craig Tillerbb88a042015-03-02 10:56:33 -0800568 /* The drop_connection() above puts the transport into an error state, and
569 the follow-up unlock should then (as part of the cleanup work it does)
570 ensure that cb is NULL, and therefore not call back anything further.
571 This check validates this very subtle behavior.
572 It's shutdown path, so I don't believe an extra lock pair is going to be
573 problematic for performance. */
Craig Tillerb9eb1802015-03-02 16:41:32 +0000574 lock(t);
575 GPR_ASSERT(!t->cb);
576 unlock(t);
577
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800578 unref_transport(t);
579}
580
581static void close_transport(grpc_transport *gt) {
582 transport *t = (transport *)gt;
583 gpr_mu_lock(&t->mu);
Craig Tillerd75fe662015-02-21 07:30:49 -0800584 GPR_ASSERT(!t->closed);
585 t->closed = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800586 if (t->ep) {
587 grpc_endpoint_shutdown(t->ep);
588 }
589 gpr_mu_unlock(&t->mu);
590}
591
nnoble0c475f02014-12-05 15:37:39 -0800592static void goaway(grpc_transport *gt, grpc_status_code status,
593 gpr_slice debug_data) {
594 transport *t = (transport *)gt;
595 lock(t);
596 grpc_chttp2_goaway_append(t->last_incoming_stream_id,
597 grpc_chttp2_grpc_status_to_http2_error(status),
598 debug_data, &t->qbuf);
599 unlock(t);
600}
601
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800602static int init_stream(grpc_transport *gt, grpc_stream *gs,
Craig Tiller50d9db52015-04-23 10:52:14 -0700603 const void *server_data, grpc_transport_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800604 transport *t = (transport *)gt;
605 stream *s = (stream *)gs;
606
Craig Tillerc079c112015-04-22 15:23:39 -0700607 memset(s, 0, sizeof(*s));
608
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800609 ref_transport(t);
610
611 if (!server_data) {
612 lock(t);
613 s->id = 0;
614 } else {
Craig Tiller3f2c2212015-04-23 07:56:33 -0700615 /* already locked */
Craig Tiller5c019ae2015-04-17 16:46:53 -0700616 s->id = (gpr_uint32)(gpr_uintptr)server_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800617 t->incoming_stream = s;
618 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
619 }
620
ctiller493fbcc2014-12-07 15:09:10 -0800621 s->outgoing_window =
622 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
623 s->incoming_window =
624 t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
Craig Tiller9c1043e2015-04-16 16:20:38 -0700625 s->incoming_deadline = gpr_inf_future;
ctiller00297df2015-01-12 11:23:09 -0800626 grpc_sopb_init(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800627 grpc_sopb_init(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800628 grpc_chttp2_data_parser_init(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800629
Craig Tiller50d9db52015-04-23 10:52:14 -0700630 if (initial_op) perform_op_locked(t, s, initial_op);
631
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800632 if (!server_data) {
633 unlock(t);
634 }
635
636 return 0;
637}
638
Craig Tillercb818ba2015-01-29 17:08:01 -0800639static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
640 grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
641 sopb->nops = 0;
642}
643
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800644static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
645 transport *t = (transport *)gt;
646 stream *s = (stream *)gs;
647 size_t i;
648
649 gpr_mu_lock(&t->mu);
650
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800651 /* stop parsing if we're currently parsing this stream */
652 if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
653 s->id != 0) {
654 become_skip_parser(t);
655 }
656
657 for (i = 0; i < STREAM_LIST_COUNT; i++) {
658 stream_list_remove(t, s, i);
659 }
660 remove_from_stream_map(t, s);
661
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800662 gpr_mu_unlock(&t->mu);
663
Craig Tillerc079c112015-04-22 15:23:39 -0700664 GPR_ASSERT(s->outgoing_sopb == NULL);
Craig Tiller48bfcdc2015-04-24 14:24:27 -0700665 GPR_ASSERT(s->incoming_sopb == NULL);
ctiller00297df2015-01-12 11:23:09 -0800666 grpc_sopb_destroy(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800667 grpc_sopb_destroy(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800668 grpc_chttp2_data_parser_destroy(&s->parser);
Craig Tiller9c71b6f2015-04-24 16:02:00 -0700669 for (i = 0; i < s->incoming_metadata_count; i++) {
670 grpc_mdelem_unref(s->incoming_metadata[i].md);
671 }
Craig Tiller48bfcdc2015-04-24 14:24:27 -0700672 gpr_free(s->incoming_metadata);
Craig Tiller2b0f7c52015-04-24 17:23:17 -0700673 gpr_free(s->old_incoming_metadata);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800674
675 unref_transport(t);
676}
677
678/*
679 * LIST MANAGEMENT
680 */
681
ctiller00297df2015-01-12 11:23:09 -0800682static int stream_list_empty(transport *t, stream_list_id id) {
683 return t->lists[id].head == NULL;
684}
685
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800686static stream *stream_list_remove_head(transport *t, stream_list_id id) {
687 stream *s = t->lists[id].head;
688 if (s) {
689 stream *new_head = s->links[id].next;
690 GPR_ASSERT(s->included[id]);
691 if (new_head) {
692 t->lists[id].head = new_head;
693 new_head->links[id].prev = NULL;
694 } else {
695 t->lists[id].head = NULL;
696 t->lists[id].tail = NULL;
697 }
698 s->included[id] = 0;
699 }
700 return s;
701}
702
703static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
704 if (!s->included[id]) return;
705 s->included[id] = 0;
706 if (s->links[id].prev) {
707 s->links[id].prev->links[id].next = s->links[id].next;
708 } else {
709 GPR_ASSERT(t->lists[id].head == s);
710 t->lists[id].head = s->links[id].next;
711 }
712 if (s->links[id].next) {
713 s->links[id].next->links[id].prev = s->links[id].prev;
714 } else {
715 t->lists[id].tail = s->links[id].prev;
716 }
717}
718
719static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
720 stream *old_tail;
721 GPR_ASSERT(!s->included[id]);
722 old_tail = t->lists[id].tail;
723 s->links[id].next = NULL;
724 s->links[id].prev = old_tail;
725 if (old_tail) {
726 old_tail->links[id].next = s;
727 } else {
728 s->links[id].prev = NULL;
729 t->lists[id].head = s;
730 }
731 t->lists[id].tail = s;
732 s->included[id] = 1;
733}
734
735static void stream_list_join(transport *t, stream *s, stream_list_id id) {
736 if (s->included[id]) {
737 return;
738 }
739 stream_list_add_tail(t, s, id);
740}
741
742static void remove_from_stream_map(transport *t, stream *s) {
743 if (s->id == 0) return;
Craig Tiller1a727fd2015-04-24 13:21:22 -0700744 IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
745 t->is_client ? "CLI" : "SVR", s->id));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800746 if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
747 maybe_start_some_streams(t);
748 }
749}
750
751/*
752 * LOCK MANAGEMENT
753 */
754
755/* We take a transport-global lock in response to calls coming in from above,
756 and in response to data being received from below. New data to be written
757 is always queued, as are callbacks to process data. During unlock() we
758 check our todo lists and initiate callbacks and flush writes. */
759
760static void lock(transport *t) { gpr_mu_lock(&t->mu); }
761
762static void unlock(transport *t) {
763 int start_write = 0;
764 int perform_callbacks = 0;
765 int call_closed = 0;
nnoble0c475f02014-12-05 15:37:39 -0800766 int num_goaways = 0;
767 int i;
768 pending_goaway *goaways = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800769 grpc_endpoint *ep = t->ep;
Craig Tillere3018e62015-02-13 17:05:19 -0800770 grpc_stream_op_buffer nuke_now;
Craig Tillerd1345de2015-02-24 21:55:20 -0800771 const grpc_transport_callbacks *cb = t->cb;
Craig Tiller06059952015-02-18 08:34:56 -0800772
Craig Tillere3018e62015-02-13 17:05:19 -0800773 grpc_sopb_init(&nuke_now);
774 if (t->nuke_later_sopb.nops) {
775 grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
Craig Tillercb818ba2015-01-29 17:08:01 -0800776 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800777
778 /* see if we need to trigger a write - and if so, get the data ready */
779 if (ep && !t->writing) {
780 t->writing = start_write = prepare_write(t);
781 if (start_write) {
782 ref_transport(t);
783 }
784 }
785
ctiller00297df2015-01-12 11:23:09 -0800786 if (!t->writing) {
787 finalize_cancellations(t);
788 }
789
Craig Tillerc079c112015-04-22 15:23:39 -0700790 finish_reads(t);
791
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800792 /* gather any callbacks that need to be made */
Craig Tillerd1345de2015-02-24 21:55:20 -0800793 if (!t->calling_back && cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800794 perform_callbacks = prepare_callbacks(t);
795 if (perform_callbacks) {
796 t->calling_back = 1;
797 }
Craig Tillerb9eb1802015-03-02 16:41:32 +0000798 if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799 call_closed = 1;
800 t->calling_back = 1;
Craig Tiller5c019ae2015-04-17 16:46:53 -0700801 t->cb = NULL; /* no more callbacks */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800802 t->error_state = ERROR_STATE_NOTIFIED;
803 }
nnoble0c475f02014-12-05 15:37:39 -0800804 if (t->num_pending_goaways) {
805 goaways = t->pending_goaways;
806 num_goaways = t->num_pending_goaways;
807 t->pending_goaways = NULL;
808 t->num_pending_goaways = 0;
ctiller82e275f2014-12-12 08:43:28 -0800809 t->cap_pending_goaways = 0;
nnoble0c475f02014-12-05 15:37:39 -0800810 t->calling_back = 1;
811 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800812 }
813
nnoble0c475f02014-12-05 15:37:39 -0800814 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800815 ref_transport(t);
816 }
817
818 /* finally unlock */
819 gpr_mu_unlock(&t->mu);
820
821 /* perform some callbacks if necessary */
nnoble0c475f02014-12-05 15:37:39 -0800822 for (i = 0; i < num_goaways; i++) {
Craig Tiller5c019ae2015-04-17 16:46:53 -0700823 cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
nnoble0c475f02014-12-05 15:37:39 -0800824 }
825
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800826 if (perform_callbacks) {
Craig Tillerd1345de2015-02-24 21:55:20 -0800827 run_callbacks(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800828 }
829
830 if (call_closed) {
Craig Tiller748fe3f2015-03-02 07:48:50 -0800831 call_cb_closed(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800832 }
833
834 /* write some bytes if necessary */
ctiller00297df2015-01-12 11:23:09 -0800835 if (start_write) {
836 /* ultimately calls unref_transport(t); and clears t->writing */
837 perform_write(t, ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800838 }
839
nnoble0c475f02014-12-05 15:37:39 -0800840 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800841 lock(t);
842 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800843 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800844 unlock(t);
845 unref_transport(t);
846 }
nnoble0c475f02014-12-05 15:37:39 -0800847
Craig Tillere3018e62015-02-13 17:05:19 -0800848 grpc_sopb_destroy(&nuke_now);
Craig Tillercb818ba2015-01-29 17:08:01 -0800849
nnoble0c475f02014-12-05 15:37:39 -0800850 gpr_free(goaways);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800851}
852
853/*
854 * OUTPUT PROCESSING
855 */
856
857static void push_setting(transport *t, grpc_chttp2_setting_id id,
858 gpr_uint32 value) {
859 const grpc_chttp2_setting_parameters *sp =
860 &grpc_chttp2_settings_parameters[id];
861 gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
862 if (use_value != value) {
863 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
864 value, use_value);
865 }
866 if (use_value != t->settings[LOCAL_SETTINGS][id]) {
867 t->settings[LOCAL_SETTINGS][id] = use_value;
868 t->dirtied_local_settings = 1;
869 }
870}
871
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800872static int prepare_write(transport *t) {
873 stream *s;
ctiller00297df2015-01-12 11:23:09 -0800874 gpr_uint32 window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800875
876 /* simple writes are queued to qbuf, and flushed here */
Craig Tiller721f3622015-04-13 16:14:28 -0700877 gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800878 GPR_ASSERT(t->qbuf.count == 0);
879
880 if (t->dirtied_local_settings && !t->sent_local_settings) {
881 gpr_slice_buffer_add(
ctiller493fbcc2014-12-07 15:09:10 -0800882 &t->outbuf, grpc_chttp2_settings_create(
883 t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
884 t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
885 t->force_send_settings = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800886 t->dirtied_local_settings = 0;
887 t->sent_local_settings = 1;
888 }
889
890 /* for each stream that's become writable, frame it's data (according to
891 available window sizes) and add to the output buffer */
Craig Tiller84b88842015-04-20 08:47:52 -0700892 while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
893 s->outgoing_window > 0) {
ctiller00297df2015-01-12 11:23:09 -0800894 window_delta = grpc_chttp2_preencode(
Craig Tillerc079c112015-04-22 15:23:39 -0700895 s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
ctiller00297df2015-01-12 11:23:09 -0800896 GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
897 t->outgoing_window -= window_delta;
898 s->outgoing_window -= window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800899
Craig Tiller06aeea72015-04-23 10:54:45 -0700900 if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
901 s->outgoing_sopb->nops == 0) {
Craig Tillerc079c112015-04-22 15:23:39 -0700902 s->send_closed = 1;
903 }
904 if (s->writing_sopb.nops > 0 || s->send_closed) {
ctiller00297df2015-01-12 11:23:09 -0800905 stream_list_join(t, s, WRITING);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800906 }
907
Craig Tillerc079c112015-04-22 15:23:39 -0700908 /* we should either exhaust window or have no ops left, but not both */
Craig Tillerc079c112015-04-22 15:23:39 -0700909 if (s->outgoing_sopb->nops == 0) {
910 s->outgoing_sopb = NULL;
911 schedule_cb(t, s->send_done_closure, 1);
Craig Tillere8893142015-04-23 16:02:01 -0700912 } else if (s->outgoing_window) {
913 stream_list_add_tail(t, s, WRITABLE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800914 }
915 }
916
917 /* for each stream that wants to update its window, add that window here */
918 while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
ctiller00297df2015-01-12 11:23:09 -0800919 window_delta =
ctiller493fbcc2014-12-07 15:09:10 -0800920 t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
921 s->incoming_window;
ctiller00297df2015-01-12 11:23:09 -0800922 if (!s->read_closed && window_delta) {
923 gpr_slice_buffer_add(
924 &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
925 s->incoming_window += window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800926 }
927 }
928
929 /* if the transport is ready to send a window update, do so here also */
ctiller493fbcc2014-12-07 15:09:10 -0800930 if (t->incoming_window < t->connection_window_target * 3 / 4) {
ctiller00297df2015-01-12 11:23:09 -0800931 window_delta = t->connection_window_target - t->incoming_window;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800932 gpr_slice_buffer_add(&t->outbuf,
ctiller00297df2015-01-12 11:23:09 -0800933 grpc_chttp2_window_update_create(0, window_delta));
934 t->incoming_window += window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800935 }
936
ctiller00297df2015-01-12 11:23:09 -0800937 return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
938}
939
940static void finalize_outbuf(transport *t) {
941 stream *s;
942
943 while ((s = stream_list_remove_head(t, WRITING))) {
944 grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
Craig Tiller06aeea72015-04-23 10:54:45 -0700945 s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
ctiller00297df2015-01-12 11:23:09 -0800946 s->writing_sopb.nops = 0;
Craig Tillerc079c112015-04-22 15:23:39 -0700947 if (s->send_closed) {
ctiller00297df2015-01-12 11:23:09 -0800948 stream_list_join(t, s, WRITTEN_CLOSED);
949 }
950 }
951}
952
953static void finish_write_common(transport *t, int success) {
954 stream *s;
955
956 lock(t);
957 if (!success) {
958 drop_connection(t);
959 }
960 while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
Craig Tillerc079c112015-04-22 15:23:39 -0700961 s->write_state = WRITE_STATE_SENT_CLOSE;
Craig Tiller65f9f812015-04-24 16:53:20 -0700962 if (1||!s->cancelled) {
Craig Tillerc079c112015-04-22 15:23:39 -0700963 maybe_finish_read(t, s);
964 }
ctiller00297df2015-01-12 11:23:09 -0800965 }
966 t->outbuf.count = 0;
967 t->outbuf.length = 0;
968 /* leave the writing flag up on shutdown to prevent further writes in unlock()
969 from starting */
970 t->writing = 0;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000971 if (t->destroying) {
972 gpr_cv_signal(&t->cv);
973 }
ctiller00297df2015-01-12 11:23:09 -0800974 if (!t->reading) {
975 grpc_endpoint_destroy(t->ep);
976 t->ep = NULL;
ctiller00297df2015-01-12 11:23:09 -0800977 unref_transport(t); /* safe because we'll still have the ref for write */
978 }
979 unlock(t);
980
981 unref_transport(t);
982}
983
984static void finish_write(void *tp, grpc_endpoint_cb_status error) {
985 transport *t = tp;
986 finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
987}
988
989static void perform_write(transport *t, grpc_endpoint *ep) {
990 finalize_outbuf(t);
991
992 GPR_ASSERT(t->outbuf.count > 0);
993
994 switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
995 finish_write, t)) {
996 case GRPC_ENDPOINT_WRITE_DONE:
997 finish_write_common(t, 1);
998 break;
999 case GRPC_ENDPOINT_WRITE_ERROR:
1000 finish_write_common(t, 0);
1001 break;
1002 case GRPC_ENDPOINT_WRITE_PENDING:
1003 break;
1004 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001005}
1006
1007static void maybe_start_some_streams(transport *t) {
1008 while (
1009 grpc_chttp2_stream_map_size(&t->stream_map) <
1010 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
1011 stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
1012 if (!s) break;
1013
Craig Tiller1a727fd2015-04-24 13:21:22 -07001014 IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
1015 t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
Craig Tillerc1f75602015-04-24 11:44:53 -07001016
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001017 GPR_ASSERT(s->id == 0);
1018 s->id = t->next_stream_id;
1019 t->next_stream_id += 2;
1020 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1021 stream_list_join(t, s, WRITABLE);
1022 }
1023}
1024
Craig Tiller50d9db52015-04-23 10:52:14 -07001025static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001026 if (op->cancel_with_status != GRPC_STATUS_OK) {
1027 cancel_stream(
1028 t, s, op->cancel_with_status,
Craig Tiller1a727fd2015-04-24 13:21:22 -07001029 grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
1030 op->cancel_message, 1);
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001031 }
1032
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001033 if (op->send_ops) {
Craig Tillerc079c112015-04-22 15:23:39 -07001034 GPR_ASSERT(s->outgoing_sopb == NULL);
1035 s->send_done_closure.cb = op->on_done_send;
1036 s->send_done_closure.user_data = op->send_user_data;
1037 if (!s->cancelled) {
1038 s->outgoing_sopb = op->send_ops;
1039 if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
1040 s->write_state = WRITE_STATE_QUEUED_CLOSE;
1041 }
1042 if (s->id == 0) {
Craig Tiller1a727fd2015-04-24 13:21:22 -07001043 IF_TRACING(gpr_log(GPR_DEBUG,
1044 "HTTP:%s: New stream %p waiting for concurrency",
1045 t->is_client ? "CLI" : "SVR", s));
Craig Tillerc079c112015-04-22 15:23:39 -07001046 stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
1047 maybe_start_some_streams(t);
1048 } else if (s->outgoing_window > 0) {
1049 stream_list_join(t, s, WRITABLE);
1050 }
1051 } else {
1052 schedule_nuke_sopb(t, op->send_ops);
1053 schedule_cb(t, s->send_done_closure, 0);
1054 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001055 }
1056
1057 if (op->recv_ops) {
Craig Tillerc079c112015-04-22 15:23:39 -07001058 GPR_ASSERT(s->incoming_sopb == NULL);
1059 s->recv_done_closure.cb = op->on_done_recv;
1060 s->recv_done_closure.user_data = op->recv_user_data;
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001061 s->incoming_sopb = op->recv_ops;
1062 s->incoming_sopb->nops = 0;
1063 s->publish_state = op->recv_state;
Craig Tiller2b0f7c52015-04-24 17:23:17 -07001064 gpr_free(s->old_incoming_metadata);
1065 s->old_incoming_metadata = NULL;
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001066 maybe_finish_read(t, s);
1067 maybe_join_window_updates(t, s);
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001068 }
1069
1070 if (op->bind_pollset) {
Craig Tillerc079c112015-04-22 15:23:39 -07001071 add_to_pollset_locked(t, op->bind_pollset);
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001072 }
Craig Tiller50d9db52015-04-23 10:52:14 -07001073}
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001074
Craig Tiller06aeea72015-04-23 10:54:45 -07001075static void perform_op(grpc_transport *gt, grpc_stream *gs,
1076 grpc_transport_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -07001077 transport *t = (transport *)gt;
1078 stream *s = (stream *)gs;
1079
1080 lock(t);
1081 perform_op_locked(t, s, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001082 unlock(t);
1083}
1084
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001085static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
1086 void *user_data) {
1087 transport *t = (transport *)gt;
1088 outstanding_ping *p;
1089
1090 lock(t);
1091 if (t->ping_capacity == t->ping_count) {
1092 t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
1093 t->pings =
1094 gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
1095 }
1096 p = &t->pings[t->ping_count++];
nnoble8f4e42c2014-12-11 16:36:46 -08001097 p->id[0] = (t->ping_counter >> 56) & 0xff;
1098 p->id[1] = (t->ping_counter >> 48) & 0xff;
1099 p->id[2] = (t->ping_counter >> 40) & 0xff;
1100 p->id[3] = (t->ping_counter >> 32) & 0xff;
1101 p->id[4] = (t->ping_counter >> 24) & 0xff;
1102 p->id[5] = (t->ping_counter >> 16) & 0xff;
1103 p->id[6] = (t->ping_counter >> 8) & 0xff;
1104 p->id[7] = t->ping_counter & 0xff;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001105 p->cb = cb;
1106 p->user_data = user_data;
1107 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
1108 unlock(t);
1109}
1110
1111/*
1112 * INPUT PROCESSING
1113 */
1114
ctiller00297df2015-01-12 11:23:09 -08001115static void finalize_cancellations(transport *t) {
1116 stream *s;
1117
1118 while ((s = stream_list_remove_head(t, CANCELLED))) {
1119 s->read_closed = 1;
Craig Tillerc079c112015-04-22 15:23:39 -07001120 s->write_state = WRITE_STATE_SENT_CLOSE;
1121 maybe_finish_read(t, s);
ctiller00297df2015-01-12 11:23:09 -08001122 }
1123}
1124
Craig Tiller9c1043e2015-04-16 16:20:38 -07001125static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
1126 if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
Craig Tiller5c019ae2015-04-17 16:46:53 -07001127 s->incoming_metadata_capacity =
1128 GPR_MAX(8, 2 * s->incoming_metadata_capacity);
1129 s->incoming_metadata =
1130 gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
1131 s->incoming_metadata_capacity);
Craig Tiller9c1043e2015-04-16 16:20:38 -07001132 }
1133 s->incoming_metadata[s->incoming_metadata_count++].md = elem;
1134}
1135
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001136static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
1137 grpc_status_code local_status,
1138 grpc_chttp2_error_code error_code,
Craig Tiller1a727fd2015-04-24 13:21:22 -07001139 grpc_mdstr *optional_message, int send_rst) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001140 int had_outgoing;
Craig Tiller8b433a22015-01-23 14:47:07 -08001141 char buffer[GPR_LTOA_MIN_BUFSIZE];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001142
1143 if (s) {
1144 /* clear out any unreported input & output: nobody cares anymore */
Craig Tillerc079c112015-04-22 15:23:39 -07001145 had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
Craig Tillercb818ba2015-01-29 17:08:01 -08001146 schedule_nuke_sopb(t, &s->parser.incoming_sopb);
Craig Tillerc079c112015-04-22 15:23:39 -07001147 if (s->outgoing_sopb) {
1148 schedule_nuke_sopb(t, s->outgoing_sopb);
Craig Tiller7abc8d22015-04-23 16:43:55 -07001149 s->outgoing_sopb = NULL;
Craig Tillerc52779f2015-04-24 13:19:48 -07001150 stream_list_remove(t, s, WRITABLE);
Craig Tillerc079c112015-04-22 15:23:39 -07001151 schedule_cb(t, s->send_done_closure, 0);
1152 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001153 if (s->cancelled) {
1154 send_rst = 0;
Craig Tiller06aeea72015-04-23 10:54:45 -07001155 } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
1156 had_outgoing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001157 s->cancelled = 1;
ctiller00297df2015-01-12 11:23:09 -08001158 stream_list_join(t, s, CANCELLED);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001159
Craig Tillera7ed5d92015-01-23 11:30:16 -08001160 gpr_ltoa(local_status, buffer);
Craig Tiller5c019ae2015-04-17 16:46:53 -07001161 add_incoming_metadata(
1162 t, s,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001163 grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001164 if (!optional_message) {
1165 switch (local_status) {
1166 case GRPC_STATUS_CANCELLED:
1167 add_incoming_metadata(
1168 t, s, grpc_mdelem_from_strings(t->metadata_context,
1169 "grpc-message", "Cancelled"));
1170 break;
1171 default:
1172 break;
1173 }
1174 } else {
Craig Tiller1a727fd2015-04-24 13:21:22 -07001175 add_incoming_metadata(
1176 t, s,
1177 grpc_mdelem_from_metadata_strings(
1178 t->metadata_context,
1179 grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
1180 grpc_mdstr_ref(optional_message)));
Craig Tillerbd222712015-04-17 16:09:40 -07001181 }
Craig Tiller7d4a96a2015-04-24 07:54:07 -07001182 add_metadata_batch(t, s);
Craig Tillerc079c112015-04-22 15:23:39 -07001183 maybe_finish_read(t, s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001184 }
1185 }
1186 if (!id) send_rst = 0;
1187 if (send_rst) {
1188 gpr_slice_buffer_add(&t->qbuf,
1189 grpc_chttp2_rst_stream_create(id, error_code));
1190 }
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001191 if (optional_message) {
1192 grpc_mdstr_unref(optional_message);
1193 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001194}
1195
1196static void cancel_stream_id(transport *t, gpr_uint32 id,
1197 grpc_status_code local_status,
1198 grpc_chttp2_error_code error_code, int send_rst) {
1199 cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001200 NULL, send_rst);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001201}
1202
1203static void cancel_stream(transport *t, stream *s,
1204 grpc_status_code local_status,
Craig Tiller1a727fd2015-04-24 13:21:22 -07001205 grpc_chttp2_error_code error_code,
1206 grpc_mdstr *optional_message, int send_rst) {
1207 cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
1208 send_rst);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001209}
1210
1211static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
1212 cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001213 GRPC_CHTTP2_INTERNAL_ERROR, NULL, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001214}
1215
1216static void end_all_the_calls(transport *t) {
1217 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
1218}
1219
1220static void drop_connection(transport *t) {
1221 if (t->error_state == ERROR_STATE_NONE) {
1222 t->error_state = ERROR_STATE_SEEN;
1223 }
1224 end_all_the_calls(t);
1225}
1226
Craig Tillerc079c112015-04-22 15:23:39 -07001227static void maybe_finish_read(transport *t, stream *s) {
1228 if (s->incoming_sopb) {
1229 stream_list_join(t, s, FINISHED_READ_OP);
1230 }
1231}
1232
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001233static void maybe_join_window_updates(transport *t, stream *s) {
Craig Tillerc079c112015-04-22 15:23:39 -07001234 if (s->incoming_sopb != NULL &&
ctiller493fbcc2014-12-07 15:09:10 -08001235 s->incoming_window <
1236 t->settings[LOCAL_SETTINGS]
1237 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
1238 3 / 4) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001239 stream_list_join(t, s, WINDOW_UPDATE);
1240 }
1241}
1242
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001243static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
1244 if (t->incoming_frame_size > t->incoming_window) {
1245 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1246 t->incoming_frame_size, t->incoming_window);
1247 return GRPC_CHTTP2_CONNECTION_ERROR;
1248 }
1249
1250 if (t->incoming_frame_size > s->incoming_window) {
1251 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1252 t->incoming_frame_size, s->incoming_window);
1253 return GRPC_CHTTP2_CONNECTION_ERROR;
1254 }
1255
1256 t->incoming_window -= t->incoming_frame_size;
1257 s->incoming_window -= t->incoming_frame_size;
1258
1259 /* if the stream incoming window is getting low, schedule an update */
1260 maybe_join_window_updates(t, s);
1261
1262 return GRPC_CHTTP2_PARSE_OK;
1263}
1264
1265static stream *lookup_stream(transport *t, gpr_uint32 id) {
1266 return grpc_chttp2_stream_map_find(&t->stream_map, id);
1267}
1268
1269static grpc_chttp2_parse_error skip_parser(void *parser,
1270 grpc_chttp2_parse_state *st,
1271 gpr_slice slice, int is_last) {
1272 return GRPC_CHTTP2_PARSE_OK;
1273}
1274
1275static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
1276
1277static int init_skip_frame(transport *t, int is_header) {
1278 if (is_header) {
1279 int is_eoh = t->expect_continuation_stream_id != 0;
1280 t->parser = grpc_chttp2_header_parser_parse;
1281 t->parser_data = &t->hpack_parser;
1282 t->hpack_parser.on_header = skip_header;
1283 t->hpack_parser.on_header_user_data = NULL;
1284 t->hpack_parser.is_boundary = is_eoh;
1285 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1286 } else {
1287 t->parser = skip_parser;
1288 }
1289 return 1;
1290}
1291
1292static void become_skip_parser(transport *t) {
1293 init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
1294}
1295
1296static int init_data_frame_parser(transport *t) {
1297 stream *s = lookup_stream(t, t->incoming_stream_id);
1298 grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
1299 if (!s || s->read_closed) return init_skip_frame(t, 0);
1300 if (err == GRPC_CHTTP2_PARSE_OK) {
1301 err = update_incoming_window(t, s);
1302 }
1303 if (err == GRPC_CHTTP2_PARSE_OK) {
1304 err = grpc_chttp2_data_parser_begin_frame(&s->parser,
1305 t->incoming_frame_flags);
1306 }
1307 switch (err) {
1308 case GRPC_CHTTP2_PARSE_OK:
1309 t->incoming_stream = s;
1310 t->parser = grpc_chttp2_data_parser_parse;
1311 t->parser_data = &s->parser;
1312 return 1;
1313 case GRPC_CHTTP2_STREAM_ERROR:
1314 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1315 GRPC_CHTTP2_INTERNAL_ERROR),
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001316 GRPC_CHTTP2_INTERNAL_ERROR, NULL, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001317 return init_skip_frame(t, 0);
1318 case GRPC_CHTTP2_CONNECTION_ERROR:
1319 drop_connection(t);
1320 return 0;
1321 }
1322 gpr_log(GPR_ERROR, "should never reach here");
1323 abort();
1324 return 0;
1325}
1326
1327static void free_timeout(void *p) { gpr_free(p); }
1328
1329static void on_header(void *tp, grpc_mdelem *md) {
1330 transport *t = tp;
1331 stream *s = t->incoming_stream;
1332
1333 GPR_ASSERT(s);
Craig Tillerd50e5652015-02-24 16:46:22 -08001334
Craig Tiller1a727fd2015-04-24 13:21:22 -07001335 IF_TRACING(gpr_log(
1336 GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
1337 grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
Craig Tillerd50e5652015-02-24 16:46:22 -08001338
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001339 if (md->key == t->str_grpc_timeout) {
1340 gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
1341 if (!cached_timeout) {
1342 /* not already parsed: parse it now, and store the result away */
1343 cached_timeout = gpr_malloc(sizeof(gpr_timespec));
1344 if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
1345 cached_timeout)) {
1346 gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
1347 grpc_mdstr_as_c_string(md->value));
1348 *cached_timeout = gpr_inf_future;
1349 }
1350 grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
1351 }
Craig Tiller9c1043e2015-04-16 16:20:38 -07001352 s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001353 grpc_mdelem_unref(md);
1354 } else {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001355 add_incoming_metadata(t, s, md);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001356 }
Craig Tillerc079c112015-04-22 15:23:39 -07001357 maybe_finish_read(t, s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001358}
1359
1360static int init_header_frame_parser(transport *t, int is_continuation) {
1361 int is_eoh =
1362 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
1363 stream *s;
1364
1365 if (is_eoh) {
1366 t->expect_continuation_stream_id = 0;
1367 } else {
1368 t->expect_continuation_stream_id = t->incoming_stream_id;
1369 }
1370
1371 if (!is_continuation) {
1372 t->header_eof =
1373 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
1374 }
1375
1376 /* could be a new stream or an existing stream */
1377 s = lookup_stream(t, t->incoming_stream_id);
1378 if (!s) {
1379 if (is_continuation) {
1380 gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
1381 return init_skip_frame(t, 1);
1382 }
1383 if (t->is_client) {
1384 if ((t->incoming_stream_id & 1) &&
1385 t->incoming_stream_id < t->next_stream_id) {
1386 /* this is an old (probably cancelled) stream */
1387 } else {
1388 gpr_log(GPR_ERROR, "ignoring new stream creation on client");
1389 }
1390 return init_skip_frame(t, 1);
nnoble0c475f02014-12-05 15:37:39 -08001391 } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
1392 gpr_log(GPR_ERROR,
1393 "ignoring out of order new stream request on server; last stream "
1394 "id=%d, new stream id=%d",
1395 t->last_incoming_stream_id, t->incoming_stream);
1396 return init_skip_frame(t, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001397 }
1398 t->incoming_stream = NULL;
1399 /* if stream is accepted, we set incoming_stream in init_stream */
1400 t->cb->accept_stream(t->cb_user_data, &t->base,
Craig Tiller5c019ae2015-04-17 16:46:53 -07001401 (void *)(gpr_uintptr)t->incoming_stream_id);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001402 s = t->incoming_stream;
1403 if (!s) {
1404 gpr_log(GPR_ERROR, "stream not accepted");
1405 return init_skip_frame(t, 1);
1406 }
1407 } else {
1408 t->incoming_stream = s;
1409 }
1410 if (t->incoming_stream->read_closed) {
1411 gpr_log(GPR_ERROR, "skipping already closed stream header");
1412 t->incoming_stream = NULL;
1413 return init_skip_frame(t, 1);
1414 }
1415 t->parser = grpc_chttp2_header_parser_parse;
1416 t->parser_data = &t->hpack_parser;
1417 t->hpack_parser.on_header = on_header;
1418 t->hpack_parser.on_header_user_data = t;
1419 t->hpack_parser.is_boundary = is_eoh;
1420 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1421 if (!is_continuation &&
1422 (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
1423 grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
1424 }
1425 return 1;
1426}
1427
1428static int init_window_update_frame_parser(transport *t) {
1429 int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
1430 &t->simple_parsers.window_update,
1431 t->incoming_frame_size,
1432 t->incoming_frame_flags);
1433 if (!ok) {
1434 drop_connection(t);
1435 }
1436 t->parser = grpc_chttp2_window_update_parser_parse;
1437 t->parser_data = &t->simple_parsers.window_update;
1438 return ok;
1439}
1440
1441static int init_ping_parser(transport *t) {
1442 int ok = GRPC_CHTTP2_PARSE_OK ==
1443 grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
1444 t->incoming_frame_size,
1445 t->incoming_frame_flags);
1446 if (!ok) {
1447 drop_connection(t);
1448 }
1449 t->parser = grpc_chttp2_ping_parser_parse;
1450 t->parser_data = &t->simple_parsers.ping;
1451 return ok;
1452}
1453
nnoble0c475f02014-12-05 15:37:39 -08001454static int init_goaway_parser(transport *t) {
1455 int ok =
1456 GRPC_CHTTP2_PARSE_OK ==
1457 grpc_chttp2_goaway_parser_begin_frame(
1458 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
1459 if (!ok) {
1460 drop_connection(t);
1461 }
1462 t->parser = grpc_chttp2_goaway_parser_parse;
1463 t->parser_data = &t->goaway_parser;
1464 return ok;
1465}
1466
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001467static int init_settings_frame_parser(transport *t) {
1468 int ok = GRPC_CHTTP2_PARSE_OK ==
1469 grpc_chttp2_settings_parser_begin_frame(
1470 &t->simple_parsers.settings, t->incoming_frame_size,
1471 t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
1472 if (!ok) {
1473 drop_connection(t);
1474 }
1475 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
1476 memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
1477 GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
1478 }
1479 t->parser = grpc_chttp2_settings_parser_parse;
1480 t->parser_data = &t->simple_parsers.settings;
1481 return ok;
1482}
1483
1484static int init_frame_parser(transport *t) {
1485 if (t->expect_continuation_stream_id != 0) {
1486 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
1487 gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
1488 t->incoming_frame_type);
1489 return 0;
1490 }
1491 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
1492 gpr_log(GPR_ERROR,
1493 "Expected CONTINUATION frame for stream %08x, got stream %08x",
1494 t->expect_continuation_stream_id, t->incoming_stream_id);
1495 return 0;
1496 }
1497 return init_header_frame_parser(t, 1);
1498 }
1499 switch (t->incoming_frame_type) {
1500 case GRPC_CHTTP2_FRAME_DATA:
1501 return init_data_frame_parser(t);
1502 case GRPC_CHTTP2_FRAME_HEADER:
1503 return init_header_frame_parser(t, 0);
1504 case GRPC_CHTTP2_FRAME_CONTINUATION:
1505 gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
1506 return 0;
1507 case GRPC_CHTTP2_FRAME_RST_STREAM:
1508 /* TODO(ctiller): actually parse the reason */
1509 cancel_stream_id(
1510 t, t->incoming_stream_id,
1511 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
1512 GRPC_CHTTP2_CANCEL, 0);
1513 return init_skip_frame(t, 0);
1514 case GRPC_CHTTP2_FRAME_SETTINGS:
1515 return init_settings_frame_parser(t);
1516 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
1517 return init_window_update_frame_parser(t);
1518 case GRPC_CHTTP2_FRAME_PING:
1519 return init_ping_parser(t);
nnoble0c475f02014-12-05 15:37:39 -08001520 case GRPC_CHTTP2_FRAME_GOAWAY:
1521 return init_goaway_parser(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001522 default:
1523 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
1524 return init_skip_frame(t, 0);
1525 }
1526}
1527
Craig Tiller84b88842015-04-20 08:47:52 -07001528static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
1529 return window + window_update < MAX_WINDOW;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001530}
1531
Craig Tillerbd222712015-04-17 16:09:40 -07001532static void add_metadata_batch(transport *t, stream *s) {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001533 grpc_metadata_batch b;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001534
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001535 b.list.head = NULL;
1536 /* Store away the last element of the list, so that in patch_metadata_ops
1537 we can reconstitute the list.
1538 We can't do list building here as later incoming metadata may reallocate
1539 the underlying array. */
1540 b.list.tail = (void*)(gpr_intptr)s->incoming_metadata_count;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001541 b.garbage.head = b.garbage.tail = NULL;
1542 b.deadline = s->incoming_deadline;
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001543 s->incoming_deadline = gpr_inf_future;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001544
1545 grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
Craig Tiller9c1043e2015-04-16 16:20:38 -07001546}
1547
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001548static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
1549 grpc_chttp2_parse_state st;
1550 size_t i;
1551 memset(&st, 0, sizeof(st));
1552 switch (t->parser(t->parser_data, &st, slice, is_last)) {
1553 case GRPC_CHTTP2_PARSE_OK:
1554 if (st.end_of_stream) {
1555 t->incoming_stream->read_closed = 1;
Craig Tillerc079c112015-04-22 15:23:39 -07001556 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001557 }
1558 if (st.need_flush_reads) {
Craig Tillerc079c112015-04-22 15:23:39 -07001559 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001560 }
1561 if (st.metadata_boundary) {
Craig Tillerbd222712015-04-17 16:09:40 -07001562 add_metadata_batch(t, t->incoming_stream);
Craig Tillerc079c112015-04-22 15:23:39 -07001563 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001564 }
1565 if (st.ack_settings) {
1566 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
1567 maybe_start_some_streams(t);
1568 }
1569 if (st.send_ping_ack) {
1570 gpr_slice_buffer_add(
1571 &t->qbuf,
1572 grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
1573 }
nnoble0c475f02014-12-05 15:37:39 -08001574 if (st.goaway) {
1575 if (t->num_pending_goaways == t->cap_pending_goaways) {
1576 t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
1577 t->pending_goaways =
1578 gpr_realloc(t->pending_goaways,
1579 sizeof(pending_goaway) * t->cap_pending_goaways);
1580 }
1581 t->pending_goaways[t->num_pending_goaways].status =
1582 grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
1583 t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
1584 t->num_pending_goaways++;
1585 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001586 if (st.process_ping_reply) {
1587 for (i = 0; i < t->ping_count; i++) {
1588 if (0 ==
1589 memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
1590 t->pings[i].cb(t->pings[i].user_data);
1591 memmove(&t->pings[i], &t->pings[i + 1],
1592 (t->ping_count - i - 1) * sizeof(outstanding_ping));
1593 t->ping_count--;
1594 break;
1595 }
1596 }
1597 }
Yang Gaof1021032015-04-18 00:10:29 -07001598 if (st.initial_window_update) {
1599 for (i = 0; i < t->stream_map.count; i++) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001600 stream *s = (stream *)(t->stream_map.values[i]);
Craig Tiller84b88842015-04-20 08:47:52 -07001601 int was_window_empty = s->outgoing_window <= 0;
1602 s->outgoing_window += st.initial_window_update;
Craig Tiller06aeea72015-04-23 10:54:45 -07001603 if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
1604 s->outgoing_sopb->nops > 0) {
Craig Tiller84b88842015-04-20 08:47:52 -07001605 stream_list_join(t, s, WRITABLE);
Yang Gaof1021032015-04-18 00:10:29 -07001606 }
1607 }
1608 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001609 if (st.window_update) {
1610 if (t->incoming_stream_id) {
1611 /* if there was a stream id, this is for some stream */
1612 stream *s = lookup_stream(t, t->incoming_stream_id);
1613 if (s) {
Craig Tiller84b88842015-04-20 08:47:52 -07001614 int was_window_empty = s->outgoing_window <= 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001615 if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
1616 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1617 GRPC_CHTTP2_FLOW_CONTROL_ERROR),
Craig Tiller2ea37fd2015-04-24 13:03:49 -07001618 GRPC_CHTTP2_FLOW_CONTROL_ERROR, NULL, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001619 } else {
1620 s->outgoing_window += st.window_update;
1621 /* if this window update makes outgoing ops writable again,
1622 flag that */
Craig Tiller06aeea72015-04-23 10:54:45 -07001623 if (was_window_empty && s->outgoing_sopb &&
1624 s->outgoing_sopb->nops > 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001625 stream_list_join(t, s, WRITABLE);
1626 }
1627 }
1628 }
1629 } else {
1630 /* transport level window update */
1631 if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
1632 drop_connection(t);
1633 } else {
1634 t->outgoing_window += st.window_update;
1635 }
1636 }
1637 }
1638 return 1;
1639 case GRPC_CHTTP2_STREAM_ERROR:
1640 become_skip_parser(t);
1641 cancel_stream_id(
1642 t, t->incoming_stream_id,
1643 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
1644 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1645 return 1;
1646 case GRPC_CHTTP2_CONNECTION_ERROR:
1647 drop_connection(t);
1648 return 0;
1649 }
1650 gpr_log(GPR_ERROR, "should never reach here");
1651 abort();
1652 return 0;
1653}
1654
1655static int process_read(transport *t, gpr_slice slice) {
1656 gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
1657 gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
1658 gpr_uint8 *cur = beg;
1659
1660 if (cur == end) return 1;
1661
1662 switch (t->deframe_state) {
1663 case DTS_CLIENT_PREFIX_0:
1664 case DTS_CLIENT_PREFIX_1:
1665 case DTS_CLIENT_PREFIX_2:
1666 case DTS_CLIENT_PREFIX_3:
1667 case DTS_CLIENT_PREFIX_4:
1668 case DTS_CLIENT_PREFIX_5:
1669 case DTS_CLIENT_PREFIX_6:
1670 case DTS_CLIENT_PREFIX_7:
1671 case DTS_CLIENT_PREFIX_8:
1672 case DTS_CLIENT_PREFIX_9:
1673 case DTS_CLIENT_PREFIX_10:
1674 case DTS_CLIENT_PREFIX_11:
1675 case DTS_CLIENT_PREFIX_12:
1676 case DTS_CLIENT_PREFIX_13:
1677 case DTS_CLIENT_PREFIX_14:
1678 case DTS_CLIENT_PREFIX_15:
1679 case DTS_CLIENT_PREFIX_16:
1680 case DTS_CLIENT_PREFIX_17:
1681 case DTS_CLIENT_PREFIX_18:
1682 case DTS_CLIENT_PREFIX_19:
1683 case DTS_CLIENT_PREFIX_20:
1684 case DTS_CLIENT_PREFIX_21:
1685 case DTS_CLIENT_PREFIX_22:
1686 case DTS_CLIENT_PREFIX_23:
1687 while (cur != end && t->deframe_state != DTS_FH_0) {
1688 if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
1689 gpr_log(GPR_ERROR,
1690 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
1691 "at byte %d",
1692 CLIENT_CONNECT_STRING[t->deframe_state],
Craig Tiller5c019ae2015-04-17 16:46:53 -07001693 (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
1694 (int)*cur, t->deframe_state);
Craig Tiller5246e7a2015-01-19 14:59:08 -08001695 drop_connection(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001696 return 0;
1697 }
1698 ++cur;
1699 ++t->deframe_state;
1700 }
1701 if (cur == end) {
1702 return 1;
1703 }
1704 /* fallthrough */
1705 dts_fh_0:
1706 case DTS_FH_0:
1707 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001708 t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001709 if (++cur == end) {
1710 t->deframe_state = DTS_FH_1;
1711 return 1;
1712 }
1713 /* fallthrough */
1714 case DTS_FH_1:
1715 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001716 t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001717 if (++cur == end) {
1718 t->deframe_state = DTS_FH_2;
1719 return 1;
1720 }
1721 /* fallthrough */
1722 case DTS_FH_2:
1723 GPR_ASSERT(cur < end);
1724 t->incoming_frame_size |= *cur;
1725 if (++cur == end) {
1726 t->deframe_state = DTS_FH_3;
1727 return 1;
1728 }
1729 /* fallthrough */
1730 case DTS_FH_3:
1731 GPR_ASSERT(cur < end);
1732 t->incoming_frame_type = *cur;
1733 if (++cur == end) {
1734 t->deframe_state = DTS_FH_4;
1735 return 1;
1736 }
1737 /* fallthrough */
1738 case DTS_FH_4:
1739 GPR_ASSERT(cur < end);
1740 t->incoming_frame_flags = *cur;
1741 if (++cur == end) {
1742 t->deframe_state = DTS_FH_5;
1743 return 1;
1744 }
1745 /* fallthrough */
1746 case DTS_FH_5:
1747 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001748 t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001749 if (++cur == end) {
1750 t->deframe_state = DTS_FH_6;
1751 return 1;
1752 }
1753 /* fallthrough */
1754 case DTS_FH_6:
1755 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001756 t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001757 if (++cur == end) {
1758 t->deframe_state = DTS_FH_7;
1759 return 1;
1760 }
1761 /* fallthrough */
1762 case DTS_FH_7:
1763 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001764 t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001765 if (++cur == end) {
1766 t->deframe_state = DTS_FH_8;
1767 return 1;
1768 }
1769 /* fallthrough */
1770 case DTS_FH_8:
1771 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001772 t->incoming_stream_id |= ((gpr_uint32)*cur);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001773 t->deframe_state = DTS_FRAME;
1774 if (!init_frame_parser(t)) {
1775 return 0;
1776 }
Tatsuhiro Tsujikawa1cbf8d72015-03-13 23:59:40 +09001777 /* t->last_incoming_stream_id is used as last-stream-id when
1778 sending GOAWAY frame.
1779 https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
1780 says that last-stream-id is peer-initiated stream ID. So,
1781 since we don't have server pushed streams, client should send
1782 GOAWAY last-stream-id=0 in this case. */
Tatsuhiro Tsujikawad11f6102015-03-12 22:57:22 +09001783 if (!t->is_client) {
1784 t->last_incoming_stream_id = t->incoming_stream_id;
1785 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001786 if (t->incoming_frame_size == 0) {
1787 if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
1788 return 0;
1789 }
1790 if (++cur == end) {
1791 t->deframe_state = DTS_FH_0;
1792 return 1;
1793 }
1794 goto dts_fh_0; /* loop */
1795 }
1796 if (++cur == end) {
1797 return 1;
1798 }
1799 /* fallthrough */
1800 case DTS_FRAME:
1801 GPR_ASSERT(cur < end);
Craig Tiller54f9a652015-02-19 21:41:20 -08001802 if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001803 if (!parse_frame_slice(
1804 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
1805 return 0;
1806 }
1807 t->deframe_state = DTS_FH_0;
1808 return 1;
Craig Tiller0c0b60c2015-01-21 15:49:28 -08001809 } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001810 if (!parse_frame_slice(
1811 t, gpr_slice_sub_no_ref(slice, cur - beg,
1812 cur + t->incoming_frame_size - beg),
1813 1)) {
1814 return 0;
1815 }
1816 cur += t->incoming_frame_size;
1817 goto dts_fh_0; /* loop */
1818 } else {
1819 if (!parse_frame_slice(
1820 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
1821 return 0;
1822 }
1823 t->incoming_frame_size -= (end - cur);
1824 return 1;
1825 }
1826 gpr_log(GPR_ERROR, "should never reach here");
1827 abort();
1828 }
1829
1830 gpr_log(GPR_ERROR, "should never reach here");
1831 abort();
Nicolas "Pixel" Noble7f13eb22015-04-01 20:57:33 -07001832
1833 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001834}
1835
1836/* tcp read callback */
1837static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
1838 grpc_endpoint_cb_status error) {
1839 transport *t = tp;
1840 size_t i;
1841 int keep_reading = 0;
1842
1843 switch (error) {
1844 case GRPC_ENDPOINT_CB_SHUTDOWN:
1845 case GRPC_ENDPOINT_CB_EOF:
1846 case GRPC_ENDPOINT_CB_ERROR:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001847 lock(t);
1848 drop_connection(t);
1849 t->reading = 0;
1850 if (!t->writing && t->ep) {
1851 grpc_endpoint_destroy(t->ep);
1852 t->ep = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001853 unref_transport(t); /* safe as we still have a ref for read */
1854 }
1855 unlock(t);
1856 unref_transport(t);
1857 break;
1858 case GRPC_ENDPOINT_CB_OK:
1859 lock(t);
1860 for (i = 0; i < nslices && process_read(t, slices[i]); i++)
1861 ;
1862 unlock(t);
1863 keep_reading = 1;
1864 break;
1865 }
1866
1867 for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
1868
1869 if (keep_reading) {
ctiller58393c22015-01-07 14:03:30 -08001870 grpc_endpoint_notify_on_read(t->ep, recv_data, t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001871 }
1872}
1873
1874/*
1875 * CALLBACK LOOP
1876 */
1877
1878static grpc_stream_state compute_state(gpr_uint8 write_closed,
1879 gpr_uint8 read_closed) {
1880 if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
1881 if (write_closed) return GRPC_STREAM_SEND_CLOSED;
1882 if (read_closed) return GRPC_STREAM_RECV_CLOSED;
1883 return GRPC_STREAM_OPEN;
1884}
1885
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001886static void patch_metadata_ops(stream *s) {
1887 grpc_stream_op *ops = s->incoming_sopb->ops;
1888 size_t nops = s->incoming_sopb->nops;
1889 size_t i;
1890 size_t j;
1891 size_t mdidx = 0;
1892 size_t last_mdidx;
1893
1894 for (i = 0; i < nops; i++) {
1895 grpc_stream_op *op = &ops[i];
1896 if (op->type != GRPC_OP_METADATA) continue;
1897 last_mdidx = (size_t)(gpr_intptr)(op->data.metadata.list.tail);
1898 GPR_ASSERT(last_mdidx > mdidx);
1899 GPR_ASSERT(last_mdidx <= s->incoming_metadata_count);
1900 op->data.metadata.list.head = &s->incoming_metadata[mdidx];
1901 op->data.metadata.list.tail = &s->incoming_metadata[last_mdidx - 1];
1902 for (j = mdidx + 1; j < last_mdidx; j++) {
1903 s->incoming_metadata[j].prev = &s->incoming_metadata[j-1];
1904 s->incoming_metadata[j-1].next = &s->incoming_metadata[j];
1905 }
1906 s->incoming_metadata[mdidx].prev = NULL;
1907 s->incoming_metadata[last_mdidx-1].next = NULL;
1908 mdidx = last_mdidx;
1909 }
1910 GPR_ASSERT(mdidx == s->incoming_metadata_count);
Craig Tiller2b0f7c52015-04-24 17:23:17 -07001911 s->old_incoming_metadata = s->incoming_metadata;
1912 s->incoming_metadata = NULL;
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001913 s->incoming_metadata_count = 0;
Craig Tiller2b0f7c52015-04-24 17:23:17 -07001914 s->incoming_metadata_capacity = 0;
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001915}
1916
Craig Tillerc079c112015-04-22 15:23:39 -07001917static void finish_reads(transport *t) {
1918 stream *s;
1919
1920 while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
1921 int publish = 0;
1922 GPR_ASSERT(s->incoming_sopb);
Craig Tiller06aeea72015-04-23 10:54:45 -07001923 *s->publish_state =
1924 compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
Craig Tillerc079c112015-04-22 15:23:39 -07001925 if (*s->publish_state != s->published_state) {
1926 s->published_state = *s->publish_state;
1927 publish = 1;
Craig Tillerc1f75602015-04-24 11:44:53 -07001928 if (s->published_state == GRPC_STREAM_CLOSED) {
1929 remove_from_stream_map(t, s);
1930 }
Craig Tillerc079c112015-04-22 15:23:39 -07001931 }
1932 if (s->parser.incoming_sopb.nops > 0) {
1933 grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
1934 publish = 1;
1935 }
1936 if (publish) {
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001937 if (s->incoming_metadata_count > 0) {
1938 patch_metadata_ops(s);
1939 }
Craig Tiller7e8489a2015-04-23 12:41:16 -07001940 s->incoming_sopb = NULL;
Craig Tillerc079c112015-04-22 15:23:39 -07001941 schedule_cb(t, s->recv_done_closure, 1);
1942 }
1943 }
Craig Tiller48bfcdc2015-04-24 14:24:27 -07001944
Craig Tillerc079c112015-04-22 15:23:39 -07001945}
1946
1947static void schedule_cb(transport *t, op_closure closure, int success) {
1948 if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001949 t->pending_callbacks.capacity =
1950 GPR_MAX(t->pending_callbacks.capacity * 2, 8);
1951 t->pending_callbacks.callbacks =
1952 gpr_realloc(t->pending_callbacks.callbacks,
1953 t->pending_callbacks.capacity *
1954 sizeof(*t->pending_callbacks.callbacks));
Craig Tillerc079c112015-04-22 15:23:39 -07001955 }
1956 closure.success = success;
1957 t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
1958}
1959
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001960static int prepare_callbacks(transport *t) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001961 op_closure_array temp = t->pending_callbacks;
1962 t->pending_callbacks = t->executing_callbacks;
1963 t->executing_callbacks = temp;
1964 return t->executing_callbacks.count > 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001965}
1966
Craig Tillerd1345de2015-02-24 21:55:20 -08001967static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001968 size_t i;
1969 for (i = 0; i < t->executing_callbacks.count; i++) {
1970 op_closure c = t->executing_callbacks.callbacks[i];
Craig Tillerc079c112015-04-22 15:23:39 -07001971 c.cb(c.user_data, c.success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001972 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001973 t->executing_callbacks.count = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001974}
1975
Craig Tiller748fe3f2015-03-02 07:48:50 -08001976static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
1977 cb->closed(t->cb_user_data, &t->base);
1978}
1979
Craig Tillerc079c112015-04-22 15:23:39 -07001980/*
1981 * POLLSET STUFF
1982 */
1983
1984static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
ctillerd79b4862014-12-17 16:36:59 -08001985 if (t->ep) {
1986 grpc_endpoint_add_to_pollset(t->ep, pollset);
1987 }
Craig Tillerc079c112015-04-22 15:23:39 -07001988}
1989
1990static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
1991 transport *t = (transport *)gt;
1992 lock(t);
1993 add_to_pollset_locked(t, pollset);
ctillerd79b4862014-12-17 16:36:59 -08001994 unlock(t);
1995}
1996
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001997/*
1998 * INTEGRATION GLUE
1999 */
2000
2001static const grpc_transport_vtable vtable = {
Craig Tiller06aeea72015-04-23 10:54:45 -07002002 sizeof(stream), init_stream, perform_op,
2003 add_to_pollset, destroy_stream, goaway,
2004 close_transport, send_ping, destroy_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08002005
2006void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
2007 void *arg,
2008 const grpc_channel_args *channel_args,
2009 grpc_endpoint *ep, gpr_slice *slices,
2010 size_t nslices, grpc_mdctx *mdctx,
2011 int is_client) {
2012 transport *t = gpr_malloc(sizeof(transport));
Nicolas Noble5ea99bb2015-02-04 14:13:09 -08002013 init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
2014 is_client);
Craig Tiller190d3602015-02-18 09:23:38 -08002015}