/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

#include "src/core/transport/chttp2_transport.h"

#include <math.h>
#include <stdio.h>
#include <string.h>

#include "src/core/support/string.h"
#include "src/core/transport/chttp2/frame_data.h"
#include "src/core/transport/chttp2/frame_goaway.h"
#include "src/core/transport/chttp2/frame_ping.h"
#include "src/core/transport/chttp2/frame_rst_stream.h"
#include "src/core/transport/chttp2/frame_settings.h"
#include "src/core/transport/chttp2/frame_window_update.h"
#include "src/core/transport/chttp2/hpack_parser.h"
#include "src/core/transport/chttp2/http2_errors.h"
#include "src/core/transport/chttp2/status_conversion.h"
#include "src/core/transport/chttp2/stream_encoder.h"
#include "src/core/transport/chttp2/stream_map.h"
#include "src/core/transport/chttp2/timeout_encoding.h"
#include "src/core/transport/transport_impl.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/support/useful.h>

#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu

#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
#define CLIENT_CONNECT_STRLEN 24

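/* non-zero to enable verbose http2 frame/header tracing via gpr_log
   (see IF_TRACING below) */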
int grpc_http_trace = 0;

typedef struct transport transport;
typedef struct stream stream;

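/* execute stmt only when http tracing is enabled; the if/else shape lets a
   trailing semicolon at the call site parse naturally */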
#define IF_TRACING(stmt)  \
  if (!(grpc_http_trace)) \
    ;                     \
  else                    \
    stmt

/* streams are kept in various linked lists depending on what things need to
   happen to them... this enum labels each list */
typedef enum {
  /* streams that have pending writes */
  WRITABLE = 0,
  /* streams that have been selected to be written */
  WRITING,
  /* streams that have just been written, and included a close */
  WRITTEN_CLOSED,
  /* streams that have been cancelled and have some pending state updates
     to perform */
  CANCELLED,
  /* streams that want to send window updates */
  WINDOW_UPDATE,
  /* streams that are waiting to start because there are too many concurrent
     streams on the connection */
  WAITING_FOR_CONCURRENCY,
  /* streams that have finished reading: we wait until unlock to coalesce
     all changes into one callback */
  FINISHED_READ_OP,
  STREAM_LIST_COUNT /* must be last */
} stream_list_id;

/* deframer state for the overall http2 stream of bytes */
typedef enum {
  /* prefix: one entry per http2 connection prefix byte */
  DTS_CLIENT_PREFIX_0 = 0,
  DTS_CLIENT_PREFIX_1,
  DTS_CLIENT_PREFIX_2,
  DTS_CLIENT_PREFIX_3,
  DTS_CLIENT_PREFIX_4,
  DTS_CLIENT_PREFIX_5,
  DTS_CLIENT_PREFIX_6,
  DTS_CLIENT_PREFIX_7,
  DTS_CLIENT_PREFIX_8,
  DTS_CLIENT_PREFIX_9,
  DTS_CLIENT_PREFIX_10,
  DTS_CLIENT_PREFIX_11,
  DTS_CLIENT_PREFIX_12,
  DTS_CLIENT_PREFIX_13,
  DTS_CLIENT_PREFIX_14,
  DTS_CLIENT_PREFIX_15,
  DTS_CLIENT_PREFIX_16,
  DTS_CLIENT_PREFIX_17,
  DTS_CLIENT_PREFIX_18,
  DTS_CLIENT_PREFIX_19,
  DTS_CLIENT_PREFIX_20,
  DTS_CLIENT_PREFIX_21,
  DTS_CLIENT_PREFIX_22,
  DTS_CLIENT_PREFIX_23,
  /* frame header byte 0... */
  /* must follow from the prefix states */
  DTS_FH_0,
  DTS_FH_1,
  DTS_FH_2,
  DTS_FH_3,
  DTS_FH_4,
  DTS_FH_5,
  DTS_FH_6,
  DTS_FH_7,
  /* ... frame header byte 8 */
  DTS_FH_8,
  /* inside a http2 frame */
  DTS_FRAME
} deframe_transport_state;

typedef enum {
  WRITE_STATE_OPEN,
  WRITE_STATE_QUEUED_CLOSE,
  WRITE_STATE_SENT_CLOSE
} WRITE_STATE;

typedef struct {
  stream *head;
  stream *tail;
} stream_list;

typedef struct {
  stream *next;
  stream *prev;
} stream_link;

typedef enum {
  ERROR_STATE_NONE,
  ERROR_STATE_SEEN,
  ERROR_STATE_NOTIFIED
} error_state;

/* We keep several sets of connection wide parameters */
typedef enum {
  /* The settings our peer has asked for (and we have acked) */
  PEER_SETTINGS = 0,
  /* The settings we'd like to have */
  LOCAL_SETTINGS,
  /* The settings we've published to our peer */
  SENT_SETTINGS,
  /* The settings the peer has acked */
  ACKED_SETTINGS,
  NUM_SETTING_SETS
} setting_set;

/* Outstanding ping request data */
typedef struct {
  gpr_uint8 id[8];
  void (*cb)(void *user_data);
  void *user_data;
} outstanding_ping;

typedef struct {
  grpc_status_code status;
  gpr_slice debug;
} pending_goaway;

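/* a deferred callback: queued via schedule_cb while the transport lock is
   held and invoked with its success flag once the lock is released */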
typedef struct {
  void (*cb)(void *user_data, int success);
  void *user_data;
  int success;
} op_closure;

typedef struct {
  op_closure *callbacks;
  size_t count;
  size_t capacity;
} op_closure_array;

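/* connection-wide state for a single http2 connection; fields are guarded
   by mu unless noted otherwise */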
struct transport {
  grpc_transport base; /* must be first */
  const grpc_transport_callbacks *cb;
  void *cb_user_data;
  grpc_endpoint *ep;
  grpc_mdctx *metadata_context;
  gpr_refcount refs;
  gpr_uint8 is_client;

  gpr_mu mu;
  gpr_cv cv;

  /* basic state management - what are we doing at the moment? */
  gpr_uint8 reading;
  gpr_uint8 writing;
  gpr_uint8 calling_back;
  gpr_uint8 destroying;
  gpr_uint8 closed;
  error_state error_state;

  /* queued callbacks */
  op_closure_array pending_callbacks;
  op_closure_array executing_callbacks;

  /* stream indexing */
  gpr_uint32 next_stream_id;
  gpr_uint32 last_incoming_stream_id;

  /* settings */
  gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
  gpr_uint32 force_send_settings;   /* bitmask of setting indexes to send out */
  gpr_uint8 sent_local_settings;    /* have local settings been sent? */
  gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */

  /* window management */
  gpr_uint32 outgoing_window;
  gpr_uint32 incoming_window;
  gpr_uint32 connection_window_target;

  /* deframing */
  deframe_transport_state deframe_state;
  gpr_uint8 incoming_frame_type;
  gpr_uint8 incoming_frame_flags;
  gpr_uint8 header_eof;
  gpr_uint32 expect_continuation_stream_id;
  gpr_uint32 incoming_frame_size;
  gpr_uint32 incoming_stream_id;

  /* hpack encoding */
  grpc_chttp2_hpack_compressor hpack_compressor;

  /* various parsers */
  grpc_chttp2_hpack_parser hpack_parser;
  /* simple one shot parsers */
  union {
    grpc_chttp2_window_update_parser window_update;
    grpc_chttp2_settings_parser settings;
    grpc_chttp2_ping_parser ping;
  } simple_parsers;

  /* goaway */
  grpc_chttp2_goaway_parser goaway_parser;
  pending_goaway *pending_goaways;
  size_t num_pending_goaways;
  size_t cap_pending_goaways;

  /* state for a stream that's not yet been created */
  grpc_stream_op_buffer new_stream_sopb;

  /* stream ops that need to be destroyed, but outside of the lock */
  grpc_stream_op_buffer nuke_later_sopb;

  /* active parser */
  void *parser_data;
  stream *incoming_stream;
  grpc_chttp2_parse_error (*parser)(void *parser_user_data,
                                    grpc_chttp2_parse_state *state,
                                    gpr_slice slice, int is_last);

  gpr_slice_buffer outbuf;
  gpr_slice_buffer qbuf;

  stream_list lists[STREAM_LIST_COUNT];
  grpc_chttp2_stream_map stream_map;

  /* metadata object cache */
  grpc_mdstr *str_grpc_timeout;

  /* pings */
  outstanding_ping *pings;
  size_t ping_count;
  size_t ping_capacity;
  gpr_int64 ping_counter;
};

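/* per-stream state; grpc_stream pointers handed in through the transport
   api are cast directly to this type */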
struct stream {
  gpr_uint32 id;

  gpr_uint32 incoming_window;
  gpr_int64 outgoing_window;
  /* when the application requests writes be closed, write_state becomes
     'queued'; when the close is flow controlled into the send path, we are
     'sending' it; when the write has been performed it is 'sent' */
  WRITE_STATE write_state;
  gpr_uint8 send_closed;
  gpr_uint8 read_closed;
  gpr_uint8 cancelled;
  gpr_uint8 published_close;

  op_closure send_done_closure;
  op_closure recv_done_closure;

  stream_link links[STREAM_LIST_COUNT];
  gpr_uint8 included[STREAM_LIST_COUNT];

  /* incoming metadata */
  grpc_linked_mdelem *incoming_metadata;
  size_t incoming_metadata_count;
  size_t incoming_metadata_capacity;
  gpr_timespec incoming_deadline;

  /* sops from application */
  grpc_stream_op_buffer *outgoing_sopb;
  grpc_stream_op_buffer *incoming_sopb;
  grpc_stream_state *publish_state;
  grpc_stream_state published_state;
  /* sops that have passed flow control to be written */
  grpc_stream_op_buffer writing_sopb;

  grpc_chttp2_data_parser parser;

  grpc_stream_state callback_state;
  grpc_stream_op_buffer callback_sopb;
};

static const grpc_transport_vtable vtable;

static void push_setting(transport *t, grpc_chttp2_setting_id id,
                         gpr_uint32 value);

static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);

static int prepare_write(transport *t);
static void perform_write(transport *t, grpc_endpoint *ep);

static void lock(transport *t);
static void unlock(transport *t);

static void drop_connection(transport *t);
static void end_all_the_calls(transport *t);

static stream *stream_list_remove_head(transport *t, stream_list_id id);
static void stream_list_remove(transport *t, stream *s, stream_list_id id);
static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
static void stream_list_join(transport *t, stream *s, stream_list_id id);

static void cancel_stream_id(transport *t, gpr_uint32 id,
                             grpc_status_code local_status,
                             grpc_chttp2_error_code error_code, int send_rst);
static void cancel_stream(transport *t, stream *s,
                          grpc_status_code local_status,
                          grpc_chttp2_error_code error_code, int send_rst);
static void finalize_cancellations(transport *t);
static stream *lookup_stream(transport *t, gpr_uint32 id);
static void remove_from_stream_map(transport *t, stream *s);
static void maybe_start_some_streams(transport *t);

static void become_skip_parser(transport *t);

static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
                      grpc_endpoint_cb_status error);

static void schedule_cb(transport *t, op_closure closure, int success);
static void maybe_finish_read(transport *t, stream *s);
static void maybe_join_window_updates(transport *t, stream *s);
static void finish_reads(transport *t);
static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);

/*
 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
 */

static void destruct_transport(transport *t) {
  size_t i;

  gpr_mu_lock(&t->mu);

  GPR_ASSERT(t->ep == NULL);

  gpr_slice_buffer_destroy(&t->outbuf);
  gpr_slice_buffer_destroy(&t->qbuf);
  grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
  grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
  grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);

  grpc_mdstr_unref(t->str_grpc_timeout);

  for (i = 0; i < STREAM_LIST_COUNT; i++) {
    GPR_ASSERT(t->lists[i].head == NULL);
    GPR_ASSERT(t->lists[i].tail == NULL);
  }

  GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);

  grpc_chttp2_stream_map_destroy(&t->stream_map);

  gpr_mu_unlock(&t->mu);
  gpr_mu_destroy(&t->mu);
  gpr_cv_destroy(&t->cv);

  /* callback remaining pings: they're not allowed to call into the transport,
     and maybe they hold resources that need to be freed */
  for (i = 0; i < t->ping_count; i++) {
    t->pings[i].cb(t->pings[i].user_data);
  }
  gpr_free(t->pings);

  for (i = 0; i < t->num_pending_goaways; i++) {
    gpr_slice_unref(t->pending_goaways[i].debug);
  }
  gpr_free(t->pending_goaways);

  grpc_sopb_destroy(&t->nuke_later_sopb);

  grpc_mdctx_unref(t->metadata_context);

  gpr_free(t);
}

static void unref_transport(transport *t) {
  if (!gpr_unref(&t->refs)) return;
  destruct_transport(t);
}

static void ref_transport(transport *t) { gpr_ref(&t->refs); }

static void init_transport(transport *t, grpc_transport_setup_callback setup,
                           void *arg, const grpc_channel_args *channel_args,
                           grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
                           grpc_mdctx *mdctx, int is_client) {
  size_t i;
  int j;
  grpc_transport_setup_result sr;

  GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);

  memset(t, 0, sizeof(*t));

  t->base.vtable = &vtable;
  t->ep = ep;
  /* one ref is for destroy, the other for when ep becomes NULL */
  gpr_ref_init(&t->refs, 2);
  gpr_mu_init(&t->mu);
  gpr_cv_init(&t->cv);
  grpc_mdctx_ref(mdctx);
  t->metadata_context = mdctx;
  t->str_grpc_timeout =
      grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
  t->reading = 1;
  t->error_state = ERROR_STATE_NONE;
  t->next_stream_id = is_client ? 1 : 2;
  t->is_client = is_client;
  t->outgoing_window = DEFAULT_WINDOW;
  t->incoming_window = DEFAULT_WINDOW;
  t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
  t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
  t->ping_counter = gpr_now().tv_nsec;
  grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
  grpc_chttp2_goaway_parser_init(&t->goaway_parser);
  gpr_slice_buffer_init(&t->outbuf);
  gpr_slice_buffer_init(&t->qbuf);
  grpc_sopb_init(&t->nuke_later_sopb);
  grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
  if (is_client) {
    gpr_slice_buffer_add(&t->qbuf,
                         gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
  }
  /* 8 is a random stab in the dark as to a good initial size: it's small
     enough that it shouldn't waste memory for infrequently used connections,
     yet large enough that the exponential growth should happen nicely when
     it's needed.
     TODO(ctiller): tune this */
  grpc_chttp2_stream_map_init(&t->stream_map, 8);

  /* copy in initial settings to all setting sets */
  for (i = 0; i < NUM_SETTING_SETS; i++) {
    for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
      t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
    }
  }
  t->dirtied_local_settings = 1;
  /* Hack: it's common for implementations to assume 65536 bytes initial send
     window -- this should by rights be 0 */
  t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
  t->sent_local_settings = 0;

  /* configure http2 the way we like it */
  if (t->is_client) {
    push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
    push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
  }
  push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);

  if (channel_args) {
    for (i = 0; i < channel_args->num_args; i++) {
      if (0 ==
          strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
        if (t->is_client) {
          gpr_log(GPR_ERROR, "%s: is ignored on the client",
                  GRPC_ARG_MAX_CONCURRENT_STREAMS);
        } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
          gpr_log(GPR_ERROR, "%s: must be an integer",
                  GRPC_ARG_MAX_CONCURRENT_STREAMS);
        } else {
          push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
                       channel_args->args[i].value.integer);
        }
      }
    }
  }

  gpr_mu_lock(&t->mu);
  t->calling_back = 1;
  ref_transport(t); /* matches unref at end of this function */
  gpr_mu_unlock(&t->mu);

  sr = setup(arg, &t->base, t->metadata_context);

  lock(t);
  t->cb = sr.callbacks;
  t->cb_user_data = sr.user_data;
  t->calling_back = 0;
  if (t->destroying) gpr_cv_signal(&t->cv);
  unlock(t);

  ref_transport(t); /* matches unref inside recv_data */
  recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);

  unref_transport(t);
}

static void destroy_transport(grpc_transport *gt) {
  transport *t = (transport *)gt;

  lock(t);
  t->destroying = 1;
  /* Wait for pending stuff to finish.
     We need to be not calling back to ensure that closed() gets a chance to
     trigger if needed during unlock() before we die.
     We need to be not writing as cancellation finalization may produce some
     callbacks that NEED to be made to close out some streams when t->writing
     becomes 0. */
  while (t->calling_back || t->writing) {
    gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
  }
  drop_connection(t);
  unlock(t);

  /* The drop_connection() above puts the transport into an error state, and
     the follow-up unlock should then (as part of the cleanup work it does)
     ensure that cb is NULL, and therefore not call back anything further.
     This check validates this very subtle behavior.
     It's a shutdown path, so I don't believe an extra lock pair is going to
     be problematic for performance. */
  lock(t);
  GPR_ASSERT(!t->cb);
  unlock(t);

  unref_transport(t);
}

static void close_transport(grpc_transport *gt) {
  transport *t = (transport *)gt;
  gpr_mu_lock(&t->mu);
  GPR_ASSERT(!t->closed);
  t->closed = 1;
  if (t->ep) {
    grpc_endpoint_shutdown(t->ep);
  }
  gpr_mu_unlock(&t->mu);
}

static void goaway(grpc_transport *gt, grpc_status_code status,
                   gpr_slice debug_data) {
  transport *t = (transport *)gt;
  lock(t);
  grpc_chttp2_goaway_append(t->last_incoming_stream_id,
                            grpc_chttp2_grpc_status_to_http2_error(status),
                            debug_data, &t->qbuf);
  unlock(t);
}

static int init_stream(grpc_transport *gt, grpc_stream *gs,
                       const void *server_data, grpc_transport_op *initial_op) {
  transport *t = (transport *)gt;
  stream *s = (stream *)gs;

  memset(s, 0, sizeof(*s));

  ref_transport(t);

  if (!server_data) {
    lock(t);
    s->id = 0;
  } else {
    /* already locked */
    s->id = (gpr_uint32)(gpr_uintptr)server_data;
    t->incoming_stream = s;
    grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
  }

  s->outgoing_window =
      t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
  s->incoming_window =
      t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
  s->incoming_deadline = gpr_inf_future;
  grpc_sopb_init(&s->writing_sopb);
  grpc_sopb_init(&s->callback_sopb);
  grpc_chttp2_data_parser_init(&s->parser);

  if (initial_op) perform_op_locked(t, s, initial_op);

  if (!server_data) {
    unlock(t);
  }

  return 0;
}

static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
  grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
  sopb->nops = 0;
}

static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
  transport *t = (transport *)gt;
  stream *s = (stream *)gs;
  size_t i;

  gpr_mu_lock(&t->mu);

  /* stop parsing if we're currently parsing this stream */
  if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
      s->id != 0) {
    become_skip_parser(t);
  }

  for (i = 0; i < STREAM_LIST_COUNT; i++) {
    stream_list_remove(t, s, i);
  }
  remove_from_stream_map(t, s);

  gpr_mu_unlock(&t->mu);

  GPR_ASSERT(s->outgoing_sopb == NULL);
  grpc_sopb_destroy(&s->writing_sopb);
  grpc_sopb_destroy(&s->callback_sopb);
  grpc_chttp2_data_parser_destroy(&s->parser);

  unref_transport(t);
}

/*
 * LIST MANAGEMENT
 */

static int stream_list_empty(transport *t, stream_list_id id) {
  return t->lists[id].head == NULL;
}

static stream *stream_list_remove_head(transport *t, stream_list_id id) {
  stream *s = t->lists[id].head;
  if (s) {
    stream *new_head = s->links[id].next;
    GPR_ASSERT(s->included[id]);
    if (new_head) {
      t->lists[id].head = new_head;
      new_head->links[id].prev = NULL;
    } else {
      t->lists[id].head = NULL;
      t->lists[id].tail = NULL;
    }
    s->included[id] = 0;
  }
  return s;
}

static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
  if (!s->included[id]) return;
  s->included[id] = 0;
  if (s->links[id].prev) {
    s->links[id].prev->links[id].next = s->links[id].next;
  } else {
    GPR_ASSERT(t->lists[id].head == s);
    t->lists[id].head = s->links[id].next;
  }
  if (s->links[id].next) {
    s->links[id].next->links[id].prev = s->links[id].prev;
  } else {
    t->lists[id].tail = s->links[id].prev;
  }
}

static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
  stream *old_tail;
  GPR_ASSERT(!s->included[id]);
  old_tail = t->lists[id].tail;
  s->links[id].next = NULL;
  s->links[id].prev = old_tail;
  if (old_tail) {
    old_tail->links[id].next = s;
  } else {
    s->links[id].prev = NULL;
    t->lists[id].head = s;
  }
  t->lists[id].tail = s;
  s->included[id] = 1;
}

static void stream_list_join(transport *t, stream *s, stream_list_id id) {
  if (s->included[id]) {
    return;
  }
  stream_list_add_tail(t, s, id);
}

static void remove_from_stream_map(transport *t, stream *s) {
  if (s->id == 0) return;
  if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
    maybe_start_some_streams(t);
  }
}

/*
 * LOCK MANAGEMENT
 */

/* We take a transport-global lock in response to calls coming in from above,
   and in response to data being received from below. New data to be written
   is always queued, as are callbacks to process data. During unlock() we
   check our todo lists and initiate callbacks and flush writes. */

static void lock(transport *t) { gpr_mu_lock(&t->mu); }

static void unlock(transport *t) {
  int start_write = 0;
  int perform_callbacks = 0;
  int call_closed = 0;
  int num_goaways = 0;
  int i;
  pending_goaway *goaways = NULL;
  grpc_endpoint *ep = t->ep;
  grpc_stream_op_buffer nuke_now;
  const grpc_transport_callbacks *cb = t->cb;

  grpc_sopb_init(&nuke_now);
  if (t->nuke_later_sopb.nops) {
    grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
  }

  /* see if we need to trigger a write - and if so, get the data ready */
  if (ep && !t->writing) {
    t->writing = start_write = prepare_write(t);
    if (start_write) {
      ref_transport(t);
    }
  }

  if (!t->writing) {
    finalize_cancellations(t);
  }

  finish_reads(t);

  /* gather any callbacks that need to be made */
  if (!t->calling_back && cb) {
    perform_callbacks = prepare_callbacks(t);
    if (perform_callbacks) {
      t->calling_back = 1;
    }
    if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
      call_closed = 1;
      t->calling_back = 1;
      t->cb = NULL; /* no more callbacks */
      t->error_state = ERROR_STATE_NOTIFIED;
    }
    if (t->num_pending_goaways) {
      goaways = t->pending_goaways;
      num_goaways = t->num_pending_goaways;
      t->pending_goaways = NULL;
      t->num_pending_goaways = 0;
      t->cap_pending_goaways = 0;
      t->calling_back = 1;
    }
  }

  if (perform_callbacks || call_closed || num_goaways) {
    ref_transport(t);
  }

  /* finally unlock */
  gpr_mu_unlock(&t->mu);

  /* perform some callbacks if necessary */
  for (i = 0; i < num_goaways; i++) {
    cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
  }

  if (perform_callbacks) {
    run_callbacks(t, cb);
  }

  if (call_closed) {
    call_cb_closed(t, cb);
  }

  /* write some bytes if necessary */
  if (start_write) {
    /* ultimately calls unref_transport(t); and clears t->writing */
    perform_write(t, ep);
  }

  if (perform_callbacks || call_closed || num_goaways) {
    lock(t);
    t->calling_back = 0;
    if (t->destroying) gpr_cv_signal(&t->cv);
    unlock(t);
    unref_transport(t);
  }

  grpc_sopb_destroy(&nuke_now);

  gpr_free(goaways);
}

/*
 * OUTPUT PROCESSING
 */

static void push_setting(transport *t, grpc_chttp2_setting_id id,
                         gpr_uint32 value) {
  const grpc_chttp2_setting_parameters *sp =
      &grpc_chttp2_settings_parameters[id];
  gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
  if (use_value != value) {
    gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
            value, use_value);
  }
  if (use_value != t->settings[LOCAL_SETTINGS][id]) {
    t->settings[LOCAL_SETTINGS][id] = use_value;
    t->dirtied_local_settings = 1;
  }
}

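/* called under the transport lock from unlock(): flushes queued control
   frames, emits a SETTINGS frame if the local settings are dirty, moves
   flow-controlled data from each writable stream's outgoing sopb into its
   writing_sopb, and schedules stream/connection window updates.
   Returns non-zero if there is anything to write. */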
static int prepare_write(transport *t) {
  stream *s;
  gpr_uint32 window_delta;

  /* simple writes are queued to qbuf, and flushed here */
  gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
  GPR_ASSERT(t->qbuf.count == 0);

  if (t->dirtied_local_settings && !t->sent_local_settings) {
    gpr_slice_buffer_add(
        &t->outbuf, grpc_chttp2_settings_create(
                        t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
                        t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
    t->force_send_settings = 0;
    t->dirtied_local_settings = 0;
    t->sent_local_settings = 1;
  }

  /* for each stream that's become writable, frame its data (according to
     available window sizes) and add to the output buffer */
  while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
         s->outgoing_window > 0) {
    window_delta = grpc_chttp2_preencode(
        s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
        GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
    t->outgoing_window -= window_delta;
    s->outgoing_window -= window_delta;

    if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
        s->outgoing_sopb->nops == 0) {
      s->send_closed = 1;
    }
    if (s->writing_sopb.nops > 0 || s->send_closed) {
      stream_list_join(t, s, WRITING);
    }

    /* we should either exhaust window or have no ops left, but not both */
    GPR_ASSERT(s->outgoing_sopb->nops == 0 || s->outgoing_window <= 0);
    if (s->outgoing_sopb->nops == 0) {
      s->outgoing_sopb = NULL;
      schedule_cb(t, s->send_done_closure, 1);
    }
  }

  /* for each stream that wants to update its window, add that window here */
  while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
    window_delta =
        t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
        s->incoming_window;
    if (!s->read_closed && window_delta) {
      gpr_slice_buffer_add(
          &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
      s->incoming_window += window_delta;
    }
  }

  /* if the transport is ready to send a window update, do so here also */
  if (t->incoming_window < t->connection_window_target * 3 / 4) {
    window_delta = t->connection_window_target - t->incoming_window;
    gpr_slice_buffer_add(&t->outbuf,
                         grpc_chttp2_window_update_create(0, window_delta));
    t->incoming_window += window_delta;
  }

  return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
}

static void finalize_outbuf(transport *t) {
  stream *s;

  while ((s = stream_list_remove_head(t, WRITING))) {
    grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
                       s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
    s->writing_sopb.nops = 0;
    if (s->send_closed) {
      stream_list_join(t, s, WRITTEN_CLOSED);
    }
  }
}

static void finish_write_common(transport *t, int success) {
  stream *s;

  lock(t);
  if (!success) {
    drop_connection(t);
  }
  while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
    s->write_state = WRITE_STATE_SENT_CLOSE;
    if (!s->cancelled) {
      maybe_finish_read(t, s);
    }
  }
  t->outbuf.count = 0;
  t->outbuf.length = 0;
  /* leave the writing flag up on shutdown to prevent further writes in
     unlock() from starting */
  t->writing = 0;
  if (t->destroying) {
    gpr_cv_signal(&t->cv);
  }
  if (!t->reading) {
    grpc_endpoint_destroy(t->ep);
    t->ep = NULL;
    unref_transport(t); /* safe because we'll still have the ref for write */
  }
  unlock(t);

  unref_transport(t);
}

static void finish_write(void *tp, grpc_endpoint_cb_status error) {
  transport *t = tp;
  finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
}

static void perform_write(transport *t, grpc_endpoint *ep) {
  finalize_outbuf(t);

  GPR_ASSERT(t->outbuf.count > 0);

  switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
                              finish_write, t)) {
    case GRPC_ENDPOINT_WRITE_DONE:
      finish_write_common(t, 1);
      break;
    case GRPC_ENDPOINT_WRITE_ERROR:
      finish_write_common(t, 0);
      break;
    case GRPC_ENDPOINT_WRITE_PENDING:
      break;
  }
}

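/* while we are under the peer's MAX_CONCURRENT_STREAMS limit, pop streams
   parked in WAITING_FOR_CONCURRENCY, assign them ids and make them
   writable */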
static void maybe_start_some_streams(transport *t) {
  while (
      grpc_chttp2_stream_map_size(&t->stream_map) <
      t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
    stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
    if (!s) break;

    GPR_ASSERT(s->id == 0);
    s->id = t->next_stream_id;
    t->next_stream_id += 2;
    grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
    stream_list_join(t, s, WRITABLE);
  }
}

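/* apply a batch of operations from the transport api to stream s: queue
   sends, arm a receive, bind a pollset, and/or cancel; the caller must hold
   the transport lock */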
static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
  if (op->send_ops) {
    GPR_ASSERT(s->outgoing_sopb == NULL);
    s->send_done_closure.cb = op->on_done_send;
    s->send_done_closure.user_data = op->send_user_data;
    if (!s->cancelled) {
      s->outgoing_sopb = op->send_ops;
      if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
        s->write_state = WRITE_STATE_QUEUED_CLOSE;
      }
      if (s->id == 0) {
        stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
        maybe_start_some_streams(t);
      } else if (s->outgoing_window > 0) {
        stream_list_join(t, s, WRITABLE);
      }
    } else {
      schedule_nuke_sopb(t, op->send_ops);
      schedule_cb(t, s->send_done_closure, 0);
    }
  }

  if (op->recv_ops) {
    GPR_ASSERT(s->incoming_sopb == NULL);
    s->recv_done_closure.cb = op->on_done_recv;
    s->recv_done_closure.user_data = op->recv_user_data;
    if (!s->cancelled) {
      s->incoming_sopb = op->recv_ops;
      s->incoming_sopb->nops = 0;
      s->publish_state = op->recv_state;
      maybe_finish_read(t, s);
      maybe_join_window_updates(t, s);
    } else {
      schedule_cb(t, s->recv_done_closure, 0);
    }
  }

  if (op->bind_pollset) {
    add_to_pollset_locked(t, op->bind_pollset);
  }

  if (op->cancel_with_status != GRPC_STATUS_OK) {
    cancel_stream(
        t, s, op->cancel_with_status,
        grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1);
  }
}

static void perform_op(grpc_transport *gt, grpc_stream *gs,
                       grpc_transport_op *op) {
  transport *t = (transport *)gt;
  stream *s = (stream *)gs;

  lock(t);
  perform_op_locked(t, s, op);
  unlock(t);
}

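/* queue a PING frame; the 8 byte opaque payload encodes ping_counter
   big-endian so the eventual ack can be matched back to cb/user_data */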
static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
                      void *user_data) {
  transport *t = (transport *)gt;
  outstanding_ping *p;

  lock(t);
  if (t->ping_capacity == t->ping_count) {
    t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
    t->pings =
        gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
  }
  p = &t->pings[t->ping_count++];
  p->id[0] = (t->ping_counter >> 56) & 0xff;
  p->id[1] = (t->ping_counter >> 48) & 0xff;
  p->id[2] = (t->ping_counter >> 40) & 0xff;
  p->id[3] = (t->ping_counter >> 32) & 0xff;
  p->id[4] = (t->ping_counter >> 24) & 0xff;
  p->id[5] = (t->ping_counter >> 16) & 0xff;
  p->id[6] = (t->ping_counter >> 8) & 0xff;
  p->id[7] = t->ping_counter & 0xff;
  p->cb = cb;
  p->user_data = user_data;
  gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
  unlock(t);
}

/*
 * INPUT PROCESSING
 */

static void finalize_cancellations(transport *t) {
  stream *s;

  while ((s = stream_list_remove_head(t, CANCELLED))) {
    s->read_closed = 1;
    s->write_state = WRITE_STATE_SENT_CLOSE;
    maybe_finish_read(t, s);
  }
}

static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
  if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
    s->incoming_metadata_capacity =
        GPR_MAX(8, 2 * s->incoming_metadata_capacity);
    s->incoming_metadata =
        gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
                                              s->incoming_metadata_capacity);
  }
  s->incoming_metadata[s->incoming_metadata_count++].md = elem;
}

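/* tear down a stream locally: discard unreported input/output, fail any
   pending send callback, synthesize grpc-status (and grpc-message for
   cancellations) as incoming metadata, and optionally queue a RST_STREAM */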
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
                                grpc_status_code local_status,
                                grpc_chttp2_error_code error_code,
                                int send_rst) {
  int had_outgoing;
  char buffer[GPR_LTOA_MIN_BUFSIZE];

  if (s) {
    /* clear out any unreported input & output: nobody cares anymore */
    had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
    schedule_nuke_sopb(t, &s->parser.incoming_sopb);
    if (s->outgoing_sopb) {
      schedule_nuke_sopb(t, s->outgoing_sopb);
      schedule_cb(t, s->send_done_closure, 0);
    }
    if (s->cancelled) {
      send_rst = 0;
    } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
               had_outgoing) {
      s->cancelled = 1;
      stream_list_join(t, s, CANCELLED);

      gpr_ltoa(local_status, buffer);
      add_incoming_metadata(
          t, s,
          grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
      switch (local_status) {
        case GRPC_STATUS_CANCELLED:
          add_incoming_metadata(
              t, s, grpc_mdelem_from_strings(t->metadata_context,
                                             "grpc-message", "Cancelled"));
          break;
        default:
          break;
      }

      maybe_finish_read(t, s);
    }
  }
  if (!id) send_rst = 0;
  if (send_rst) {
    gpr_slice_buffer_add(&t->qbuf,
                         grpc_chttp2_rst_stream_create(id, error_code));
  }
}

static void cancel_stream_id(transport *t, gpr_uint32 id,
                             grpc_status_code local_status,
                             grpc_chttp2_error_code error_code, int send_rst) {
  cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
                      send_rst);
}

static void cancel_stream(transport *t, stream *s,
                          grpc_status_code local_status,
                          grpc_chttp2_error_code error_code, int send_rst) {
  cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
}

static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
  cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
                GRPC_CHTTP2_INTERNAL_ERROR, 0);
}

static void end_all_the_calls(transport *t) {
  grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
}

static void drop_connection(transport *t) {
  if (t->error_state == ERROR_STATE_NONE) {
    t->error_state = ERROR_STATE_SEEN;
  }
  end_all_the_calls(t);
}

static void maybe_finish_read(transport *t, stream *s) {
  if (s->incoming_sopb) {
    stream_list_join(t, s, FINISHED_READ_OP);
  }
}

static void maybe_join_window_updates(transport *t, stream *s) {
  if (s->incoming_sopb != NULL &&
      s->incoming_window <
          t->settings[LOCAL_SETTINGS]
                     [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
              3 / 4) {
    stream_list_join(t, s, WINDOW_UPDATE);
  }
}

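/* account for an incoming DATA frame against both the connection and stream
   flow control windows; a frame larger than either window is a connection
   error */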
static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
  if (t->incoming_frame_size > t->incoming_window) {
    gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
            t->incoming_frame_size, t->incoming_window);
    return GRPC_CHTTP2_CONNECTION_ERROR;
  }

  if (t->incoming_frame_size > s->incoming_window) {
    gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
            t->incoming_frame_size, s->incoming_window);
    return GRPC_CHTTP2_CONNECTION_ERROR;
  }

  t->incoming_window -= t->incoming_frame_size;
  s->incoming_window -= t->incoming_frame_size;

  /* if the stream incoming window is getting low, schedule an update */
  maybe_join_window_updates(t, s);

  return GRPC_CHTTP2_PARSE_OK;
}

static stream *lookup_stream(transport *t, gpr_uint32 id) {
  return grpc_chttp2_stream_map_find(&t->stream_map, id);
}

static grpc_chttp2_parse_error skip_parser(void *parser,
                                           grpc_chttp2_parse_state *st,
                                           gpr_slice slice, int is_last) {
  return GRPC_CHTTP2_PARSE_OK;
}

static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }

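/* install a parser that discards the current frame: header frames are still
   fed through the hpack parser (to keep compression state in sync) but each
   header is dropped; other frame types are skipped outright */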
static int init_skip_frame(transport *t, int is_header) {
  if (is_header) {
    int is_eoh = t->expect_continuation_stream_id != 0;
    t->parser = grpc_chttp2_header_parser_parse;
    t->parser_data = &t->hpack_parser;
    t->hpack_parser.on_header = skip_header;
    t->hpack_parser.on_header_user_data = NULL;
    t->hpack_parser.is_boundary = is_eoh;
    t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
  } else {
    t->parser = skip_parser;
  }
  return 1;
}

static void become_skip_parser(transport *t) {
  init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
}

static int init_data_frame_parser(transport *t) {
  stream *s = lookup_stream(t, t->incoming_stream_id);
  grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
  if (!s || s->read_closed) return init_skip_frame(t, 0);
  if (err == GRPC_CHTTP2_PARSE_OK) {
    err = update_incoming_window(t, s);
  }
  if (err == GRPC_CHTTP2_PARSE_OK) {
    err = grpc_chttp2_data_parser_begin_frame(&s->parser,
                                              t->incoming_frame_flags);
  }
  switch (err) {
    case GRPC_CHTTP2_PARSE_OK:
      t->incoming_stream = s;
      t->parser = grpc_chttp2_data_parser_parse;
      t->parser_data = &s->parser;
      return 1;
    case GRPC_CHTTP2_STREAM_ERROR:
      cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
                              GRPC_CHTTP2_INTERNAL_ERROR),
                    GRPC_CHTTP2_INTERNAL_ERROR, 1);
      return init_skip_frame(t, 0);
    case GRPC_CHTTP2_CONNECTION_ERROR:
      drop_connection(t);
      return 0;
  }
  gpr_log(GPR_ERROR, "should never reach here");
  abort();
  return 0;
}

static void free_timeout(void *p) { gpr_free(p); }

static void on_header(void *tp, grpc_mdelem *md) {
  transport *t = tp;
  stream *s = t->incoming_stream;

  GPR_ASSERT(s);

  IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
                     grpc_mdstr_as_c_string(md->key),
                     grpc_mdstr_as_c_string(md->value)));

  if (md->key == t->str_grpc_timeout) {
    gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
    if (!cached_timeout) {
      /* not already parsed: parse it now, and store the result away */
      cached_timeout = gpr_malloc(sizeof(gpr_timespec));
      if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
                                      cached_timeout)) {
        gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
                grpc_mdstr_as_c_string(md->value));
        *cached_timeout = gpr_inf_future;
      }
      grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
    }
    s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
    grpc_mdelem_unref(md);
  } else {
    add_incoming_metadata(t, s, md);
  }
  maybe_finish_read(t, s);
}

static int init_header_frame_parser(transport *t, int is_continuation) {
  int is_eoh =
      (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
  stream *s;

  if (is_eoh) {
    t->expect_continuation_stream_id = 0;
  } else {
    t->expect_continuation_stream_id = t->incoming_stream_id;
  }

  if (!is_continuation) {
    t->header_eof =
        (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
  }

  /* could be a new stream or an existing stream */
  s = lookup_stream(t, t->incoming_stream_id);
  if (!s) {
    if (is_continuation) {
      gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
      return init_skip_frame(t, 1);
    }
    if (t->is_client) {
      if ((t->incoming_stream_id & 1) &&
          t->incoming_stream_id < t->next_stream_id) {
        /* this is an old (probably cancelled) stream */
      } else {
        gpr_log(GPR_ERROR, "ignoring new stream creation on client");
      }
      return init_skip_frame(t, 1);
    } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
      gpr_log(GPR_ERROR,
              "ignoring out of order new stream request on server; last stream "
              "id=%d, new stream id=%d",
              t->last_incoming_stream_id, t->incoming_stream_id);
      return init_skip_frame(t, 1);
    }
    t->incoming_stream = NULL;
    /* if stream is accepted, we set incoming_stream in init_stream */
    t->cb->accept_stream(t->cb_user_data, &t->base,
                         (void *)(gpr_uintptr)t->incoming_stream_id);
    s = t->incoming_stream;
    if (!s) {
      gpr_log(GPR_ERROR, "stream not accepted");
      return init_skip_frame(t, 1);
    }
  } else {
    t->incoming_stream = s;
  }
  if (t->incoming_stream->read_closed) {
    gpr_log(GPR_ERROR, "skipping already closed stream header");
    t->incoming_stream = NULL;
    return init_skip_frame(t, 1);
  }
  t->parser = grpc_chttp2_header_parser_parse;
  t->parser_data = &t->hpack_parser;
  t->hpack_parser.on_header = on_header;
  t->hpack_parser.on_header_user_data = t;
  t->hpack_parser.is_boundary = is_eoh;
  t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
  if (!is_continuation &&
      (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
    grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
  }
  return 1;
}

1393static int init_window_update_frame_parser(transport *t) {
1394 int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
1395 &t->simple_parsers.window_update,
1396 t->incoming_frame_size,
1397 t->incoming_frame_flags);
1398 if (!ok) {
1399 drop_connection(t);
1400 }
1401 t->parser = grpc_chttp2_window_update_parser_parse;
1402 t->parser_data = &t->simple_parsers.window_update;
1403 return ok;
1404}
1405
1406static int init_ping_parser(transport *t) {
1407 int ok = GRPC_CHTTP2_PARSE_OK ==
1408 grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
1409 t->incoming_frame_size,
1410 t->incoming_frame_flags);
1411 if (!ok) {
1412 drop_connection(t);
1413 }
1414 t->parser = grpc_chttp2_ping_parser_parse;
1415 t->parser_data = &t->simple_parsers.ping;
1416 return ok;
1417}
1418
nnoble0c475f02014-12-05 15:37:39 -08001419static int init_goaway_parser(transport *t) {
1420 int ok =
1421 GRPC_CHTTP2_PARSE_OK ==
1422 grpc_chttp2_goaway_parser_begin_frame(
1423 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
1424 if (!ok) {
1425 drop_connection(t);
1426 }
1427 t->parser = grpc_chttp2_goaway_parser_parse;
1428 t->parser_data = &t->goaway_parser;
1429 return ok;
1430}
1431
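/* SETTINGS frames are parsed against the peer's settings table; a SETTINGS
   ACK from the peer promotes our previously sent settings from SENT_SETTINGS
   to ACKED_SETTINGS. */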
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001432static int init_settings_frame_parser(transport *t) {
1433 int ok = GRPC_CHTTP2_PARSE_OK ==
1434 grpc_chttp2_settings_parser_begin_frame(
1435 &t->simple_parsers.settings, t->incoming_frame_size,
1436 t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
1437 if (!ok) {
1438 drop_connection(t);
1439 }
1440 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
1441 memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
1442 GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
1443 }
1444 t->parser = grpc_chttp2_settings_parser_parse;
1445 t->parser_data = &t->simple_parsers.settings;
1446 return ok;
1447}
1448
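/* Dispatch on the frame type from the just-read frame header. While a
   header block is open (expect_continuation_stream_id != 0), only a
   CONTINUATION frame for that same stream is acceptable; anything else is
   rejected and deframing stops. */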
1449static int init_frame_parser(transport *t) {
1450 if (t->expect_continuation_stream_id != 0) {
1451 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
1452 gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
1453 t->incoming_frame_type);
1454 return 0;
1455 }
1456 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
1457 gpr_log(GPR_ERROR,
1458 "Expected CONTINUATION frame for stream %08x, got stream %08x",
1459 t->expect_continuation_stream_id, t->incoming_stream_id);
1460 return 0;
1461 }
1462 return init_header_frame_parser(t, 1);
1463 }
1464 switch (t->incoming_frame_type) {
1465 case GRPC_CHTTP2_FRAME_DATA:
1466 return init_data_frame_parser(t);
1467 case GRPC_CHTTP2_FRAME_HEADER:
1468 return init_header_frame_parser(t, 0);
1469 case GRPC_CHTTP2_FRAME_CONTINUATION:
1470 gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
1471 return 0;
1472 case GRPC_CHTTP2_FRAME_RST_STREAM:
1473 /* TODO(ctiller): actually parse the reason */
1474 cancel_stream_id(
1475 t, t->incoming_stream_id,
1476 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
1477 GRPC_CHTTP2_CANCEL, 0);
1478 return init_skip_frame(t, 0);
1479 case GRPC_CHTTP2_FRAME_SETTINGS:
1480 return init_settings_frame_parser(t);
1481 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
1482 return init_window_update_frame_parser(t);
1483 case GRPC_CHTTP2_FRAME_PING:
1484 return init_ping_parser(t);
nnoble0c475f02014-12-05 15:37:39 -08001485 case GRPC_CHTTP2_FRAME_GOAWAY:
1486 return init_goaway_parser(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001487 default:
1488 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
1489 return init_skip_frame(t, 0);
1490 }
1491}
1492
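/* HTTP/2 flow-control windows may never exceed 2^31-1 octets; an update is
   treated as legal only if the resulting window stays below MAX_WINDOW. */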
Craig Tiller84b88842015-04-20 08:47:52 -07001493static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
1494 return window + window_update < MAX_WINDOW;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001495}
1496
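/* Link the metadata elements accumulated for this stream into a doubly
   linked grpc_metadata_batch (carrying any decoded timeout as the batch
   deadline), append it to the stream's incoming sopb, then reset the
   per-stream accumulation state. */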
Craig Tillerbd222712015-04-17 16:09:40 -07001497static void add_metadata_batch(transport *t, stream *s) {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001498 grpc_metadata_batch b;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001499 size_t i;
1500
1501 b.list.head = &s->incoming_metadata[0];
1502 b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
1503 b.garbage.head = b.garbage.tail = NULL;
1504 b.deadline = s->incoming_deadline;
1505
1506 for (i = 1; i < s->incoming_metadata_count; i++) {
1507 s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
1508 s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
1509 }
1510 s->incoming_metadata[0].prev = NULL;
1511 s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
1512
1513 grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
Craig Tillerfbf5be22015-04-22 16:17:09 -07001514 /* TODO(ctiller): don't leak incoming_metadata */
Craig Tiller9c1043e2015-04-16 16:20:38 -07001515
1516 /* reset */
1517 s->incoming_deadline = gpr_inf_future;
1518 s->incoming_metadata = NULL;
1519 s->incoming_metadata_count = 0;
1520 s->incoming_metadata_capacity = 0;
1521}
1522
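/* Feed one slice of the current frame to the active parser and act on the
   results it reports: stream half-closes, metadata boundaries, SETTINGS
   acks, PING replies, GOAWAY bookkeeping, and window updates at both the
   stream and transport level. Returns 0 on connection errors. */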
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001523static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
1524 grpc_chttp2_parse_state st;
1525 size_t i;
1526 memset(&st, 0, sizeof(st));
1527 switch (t->parser(t->parser_data, &st, slice, is_last)) {
1528 case GRPC_CHTTP2_PARSE_OK:
1529 if (st.end_of_stream) {
1530 t->incoming_stream->read_closed = 1;
Craig Tillerc079c112015-04-22 15:23:39 -07001531 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001532 }
1533 if (st.need_flush_reads) {
Craig Tillerc079c112015-04-22 15:23:39 -07001534 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001535 }
1536 if (st.metadata_boundary) {
Craig Tillerbd222712015-04-17 16:09:40 -07001537 add_metadata_batch(t, t->incoming_stream);
Craig Tillerc079c112015-04-22 15:23:39 -07001538 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001539 }
1540 if (st.ack_settings) {
1541 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
1542 maybe_start_some_streams(t);
1543 }
1544 if (st.send_ping_ack) {
1545 gpr_slice_buffer_add(
1546 &t->qbuf,
1547 grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
1548 }
nnoble0c475f02014-12-05 15:37:39 -08001549 if (st.goaway) {
1550 if (t->num_pending_goaways == t->cap_pending_goaways) {
1551 t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
1552 t->pending_goaways =
1553 gpr_realloc(t->pending_goaways,
1554 sizeof(pending_goaway) * t->cap_pending_goaways);
1555 }
1556 t->pending_goaways[t->num_pending_goaways].status =
1557 grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
1558 t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
1559 t->num_pending_goaways++;
1560 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001561 if (st.process_ping_reply) {
1562 for (i = 0; i < t->ping_count; i++) {
1563 if (0 ==
1564 memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
1565 t->pings[i].cb(t->pings[i].user_data);
1566 memmove(&t->pings[i], &t->pings[i + 1],
1567 (t->ping_count - i - 1) * sizeof(outstanding_ping));
1568 t->ping_count--;
1569 break;
1570 }
1571 }
1572 }
Yang Gaof1021032015-04-18 00:10:29 -07001573 if (st.initial_window_update) {
1574 for (i = 0; i < t->stream_map.count; i++) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001575 stream *s = (stream *)(t->stream_map.values[i]);
Craig Tiller84b88842015-04-20 08:47:52 -07001576 int was_window_empty = s->outgoing_window <= 0;
1577 s->outgoing_window += st.initial_window_update;
Craig Tiller06aeea72015-04-23 10:54:45 -07001578 if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
1579 s->outgoing_sopb->nops > 0) {
Craig Tiller84b88842015-04-20 08:47:52 -07001580 stream_list_join(t, s, WRITABLE);
Yang Gaof1021032015-04-18 00:10:29 -07001581 }
1582 }
1583 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001584 if (st.window_update) {
1585 if (t->incoming_stream_id) {
1586 /* if there was a stream id, this is for some stream */
1587 stream *s = lookup_stream(t, t->incoming_stream_id);
1588 if (s) {
Craig Tiller84b88842015-04-20 08:47:52 -07001589 int was_window_empty = s->outgoing_window <= 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001590 if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
1591 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1592 GRPC_CHTTP2_FLOW_CONTROL_ERROR),
1593 GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
1594 } else {
1595 s->outgoing_window += st.window_update;
1596 /* if this window update makes outgoing ops writable again,
1597 flag that */
Craig Tiller06aeea72015-04-23 10:54:45 -07001598 if (was_window_empty && s->outgoing_sopb &&
1599 s->outgoing_sopb->nops > 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001600 stream_list_join(t, s, WRITABLE);
1601 }
1602 }
1603 }
1604 } else {
1605 /* transport level window update */
1606 if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
1607 drop_connection(t);
1608 } else {
1609 t->outgoing_window += st.window_update;
1610 }
1611 }
1612 }
1613 return 1;
1614 case GRPC_CHTTP2_STREAM_ERROR:
1615 become_skip_parser(t);
1616 cancel_stream_id(
1617 t, t->incoming_stream_id,
1618 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
1619 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1620 return 1;
1621 case GRPC_CHTTP2_CONNECTION_ERROR:
1622 drop_connection(t);
1623 return 0;
1624 }
1625 gpr_log(GPR_ERROR, "should never reach here");
1626 abort();
1627 return 0;
1628}
1629
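/* Deframing state machine. When the transport starts in the
   DTS_CLIENT_PREFIX_* states, the client connection preface
   (CLIENT_CONNECT_STRING) is verified byte by byte first. After that, each
   frame begins with the 9-octet HTTP/2 frame header, consumed one byte per
   DTS_FH_* state:

     3 bytes  length     -> incoming_frame_size
     1 byte   type       -> incoming_frame_type
     1 byte   flags      -> incoming_frame_flags
     4 bytes  stream id  -> incoming_stream_id (reserved high bit cleared)

   followed by incoming_frame_size bytes of payload handed to the parser
   installed by init_frame_parser. The fallthroughs and the dts_fh_0 label
   let the header be resumed across slice boundaries. Returns 1 to keep
   reading, 0 to stop. */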
1630static int process_read(transport *t, gpr_slice slice) {
1631 gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
1632 gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
1633 gpr_uint8 *cur = beg;
1634
1635 if (cur == end) return 1;
1636
1637 switch (t->deframe_state) {
1638 case DTS_CLIENT_PREFIX_0:
1639 case DTS_CLIENT_PREFIX_1:
1640 case DTS_CLIENT_PREFIX_2:
1641 case DTS_CLIENT_PREFIX_3:
1642 case DTS_CLIENT_PREFIX_4:
1643 case DTS_CLIENT_PREFIX_5:
1644 case DTS_CLIENT_PREFIX_6:
1645 case DTS_CLIENT_PREFIX_7:
1646 case DTS_CLIENT_PREFIX_8:
1647 case DTS_CLIENT_PREFIX_9:
1648 case DTS_CLIENT_PREFIX_10:
1649 case DTS_CLIENT_PREFIX_11:
1650 case DTS_CLIENT_PREFIX_12:
1651 case DTS_CLIENT_PREFIX_13:
1652 case DTS_CLIENT_PREFIX_14:
1653 case DTS_CLIENT_PREFIX_15:
1654 case DTS_CLIENT_PREFIX_16:
1655 case DTS_CLIENT_PREFIX_17:
1656 case DTS_CLIENT_PREFIX_18:
1657 case DTS_CLIENT_PREFIX_19:
1658 case DTS_CLIENT_PREFIX_20:
1659 case DTS_CLIENT_PREFIX_21:
1660 case DTS_CLIENT_PREFIX_22:
1661 case DTS_CLIENT_PREFIX_23:
1662 while (cur != end && t->deframe_state != DTS_FH_0) {
1663 if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
1664 gpr_log(GPR_ERROR,
1665 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
1666 "at byte %d",
1667 CLIENT_CONNECT_STRING[t->deframe_state],
Craig Tiller5c019ae2015-04-17 16:46:53 -07001668 (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
1669 (int)*cur, t->deframe_state);
Craig Tiller5246e7a2015-01-19 14:59:08 -08001670 drop_connection(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001671 return 0;
1672 }
1673 ++cur;
1674 ++t->deframe_state;
1675 }
1676 if (cur == end) {
1677 return 1;
1678 }
1679 /* fallthrough */
1680 dts_fh_0:
1681 case DTS_FH_0:
1682 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001683 t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001684 if (++cur == end) {
1685 t->deframe_state = DTS_FH_1;
1686 return 1;
1687 }
1688 /* fallthrough */
1689 case DTS_FH_1:
1690 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001691 t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001692 if (++cur == end) {
1693 t->deframe_state = DTS_FH_2;
1694 return 1;
1695 }
1696 /* fallthrough */
1697 case DTS_FH_2:
1698 GPR_ASSERT(cur < end);
1699 t->incoming_frame_size |= *cur;
1700 if (++cur == end) {
1701 t->deframe_state = DTS_FH_3;
1702 return 1;
1703 }
1704 /* fallthrough */
1705 case DTS_FH_3:
1706 GPR_ASSERT(cur < end);
1707 t->incoming_frame_type = *cur;
1708 if (++cur == end) {
1709 t->deframe_state = DTS_FH_4;
1710 return 1;
1711 }
1712 /* fallthrough */
1713 case DTS_FH_4:
1714 GPR_ASSERT(cur < end);
1715 t->incoming_frame_flags = *cur;
1716 if (++cur == end) {
1717 t->deframe_state = DTS_FH_5;
1718 return 1;
1719 }
1720 /* fallthrough */
1721 case DTS_FH_5:
1722 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001723      t->incoming_stream_id = (((gpr_uint32)*cur) & 0x7f) << 24; /* clear reserved bit */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001724 if (++cur == end) {
1725 t->deframe_state = DTS_FH_6;
1726 return 1;
1727 }
1728 /* fallthrough */
1729 case DTS_FH_6:
1730 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001731 t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001732 if (++cur == end) {
1733 t->deframe_state = DTS_FH_7;
1734 return 1;
1735 }
1736 /* fallthrough */
1737 case DTS_FH_7:
1738 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001739 t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001740 if (++cur == end) {
1741 t->deframe_state = DTS_FH_8;
1742 return 1;
1743 }
1744 /* fallthrough */
1745 case DTS_FH_8:
1746 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001747 t->incoming_stream_id |= ((gpr_uint32)*cur);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001748 t->deframe_state = DTS_FRAME;
1749 if (!init_frame_parser(t)) {
1750 return 0;
1751 }
Tatsuhiro Tsujikawa1cbf8d72015-03-13 23:59:40 +09001752      /* t->last_incoming_stream_id is used as the last-stream-id when
1753         sending a GOAWAY frame.
1754         https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
1755         says that last-stream-id is the highest peer-initiated stream ID.
1756         Since we don't have server-pushed streams, a client should send
1757         GOAWAY with last-stream-id=0 in this case. */
Tatsuhiro Tsujikawad11f6102015-03-12 22:57:22 +09001758 if (!t->is_client) {
1759 t->last_incoming_stream_id = t->incoming_stream_id;
1760 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001761 if (t->incoming_frame_size == 0) {
1762 if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
1763 return 0;
1764 }
1765 if (++cur == end) {
1766 t->deframe_state = DTS_FH_0;
1767 return 1;
1768 }
1769 goto dts_fh_0; /* loop */
1770 }
1771 if (++cur == end) {
1772 return 1;
1773 }
1774 /* fallthrough */
1775 case DTS_FRAME:
1776 GPR_ASSERT(cur < end);
Craig Tiller54f9a652015-02-19 21:41:20 -08001777 if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001778 if (!parse_frame_slice(
1779 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
1780 return 0;
1781 }
1782 t->deframe_state = DTS_FH_0;
1783 return 1;
Craig Tiller0c0b60c2015-01-21 15:49:28 -08001784 } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001785 if (!parse_frame_slice(
1786 t, gpr_slice_sub_no_ref(slice, cur - beg,
1787 cur + t->incoming_frame_size - beg),
1788 1)) {
1789 return 0;
1790 }
1791 cur += t->incoming_frame_size;
1792 goto dts_fh_0; /* loop */
1793 } else {
1794 if (!parse_frame_slice(
1795 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
1796 return 0;
1797 }
1798 t->incoming_frame_size -= (end - cur);
1799 return 1;
1800 }
1801 gpr_log(GPR_ERROR, "should never reach here");
1802 abort();
1803 }
1804
1805 gpr_log(GPR_ERROR, "should never reach here");
1806 abort();
Nicolas "Pixel" Noble7f13eb22015-04-01 20:57:33 -07001807
1808 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001809}
1810
1811/* tcp read callback */
1812static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
1813 grpc_endpoint_cb_status error) {
1814 transport *t = tp;
1815 size_t i;
1816 int keep_reading = 0;
1817
1818 switch (error) {
1819 case GRPC_ENDPOINT_CB_SHUTDOWN:
1820 case GRPC_ENDPOINT_CB_EOF:
1821 case GRPC_ENDPOINT_CB_ERROR:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001822 lock(t);
1823 drop_connection(t);
1824 t->reading = 0;
1825 if (!t->writing && t->ep) {
1826 grpc_endpoint_destroy(t->ep);
1827 t->ep = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001828 unref_transport(t); /* safe as we still have a ref for read */
1829 }
1830 unlock(t);
1831 unref_transport(t);
1832 break;
1833 case GRPC_ENDPOINT_CB_OK:
1834 lock(t);
1835 for (i = 0; i < nslices && process_read(t, slices[i]); i++)
1836 ;
1837 unlock(t);
1838 keep_reading = 1;
1839 break;
1840 }
1841
1842 for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
1843
1844 if (keep_reading) {
ctiller58393c22015-01-07 14:03:30 -08001845 grpc_endpoint_notify_on_read(t->ep, recv_data, t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001846 }
1847}
1848
1849/*
1850 * CALLBACK LOOP
1851 */
1852
1853static grpc_stream_state compute_state(gpr_uint8 write_closed,
1854 gpr_uint8 read_closed) {
1855 if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
1856 if (write_closed) return GRPC_STREAM_SEND_CLOSED;
1857 if (read_closed) return GRPC_STREAM_RECV_CLOSED;
1858 return GRPC_STREAM_OPEN;
1859}
1860
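/* Drain the FINISHED_READ_OP list: for each stream with a pending recv op,
   publish the current stream state and any parsed incoming sopb, and
   schedule the recv_done_closure if anything actually changed. */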
Craig Tillerc079c112015-04-22 15:23:39 -07001861static void finish_reads(transport *t) {
1862 stream *s;
1863
1864 while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
1865 int publish = 0;
1866 GPR_ASSERT(s->incoming_sopb);
Craig Tiller06aeea72015-04-23 10:54:45 -07001867 *s->publish_state =
1868 compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
Craig Tillerc079c112015-04-22 15:23:39 -07001869 if (*s->publish_state != s->published_state) {
1870 s->published_state = *s->publish_state;
1871 publish = 1;
1872 }
1873 if (s->parser.incoming_sopb.nops > 0) {
1874 grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
1875 publish = 1;
1876 }
1877 if (publish) {
Craig Tiller7e8489a2015-04-23 12:41:16 -07001878 s->incoming_sopb = NULL;
Craig Tillerc079c112015-04-22 15:23:39 -07001879 schedule_cb(t, s->recv_done_closure, 1);
1880 }
1881 }
1882}
1883
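/* Queue a completion callback with its success flag to be run later; the
   backing array grows geometrically as needed. */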
1884static void schedule_cb(transport *t, op_closure closure, int success) {
1885 if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001886 t->pending_callbacks.capacity =
1887 GPR_MAX(t->pending_callbacks.capacity * 2, 8);
1888 t->pending_callbacks.callbacks =
1889 gpr_realloc(t->pending_callbacks.callbacks,
1890 t->pending_callbacks.capacity *
1891 sizeof(*t->pending_callbacks.callbacks));
Craig Tillerc079c112015-04-22 15:23:39 -07001892 }
1893 closure.success = success;
1894 t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
1895}
1896
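/* prepare_callbacks swaps the pending and executing callback arrays and
   reports whether there is anything to run; run_callbacks then invokes each
   queued closure with its recorded success flag (presumably so the closures
   can run after the transport lock has been released). */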
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001897static int prepare_callbacks(transport *t) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001898 op_closure_array temp = t->pending_callbacks;
1899 t->pending_callbacks = t->executing_callbacks;
1900 t->executing_callbacks = temp;
1901 return t->executing_callbacks.count > 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001902}
1903
Craig Tillerd1345de2015-02-24 21:55:20 -08001904static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001905 size_t i;
1906 for (i = 0; i < t->executing_callbacks.count; i++) {
1907 op_closure c = t->executing_callbacks.callbacks[i];
Craig Tillerc079c112015-04-22 15:23:39 -07001908 c.cb(c.user_data, c.success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001909 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001910 t->executing_callbacks.count = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001911}
1912
Craig Tiller748fe3f2015-03-02 07:48:50 -08001913static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
1914 cb->closed(t->cb_user_data, &t->base);
1915}
1916
Craig Tillerc079c112015-04-22 15:23:39 -07001917/*
1918 * POLLSET STUFF
1919 */
1920
1921static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
ctillerd79b4862014-12-17 16:36:59 -08001922 if (t->ep) {
1923 grpc_endpoint_add_to_pollset(t->ep, pollset);
1924 }
Craig Tillerc079c112015-04-22 15:23:39 -07001925}
1926
1927static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
1928 transport *t = (transport *)gt;
1929 lock(t);
1930 add_to_pollset_locked(t, pollset);
ctillerd79b4862014-12-17 16:36:59 -08001931 unlock(t);
1932}
1933
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001934/*
1935 * INTEGRATION GLUE
1936 */
1937
1938static const grpc_transport_vtable vtable = {
Craig Tiller06aeea72015-04-23 10:54:45 -07001939 sizeof(stream), init_stream, perform_op,
1940 add_to_pollset, destroy_stream, goaway,
1941 close_transport, send_ping, destroy_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001942
1943void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
1944 void *arg,
1945 const grpc_channel_args *channel_args,
1946 grpc_endpoint *ep, gpr_slice *slices,
1947 size_t nslices, grpc_mdctx *mdctx,
1948 int is_client) {
1949 transport *t = gpr_malloc(sizeof(transport));
Nicolas Noble5ea99bb2015-02-04 14:13:09 -08001950 init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
1951 is_client);
Craig Tiller190d3602015-02-18 09:23:38 -08001952}