blob: 7a7c2bdfd48a91f2c9e6982604ed468b62cc2ba5 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/transport/chttp2_transport.h"
35
36#include <math.h>
37#include <stdio.h>
38#include <string.h>
39
Craig Tiller485d7762015-01-23 12:54:05 -080040#include "src/core/support/string.h"
nnoble0c475f02014-12-05 15:37:39 -080041#include "src/core/transport/chttp2/frame_data.h"
42#include "src/core/transport/chttp2/frame_goaway.h"
43#include "src/core/transport/chttp2/frame_ping.h"
44#include "src/core/transport/chttp2/frame_rst_stream.h"
45#include "src/core/transport/chttp2/frame_settings.h"
46#include "src/core/transport/chttp2/frame_window_update.h"
47#include "src/core/transport/chttp2/hpack_parser.h"
48#include "src/core/transport/chttp2/http2_errors.h"
49#include "src/core/transport/chttp2/status_conversion.h"
50#include "src/core/transport/chttp2/stream_encoder.h"
51#include "src/core/transport/chttp2/stream_map.h"
52#include "src/core/transport/chttp2/timeout_encoding.h"
53#include "src/core/transport/transport_impl.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080054#include <grpc/support/alloc.h>
55#include <grpc/support/log.h>
56#include <grpc/support/slice_buffer.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080057#include <grpc/support/useful.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080058
ctiller493fbcc2014-12-07 15:09:10 -080059#define DEFAULT_WINDOW 65535
60#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080061#define MAX_WINDOW 0x7fffffffu
62
63#define CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
64#define CLIENT_CONNECT_STRLEN 24
65
Craig Tillerfaa84802015-03-01 21:56:38 -080066int grpc_http_trace = 0;
67
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080068typedef struct transport transport;
69typedef struct stream stream;
70
Craig Tiller5c019ae2015-04-17 16:46:53 -070071#define IF_TRACING(stmt) \
72 if (!(grpc_http_trace)) \
73 ; \
74 else \
Craig Tillerd50e5652015-02-24 16:46:22 -080075 stmt
76
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080077/* streams are kept in various linked lists depending on what things need to
78 happen to them... this enum labels each list */
79typedef enum {
80 /* streams that have pending writes */
81 WRITABLE = 0,
ctiller00297df2015-01-12 11:23:09 -080082 /* streams that have been selected to be written */
83 WRITING,
84 /* streams that have just been written, and included a close */
85 WRITTEN_CLOSED,
86 /* streams that have been cancelled and have some pending state updates
87 to perform */
88 CANCELLED,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080089 /* streams that want to send window updates */
90 WINDOW_UPDATE,
91 /* streams that are waiting to start because there are too many concurrent
92 streams on the connection */
93 WAITING_FOR_CONCURRENCY,
Craig Tillerc079c112015-04-22 15:23:39 -070094 /* streams that have finished reading: we wait until unlock to coalesce
95 all changes into one callback */
96 FINISHED_READ_OP,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080097 STREAM_LIST_COUNT /* must be last */
98} stream_list_id;
99
100/* deframer state for the overall http2 stream of bytes */
101typedef enum {
102 /* prefix: one entry per http2 connection prefix byte */
103 DTS_CLIENT_PREFIX_0 = 0,
104 DTS_CLIENT_PREFIX_1,
105 DTS_CLIENT_PREFIX_2,
106 DTS_CLIENT_PREFIX_3,
107 DTS_CLIENT_PREFIX_4,
108 DTS_CLIENT_PREFIX_5,
109 DTS_CLIENT_PREFIX_6,
110 DTS_CLIENT_PREFIX_7,
111 DTS_CLIENT_PREFIX_8,
112 DTS_CLIENT_PREFIX_9,
113 DTS_CLIENT_PREFIX_10,
114 DTS_CLIENT_PREFIX_11,
115 DTS_CLIENT_PREFIX_12,
116 DTS_CLIENT_PREFIX_13,
117 DTS_CLIENT_PREFIX_14,
118 DTS_CLIENT_PREFIX_15,
119 DTS_CLIENT_PREFIX_16,
120 DTS_CLIENT_PREFIX_17,
121 DTS_CLIENT_PREFIX_18,
122 DTS_CLIENT_PREFIX_19,
123 DTS_CLIENT_PREFIX_20,
124 DTS_CLIENT_PREFIX_21,
125 DTS_CLIENT_PREFIX_22,
126 DTS_CLIENT_PREFIX_23,
127 /* frame header byte 0... */
128 /* must follow from the prefix states */
129 DTS_FH_0,
130 DTS_FH_1,
131 DTS_FH_2,
132 DTS_FH_3,
133 DTS_FH_4,
134 DTS_FH_5,
135 DTS_FH_6,
136 DTS_FH_7,
137 /* ... frame header byte 8 */
138 DTS_FH_8,
139 /* inside a http2 frame */
140 DTS_FRAME
141} deframe_transport_state;
142
Craig Tillerc079c112015-04-22 15:23:39 -0700143typedef enum {
144 WRITE_STATE_OPEN,
145 WRITE_STATE_QUEUED_CLOSE,
146 WRITE_STATE_SENT_CLOSE
147} WRITE_STATE;
148
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800149typedef struct {
150 stream *head;
151 stream *tail;
152} stream_list;
153
154typedef struct {
155 stream *next;
156 stream *prev;
157} stream_link;
158
159typedef enum {
160 ERROR_STATE_NONE,
161 ERROR_STATE_SEEN,
162 ERROR_STATE_NOTIFIED
163} error_state;
164
165/* We keep several sets of connection wide parameters */
166typedef enum {
167 /* The settings our peer has asked for (and we have acked) */
168 PEER_SETTINGS = 0,
169 /* The settings we'd like to have */
170 LOCAL_SETTINGS,
171 /* The settings we've published to our peer */
172 SENT_SETTINGS,
173 /* The settings the peer has acked */
174 ACKED_SETTINGS,
175 NUM_SETTING_SETS
176} setting_set;
177
178/* Outstanding ping request data */
179typedef struct {
180 gpr_uint8 id[8];
181 void (*cb)(void *user_data);
182 void *user_data;
183} outstanding_ping;
184
nnoble0c475f02014-12-05 15:37:39 -0800185typedef struct {
186 grpc_status_code status;
187 gpr_slice debug;
188} pending_goaway;
189
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700190typedef struct {
191 void (*cb)(void *user_data, int success);
192 void *user_data;
Craig Tillerc079c112015-04-22 15:23:39 -0700193 int success;
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700194} op_closure;
195
196typedef struct {
197 op_closure *callbacks;
198 size_t count;
199 size_t capacity;
200} op_closure_array;
201
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202struct transport {
203 grpc_transport base; /* must be first */
204 const grpc_transport_callbacks *cb;
205 void *cb_user_data;
206 grpc_endpoint *ep;
207 grpc_mdctx *metadata_context;
208 gpr_refcount refs;
209 gpr_uint8 is_client;
210
211 gpr_mu mu;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800212 gpr_cv cv;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800213
214 /* basic state management - what are we doing at the moment? */
215 gpr_uint8 reading;
216 gpr_uint8 writing;
217 gpr_uint8 calling_back;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800218 gpr_uint8 destroying;
Craig Tillerd75fe662015-02-21 07:30:49 -0800219 gpr_uint8 closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800220 error_state error_state;
221
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700222 /* queued callbacks */
223 op_closure_array pending_callbacks;
224 op_closure_array executing_callbacks;
225
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800226 /* stream indexing */
227 gpr_uint32 next_stream_id;
nnoble0c475f02014-12-05 15:37:39 -0800228 gpr_uint32 last_incoming_stream_id;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800229
230 /* settings */
231 gpr_uint32 settings[NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
ctiller493fbcc2014-12-07 15:09:10 -0800232 gpr_uint32 force_send_settings; /* bitmask of setting indexes to send out */
233 gpr_uint8 sent_local_settings; /* have local settings been sent? */
234 gpr_uint8 dirtied_local_settings; /* are the local settings dirty? */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800235
236 /* window management */
237 gpr_uint32 outgoing_window;
238 gpr_uint32 incoming_window;
ctiller493fbcc2014-12-07 15:09:10 -0800239 gpr_uint32 connection_window_target;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800240
241 /* deframing */
242 deframe_transport_state deframe_state;
243 gpr_uint8 incoming_frame_type;
244 gpr_uint8 incoming_frame_flags;
245 gpr_uint8 header_eof;
246 gpr_uint32 expect_continuation_stream_id;
247 gpr_uint32 incoming_frame_size;
248 gpr_uint32 incoming_stream_id;
249
250 /* hpack encoding */
251 grpc_chttp2_hpack_compressor hpack_compressor;
252
253 /* various parsers */
254 grpc_chttp2_hpack_parser hpack_parser;
255 /* simple one shot parsers */
256 union {
257 grpc_chttp2_window_update_parser window_update;
258 grpc_chttp2_settings_parser settings;
259 grpc_chttp2_ping_parser ping;
260 } simple_parsers;
261
nnoble0c475f02014-12-05 15:37:39 -0800262 /* goaway */
263 grpc_chttp2_goaway_parser goaway_parser;
264 pending_goaway *pending_goaways;
265 size_t num_pending_goaways;
266 size_t cap_pending_goaways;
267
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800268 /* state for a stream that's not yet been created */
269 grpc_stream_op_buffer new_stream_sopb;
270
Craig Tillercb818ba2015-01-29 17:08:01 -0800271 /* stream ops that need to be destroyed, but outside of the lock */
272 grpc_stream_op_buffer nuke_later_sopb;
273
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800274 /* active parser */
275 void *parser_data;
276 stream *incoming_stream;
277 grpc_chttp2_parse_error (*parser)(void *parser_user_data,
278 grpc_chttp2_parse_state *state,
279 gpr_slice slice, int is_last);
280
281 gpr_slice_buffer outbuf;
282 gpr_slice_buffer qbuf;
283
284 stream_list lists[STREAM_LIST_COUNT];
285 grpc_chttp2_stream_map stream_map;
286
287 /* metadata object cache */
288 grpc_mdstr *str_grpc_timeout;
289
290 /* pings */
291 outstanding_ping *pings;
292 size_t ping_count;
293 size_t ping_capacity;
294 gpr_int64 ping_counter;
295};
296
297struct stream {
298 gpr_uint32 id;
299
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800300 gpr_uint32 incoming_window;
Craig Tiller84b88842015-04-20 08:47:52 -0700301 gpr_int64 outgoing_window;
ctiller00297df2015-01-12 11:23:09 -0800302 /* when the application requests writes be closed, the write_closed is
303 'queued'; when the close is flow controlled into the send path, we are
304 'sending' it; when the write has been performed it is 'sent' */
Craig Tillerc079c112015-04-22 15:23:39 -0700305 WRITE_STATE write_state;
306 gpr_uint8 send_closed;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800307 gpr_uint8 read_closed;
308 gpr_uint8 cancelled;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800309 gpr_uint8 published_close;
310
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700311 op_closure send_done_closure;
312 op_closure recv_done_closure;
313
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800314 stream_link links[STREAM_LIST_COUNT];
315 gpr_uint8 included[STREAM_LIST_COUNT];
316
Craig Tiller9c1043e2015-04-16 16:20:38 -0700317 /* incoming metadata */
318 grpc_linked_mdelem *incoming_metadata;
319 size_t incoming_metadata_count;
320 size_t incoming_metadata_capacity;
321 gpr_timespec incoming_deadline;
322
ctiller00297df2015-01-12 11:23:09 -0800323 /* sops from application */
Craig Tillerc079c112015-04-22 15:23:39 -0700324 grpc_stream_op_buffer *outgoing_sopb;
325 grpc_stream_op_buffer *incoming_sopb;
326 grpc_stream_state *publish_state;
327 grpc_stream_state published_state;
ctiller00297df2015-01-12 11:23:09 -0800328 /* sops that have passed flow control to be written */
329 grpc_stream_op_buffer writing_sopb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800330
331 grpc_chttp2_data_parser parser;
332
333 grpc_stream_state callback_state;
334 grpc_stream_op_buffer callback_sopb;
335};
336
337static const grpc_transport_vtable vtable;
338
339static void push_setting(transport *t, grpc_chttp2_setting_id id,
340 gpr_uint32 value);
341
342static int prepare_callbacks(transport *t);
Craig Tillerd1345de2015-02-24 21:55:20 -0800343static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
Craig Tiller748fe3f2015-03-02 07:48:50 -0800344static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800345
346static int prepare_write(transport *t);
ctiller00297df2015-01-12 11:23:09 -0800347static void perform_write(transport *t, grpc_endpoint *ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800348
349static void lock(transport *t);
350static void unlock(transport *t);
351
352static void drop_connection(transport *t);
353static void end_all_the_calls(transport *t);
354
355static stream *stream_list_remove_head(transport *t, stream_list_id id);
356static void stream_list_remove(transport *t, stream *s, stream_list_id id);
357static void stream_list_add_tail(transport *t, stream *s, stream_list_id id);
358static void stream_list_join(transport *t, stream *s, stream_list_id id);
359
360static void cancel_stream_id(transport *t, gpr_uint32 id,
361 grpc_status_code local_status,
362 grpc_chttp2_error_code error_code, int send_rst);
363static void cancel_stream(transport *t, stream *s,
364 grpc_status_code local_status,
365 grpc_chttp2_error_code error_code, int send_rst);
ctiller00297df2015-01-12 11:23:09 -0800366static void finalize_cancellations(transport *t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367static stream *lookup_stream(transport *t, gpr_uint32 id);
368static void remove_from_stream_map(transport *t, stream *s);
369static void maybe_start_some_streams(transport *t);
370
371static void become_skip_parser(transport *t);
372
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800373static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
374 grpc_endpoint_cb_status error);
375
Craig Tillerc079c112015-04-22 15:23:39 -0700376static void schedule_cb(transport *t, op_closure closure, int success);
377static void maybe_finish_read(transport *t, stream *s);
378static void maybe_join_window_updates(transport *t, stream *s);
379static void finish_reads(transport *t);
380static void add_to_pollset_locked(transport *t, grpc_pollset *pollset);
Craig Tiller50d9db52015-04-23 10:52:14 -0700381static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op);
Craig Tillerc079c112015-04-22 15:23:39 -0700382
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800383/*
384 * CONSTRUCTION/DESTRUCTION/REFCOUNTING
385 */
386
Craig Tiller9be83ee2015-02-18 14:16:15 -0800387static void destruct_transport(transport *t) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800388 size_t i;
389
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800390 gpr_mu_lock(&t->mu);
391
392 GPR_ASSERT(t->ep == NULL);
393
394 gpr_slice_buffer_destroy(&t->outbuf);
395 gpr_slice_buffer_destroy(&t->qbuf);
396 grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
397 grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
nnoble0c475f02014-12-05 15:37:39 -0800398 grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800399
400 grpc_mdstr_unref(t->str_grpc_timeout);
401
402 for (i = 0; i < STREAM_LIST_COUNT; i++) {
403 GPR_ASSERT(t->lists[i].head == NULL);
404 GPR_ASSERT(t->lists[i].tail == NULL);
405 }
406
407 GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
408
409 grpc_chttp2_stream_map_destroy(&t->stream_map);
410
411 gpr_mu_unlock(&t->mu);
412 gpr_mu_destroy(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800413 gpr_cv_destroy(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800414
415 /* callback remaining pings: they're not allowed to call into the transpot,
416 and maybe they hold resources that need to be freed */
417 for (i = 0; i < t->ping_count; i++) {
418 t->pings[i].cb(t->pings[i].user_data);
419 }
420 gpr_free(t->pings);
421
nnoble0c475f02014-12-05 15:37:39 -0800422 for (i = 0; i < t->num_pending_goaways; i++) {
423 gpr_slice_unref(t->pending_goaways[i].debug);
424 }
425 gpr_free(t->pending_goaways);
426
Craig Tiller8ed35ea2015-01-30 11:27:43 -0800427 grpc_sopb_destroy(&t->nuke_later_sopb);
428
Craig Tiller9be83ee2015-02-18 14:16:15 -0800429 grpc_mdctx_unref(t->metadata_context);
430
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800431 gpr_free(t);
432}
433
Craig Tiller9be83ee2015-02-18 14:16:15 -0800434static void unref_transport(transport *t) {
435 if (!gpr_unref(&t->refs)) return;
436 destruct_transport(t);
437}
438
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800439static void ref_transport(transport *t) { gpr_ref(&t->refs); }
440
441static void init_transport(transport *t, grpc_transport_setup_callback setup,
442 void *arg, const grpc_channel_args *channel_args,
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800443 grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
444 grpc_mdctx *mdctx, int is_client) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800445 size_t i;
446 int j;
447 grpc_transport_setup_result sr;
448
449 GPR_ASSERT(strlen(CLIENT_CONNECT_STRING) == CLIENT_CONNECT_STRLEN);
450
Craig Tillerbe18b8d2015-04-22 14:00:47 -0700451 memset(t, 0, sizeof(*t));
452
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800453 t->base.vtable = &vtable;
454 t->ep = ep;
455 /* one ref is for destroy, the other for when ep becomes NULL */
456 gpr_ref_init(&t->refs, 2);
457 gpr_mu_init(&t->mu);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800458 gpr_cv_init(&t->cv);
Craig Tiller9be83ee2015-02-18 14:16:15 -0800459 grpc_mdctx_ref(mdctx);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800460 t->metadata_context = mdctx;
461 t->str_grpc_timeout =
462 grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
463 t->reading = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800464 t->error_state = ERROR_STATE_NONE;
465 t->next_stream_id = is_client ? 1 : 2;
466 t->is_client = is_client;
467 t->outgoing_window = DEFAULT_WINDOW;
468 t->incoming_window = DEFAULT_WINDOW;
ctiller493fbcc2014-12-07 15:09:10 -0800469 t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800470 t->deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800471 t->ping_counter = gpr_now().tv_nsec;
472 grpc_chttp2_hpack_compressor_init(&t->hpack_compressor, mdctx);
nnoble0c475f02014-12-05 15:37:39 -0800473 grpc_chttp2_goaway_parser_init(&t->goaway_parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800474 gpr_slice_buffer_init(&t->outbuf);
475 gpr_slice_buffer_init(&t->qbuf);
Craig Tillercb818ba2015-01-29 17:08:01 -0800476 grpc_sopb_init(&t->nuke_later_sopb);
Nicolas Noble5ea99bb2015-02-04 14:13:09 -0800477 grpc_chttp2_hpack_parser_init(&t->hpack_parser, t->metadata_context);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800478 if (is_client) {
479 gpr_slice_buffer_add(&t->qbuf,
480 gpr_slice_from_copied_string(CLIENT_CONNECT_STRING));
481 }
482 /* 8 is a random stab in the dark as to a good initial size: it's small enough
483 that it shouldn't waste memory for infrequently used connections, yet
484 large enough that the exponential growth should happen nicely when it's
485 needed.
486 TODO(ctiller): tune this */
487 grpc_chttp2_stream_map_init(&t->stream_map, 8);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800488
489 /* copy in initial settings to all setting sets */
490 for (i = 0; i < NUM_SETTING_SETS; i++) {
491 for (j = 0; j < GRPC_CHTTP2_NUM_SETTINGS; j++) {
492 t->settings[i][j] = grpc_chttp2_settings_parameters[j].default_value;
493 }
494 }
495 t->dirtied_local_settings = 1;
ctiller493fbcc2014-12-07 15:09:10 -0800496 /* Hack: it's common for implementations to assume 65536 bytes initial send
497 window -- this should by rights be 0 */
498 t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800499 t->sent_local_settings = 0;
500
501 /* configure http2 the way we like it */
502 if (t->is_client) {
503 push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
504 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
505 }
ctiller493fbcc2014-12-07 15:09:10 -0800506 push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800507
508 if (channel_args) {
509 for (i = 0; i < channel_args->num_args; i++) {
510 if (0 ==
511 strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
512 if (t->is_client) {
513 gpr_log(GPR_ERROR, "%s: is ignored on the client",
514 GRPC_ARG_MAX_CONCURRENT_STREAMS);
515 } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
516 gpr_log(GPR_ERROR, "%s: must be an integer",
517 GRPC_ARG_MAX_CONCURRENT_STREAMS);
518 } else {
519 push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
520 channel_args->args[i].value.integer);
521 }
522 }
523 }
524 }
525
526 gpr_mu_lock(&t->mu);
527 t->calling_back = 1;
Craig Tiller06aeea72015-04-23 10:54:45 -0700528 ref_transport(t); /* matches unref at end of this function */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800529 gpr_mu_unlock(&t->mu);
530
531 sr = setup(arg, &t->base, t->metadata_context);
532
533 lock(t);
534 t->cb = sr.callbacks;
535 t->cb_user_data = sr.user_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800536 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800537 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800538 unlock(t);
Craig Tillerdcf9c0e2015-02-11 16:12:41 -0800539
Craig Tiller06aeea72015-04-23 10:54:45 -0700540 ref_transport(t); /* matches unref inside recv_data */
Craig Tillerdcf9c0e2015-02-11 16:12:41 -0800541 recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
542
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800543 unref_transport(t);
544}
545
546static void destroy_transport(grpc_transport *gt) {
547 transport *t = (transport *)gt;
548
Craig Tiller748fe3f2015-03-02 07:48:50 -0800549 lock(t);
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800550 t->destroying = 1;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000551 /* Wait for pending stuff to finish.
552 We need to be not calling back to ensure that closed() gets a chance to
553 trigger if needed during unlock() before we die.
554 We need to be not writing as cancellation finalization may produce some
555 callbacks that NEED to be made to close out some streams when t->writing
556 becomes 0. */
557 while (t->calling_back || t->writing) {
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800558 gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
559 }
Craig Tiller748fe3f2015-03-02 07:48:50 -0800560 drop_connection(t);
561 unlock(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800562
Craig Tillerbb88a042015-03-02 10:56:33 -0800563 /* The drop_connection() above puts the transport into an error state, and
564 the follow-up unlock should then (as part of the cleanup work it does)
565 ensure that cb is NULL, and therefore not call back anything further.
566 This check validates this very subtle behavior.
567 It's shutdown path, so I don't believe an extra lock pair is going to be
568 problematic for performance. */
Craig Tillerb9eb1802015-03-02 16:41:32 +0000569 lock(t);
570 GPR_ASSERT(!t->cb);
571 unlock(t);
572
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800573 unref_transport(t);
574}
575
576static void close_transport(grpc_transport *gt) {
577 transport *t = (transport *)gt;
578 gpr_mu_lock(&t->mu);
Craig Tillerd75fe662015-02-21 07:30:49 -0800579 GPR_ASSERT(!t->closed);
580 t->closed = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800581 if (t->ep) {
582 grpc_endpoint_shutdown(t->ep);
583 }
584 gpr_mu_unlock(&t->mu);
585}
586
nnoble0c475f02014-12-05 15:37:39 -0800587static void goaway(grpc_transport *gt, grpc_status_code status,
588 gpr_slice debug_data) {
589 transport *t = (transport *)gt;
590 lock(t);
591 grpc_chttp2_goaway_append(t->last_incoming_stream_id,
592 grpc_chttp2_grpc_status_to_http2_error(status),
593 debug_data, &t->qbuf);
594 unlock(t);
595}
596
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800597static int init_stream(grpc_transport *gt, grpc_stream *gs,
Craig Tiller50d9db52015-04-23 10:52:14 -0700598 const void *server_data, grpc_transport_op *initial_op) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800599 transport *t = (transport *)gt;
600 stream *s = (stream *)gs;
601
Craig Tillerc079c112015-04-22 15:23:39 -0700602 memset(s, 0, sizeof(*s));
603
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800604 ref_transport(t);
605
606 if (!server_data) {
607 lock(t);
608 s->id = 0;
609 } else {
Craig Tiller3f2c2212015-04-23 07:56:33 -0700610 /* already locked */
Craig Tiller5c019ae2015-04-17 16:46:53 -0700611 s->id = (gpr_uint32)(gpr_uintptr)server_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800612 t->incoming_stream = s;
613 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
614 }
615
ctiller493fbcc2014-12-07 15:09:10 -0800616 s->outgoing_window =
617 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
618 s->incoming_window =
619 t->settings[SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
Craig Tiller9c1043e2015-04-16 16:20:38 -0700620 s->incoming_deadline = gpr_inf_future;
ctiller00297df2015-01-12 11:23:09 -0800621 grpc_sopb_init(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800622 grpc_sopb_init(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800623 grpc_chttp2_data_parser_init(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800624
Craig Tiller50d9db52015-04-23 10:52:14 -0700625 if (initial_op) perform_op_locked(t, s, initial_op);
626
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800627 if (!server_data) {
628 unlock(t);
629 }
630
631 return 0;
632}
633
Craig Tillercb818ba2015-01-29 17:08:01 -0800634static void schedule_nuke_sopb(transport *t, grpc_stream_op_buffer *sopb) {
635 grpc_sopb_append(&t->nuke_later_sopb, sopb->ops, sopb->nops);
636 sopb->nops = 0;
637}
638
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800639static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
640 transport *t = (transport *)gt;
641 stream *s = (stream *)gs;
642 size_t i;
643
644 gpr_mu_lock(&t->mu);
645
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800646 /* stop parsing if we're currently parsing this stream */
647 if (t->deframe_state == DTS_FRAME && t->incoming_stream_id == s->id &&
648 s->id != 0) {
649 become_skip_parser(t);
650 }
651
652 for (i = 0; i < STREAM_LIST_COUNT; i++) {
653 stream_list_remove(t, s, i);
654 }
655 remove_from_stream_map(t, s);
656
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800657 gpr_mu_unlock(&t->mu);
658
Craig Tillerc079c112015-04-22 15:23:39 -0700659 GPR_ASSERT(s->outgoing_sopb == NULL);
ctiller00297df2015-01-12 11:23:09 -0800660 grpc_sopb_destroy(&s->writing_sopb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800661 grpc_sopb_destroy(&s->callback_sopb);
ctiller00297df2015-01-12 11:23:09 -0800662 grpc_chttp2_data_parser_destroy(&s->parser);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800663
664 unref_transport(t);
665}
666
667/*
668 * LIST MANAGEMENT
669 */
670
ctiller00297df2015-01-12 11:23:09 -0800671static int stream_list_empty(transport *t, stream_list_id id) {
672 return t->lists[id].head == NULL;
673}
674
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800675static stream *stream_list_remove_head(transport *t, stream_list_id id) {
676 stream *s = t->lists[id].head;
677 if (s) {
678 stream *new_head = s->links[id].next;
679 GPR_ASSERT(s->included[id]);
680 if (new_head) {
681 t->lists[id].head = new_head;
682 new_head->links[id].prev = NULL;
683 } else {
684 t->lists[id].head = NULL;
685 t->lists[id].tail = NULL;
686 }
687 s->included[id] = 0;
688 }
689 return s;
690}
691
692static void stream_list_remove(transport *t, stream *s, stream_list_id id) {
693 if (!s->included[id]) return;
694 s->included[id] = 0;
695 if (s->links[id].prev) {
696 s->links[id].prev->links[id].next = s->links[id].next;
697 } else {
698 GPR_ASSERT(t->lists[id].head == s);
699 t->lists[id].head = s->links[id].next;
700 }
701 if (s->links[id].next) {
702 s->links[id].next->links[id].prev = s->links[id].prev;
703 } else {
704 t->lists[id].tail = s->links[id].prev;
705 }
706}
707
708static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
709 stream *old_tail;
710 GPR_ASSERT(!s->included[id]);
711 old_tail = t->lists[id].tail;
712 s->links[id].next = NULL;
713 s->links[id].prev = old_tail;
714 if (old_tail) {
715 old_tail->links[id].next = s;
716 } else {
717 s->links[id].prev = NULL;
718 t->lists[id].head = s;
719 }
720 t->lists[id].tail = s;
721 s->included[id] = 1;
722}
723
724static void stream_list_join(transport *t, stream *s, stream_list_id id) {
725 if (s->included[id]) {
726 return;
727 }
728 stream_list_add_tail(t, s, id);
729}
730
731static void remove_from_stream_map(transport *t, stream *s) {
732 if (s->id == 0) return;
733 if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
734 maybe_start_some_streams(t);
735 }
736}
737
738/*
739 * LOCK MANAGEMENT
740 */
741
742/* We take a transport-global lock in response to calls coming in from above,
743 and in response to data being received from below. New data to be written
744 is always queued, as are callbacks to process data. During unlock() we
745 check our todo lists and initiate callbacks and flush writes. */
746
747static void lock(transport *t) { gpr_mu_lock(&t->mu); }
748
749static void unlock(transport *t) {
750 int start_write = 0;
751 int perform_callbacks = 0;
752 int call_closed = 0;
nnoble0c475f02014-12-05 15:37:39 -0800753 int num_goaways = 0;
754 int i;
755 pending_goaway *goaways = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800756 grpc_endpoint *ep = t->ep;
Craig Tillere3018e62015-02-13 17:05:19 -0800757 grpc_stream_op_buffer nuke_now;
Craig Tillerd1345de2015-02-24 21:55:20 -0800758 const grpc_transport_callbacks *cb = t->cb;
Craig Tiller06059952015-02-18 08:34:56 -0800759
Craig Tillere3018e62015-02-13 17:05:19 -0800760 grpc_sopb_init(&nuke_now);
761 if (t->nuke_later_sopb.nops) {
762 grpc_sopb_swap(&nuke_now, &t->nuke_later_sopb);
Craig Tillercb818ba2015-01-29 17:08:01 -0800763 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800764
765 /* see if we need to trigger a write - and if so, get the data ready */
766 if (ep && !t->writing) {
767 t->writing = start_write = prepare_write(t);
768 if (start_write) {
769 ref_transport(t);
770 }
771 }
772
ctiller00297df2015-01-12 11:23:09 -0800773 if (!t->writing) {
774 finalize_cancellations(t);
775 }
776
Craig Tillerc079c112015-04-22 15:23:39 -0700777 finish_reads(t);
778
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800779 /* gather any callbacks that need to be made */
Craig Tillerd1345de2015-02-24 21:55:20 -0800780 if (!t->calling_back && cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800781 perform_callbacks = prepare_callbacks(t);
782 if (perform_callbacks) {
783 t->calling_back = 1;
784 }
Craig Tillerb9eb1802015-03-02 16:41:32 +0000785 if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800786 call_closed = 1;
787 t->calling_back = 1;
Craig Tiller5c019ae2015-04-17 16:46:53 -0700788 t->cb = NULL; /* no more callbacks */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800789 t->error_state = ERROR_STATE_NOTIFIED;
790 }
nnoble0c475f02014-12-05 15:37:39 -0800791 if (t->num_pending_goaways) {
792 goaways = t->pending_goaways;
793 num_goaways = t->num_pending_goaways;
794 t->pending_goaways = NULL;
795 t->num_pending_goaways = 0;
ctiller82e275f2014-12-12 08:43:28 -0800796 t->cap_pending_goaways = 0;
nnoble0c475f02014-12-05 15:37:39 -0800797 t->calling_back = 1;
798 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800799 }
800
nnoble0c475f02014-12-05 15:37:39 -0800801 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800802 ref_transport(t);
803 }
804
805 /* finally unlock */
806 gpr_mu_unlock(&t->mu);
807
808 /* perform some callbacks if necessary */
nnoble0c475f02014-12-05 15:37:39 -0800809 for (i = 0; i < num_goaways; i++) {
Craig Tiller5c019ae2015-04-17 16:46:53 -0700810 cb->goaway(t->cb_user_data, &t->base, goaways[i].status, goaways[i].debug);
nnoble0c475f02014-12-05 15:37:39 -0800811 }
812
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800813 if (perform_callbacks) {
Craig Tillerd1345de2015-02-24 21:55:20 -0800814 run_callbacks(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800815 }
816
817 if (call_closed) {
Craig Tiller748fe3f2015-03-02 07:48:50 -0800818 call_cb_closed(t, cb);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800819 }
820
821 /* write some bytes if necessary */
ctiller00297df2015-01-12 11:23:09 -0800822 if (start_write) {
823 /* ultimately calls unref_transport(t); and clears t->writing */
824 perform_write(t, ep);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800825 }
826
nnoble0c475f02014-12-05 15:37:39 -0800827 if (perform_callbacks || call_closed || num_goaways) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800828 lock(t);
829 t->calling_back = 0;
Craig Tiller1fe7b9d2015-02-17 11:57:02 -0800830 if (t->destroying) gpr_cv_signal(&t->cv);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800831 unlock(t);
832 unref_transport(t);
833 }
nnoble0c475f02014-12-05 15:37:39 -0800834
Craig Tillere3018e62015-02-13 17:05:19 -0800835 grpc_sopb_destroy(&nuke_now);
Craig Tillercb818ba2015-01-29 17:08:01 -0800836
nnoble0c475f02014-12-05 15:37:39 -0800837 gpr_free(goaways);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800838}
839
840/*
841 * OUTPUT PROCESSING
842 */
843
844static void push_setting(transport *t, grpc_chttp2_setting_id id,
845 gpr_uint32 value) {
846 const grpc_chttp2_setting_parameters *sp =
847 &grpc_chttp2_settings_parameters[id];
848 gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
849 if (use_value != value) {
850 gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
851 value, use_value);
852 }
853 if (use_value != t->settings[LOCAL_SETTINGS][id]) {
854 t->settings[LOCAL_SETTINGS][id] = use_value;
855 t->dirtied_local_settings = 1;
856 }
857}
858
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800859static int prepare_write(transport *t) {
860 stream *s;
ctiller00297df2015-01-12 11:23:09 -0800861 gpr_uint32 window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800862
863 /* simple writes are queued to qbuf, and flushed here */
Craig Tiller721f3622015-04-13 16:14:28 -0700864 gpr_slice_buffer_swap(&t->qbuf, &t->outbuf);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800865 GPR_ASSERT(t->qbuf.count == 0);
866
867 if (t->dirtied_local_settings && !t->sent_local_settings) {
868 gpr_slice_buffer_add(
ctiller493fbcc2014-12-07 15:09:10 -0800869 &t->outbuf, grpc_chttp2_settings_create(
870 t->settings[SENT_SETTINGS], t->settings[LOCAL_SETTINGS],
871 t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
872 t->force_send_settings = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800873 t->dirtied_local_settings = 0;
874 t->sent_local_settings = 1;
875 }
876
877 /* for each stream that's become writable, frame it's data (according to
878 available window sizes) and add to the output buffer */
Craig Tiller84b88842015-04-20 08:47:52 -0700879 while (t->outgoing_window && (s = stream_list_remove_head(t, WRITABLE)) &&
880 s->outgoing_window > 0) {
ctiller00297df2015-01-12 11:23:09 -0800881 window_delta = grpc_chttp2_preencode(
Craig Tillerc079c112015-04-22 15:23:39 -0700882 s->outgoing_sopb->ops, &s->outgoing_sopb->nops,
ctiller00297df2015-01-12 11:23:09 -0800883 GPR_MIN(t->outgoing_window, s->outgoing_window), &s->writing_sopb);
884 t->outgoing_window -= window_delta;
885 s->outgoing_window -= window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800886
Craig Tiller06aeea72015-04-23 10:54:45 -0700887 if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
888 s->outgoing_sopb->nops == 0) {
Craig Tillerc079c112015-04-22 15:23:39 -0700889 s->send_closed = 1;
890 }
891 if (s->writing_sopb.nops > 0 || s->send_closed) {
ctiller00297df2015-01-12 11:23:09 -0800892 stream_list_join(t, s, WRITING);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800893 }
894
Craig Tillerc079c112015-04-22 15:23:39 -0700895 /* we should either exhaust window or have no ops left, but not both */
Craig Tillerc079c112015-04-22 15:23:39 -0700896 if (s->outgoing_sopb->nops == 0) {
897 s->outgoing_sopb = NULL;
898 schedule_cb(t, s->send_done_closure, 1);
Craig Tillere8893142015-04-23 16:02:01 -0700899 } else if (s->outgoing_window) {
900 stream_list_add_tail(t, s, WRITABLE);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800901 }
902 }
903
904 /* for each stream that wants to update its window, add that window here */
905 while ((s = stream_list_remove_head(t, WINDOW_UPDATE))) {
ctiller00297df2015-01-12 11:23:09 -0800906 window_delta =
ctiller493fbcc2014-12-07 15:09:10 -0800907 t->settings[LOCAL_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
908 s->incoming_window;
ctiller00297df2015-01-12 11:23:09 -0800909 if (!s->read_closed && window_delta) {
910 gpr_slice_buffer_add(
911 &t->outbuf, grpc_chttp2_window_update_create(s->id, window_delta));
912 s->incoming_window += window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800913 }
914 }
915
916 /* if the transport is ready to send a window update, do so here also */
ctiller493fbcc2014-12-07 15:09:10 -0800917 if (t->incoming_window < t->connection_window_target * 3 / 4) {
ctiller00297df2015-01-12 11:23:09 -0800918 window_delta = t->connection_window_target - t->incoming_window;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800919 gpr_slice_buffer_add(&t->outbuf,
ctiller00297df2015-01-12 11:23:09 -0800920 grpc_chttp2_window_update_create(0, window_delta));
921 t->incoming_window += window_delta;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800922 }
923
ctiller00297df2015-01-12 11:23:09 -0800924 return t->outbuf.length > 0 || !stream_list_empty(t, WRITING);
925}
926
927static void finalize_outbuf(transport *t) {
928 stream *s;
929
930 while ((s = stream_list_remove_head(t, WRITING))) {
931 grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
Craig Tiller06aeea72015-04-23 10:54:45 -0700932 s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
ctiller00297df2015-01-12 11:23:09 -0800933 s->writing_sopb.nops = 0;
Craig Tillerc079c112015-04-22 15:23:39 -0700934 if (s->send_closed) {
ctiller00297df2015-01-12 11:23:09 -0800935 stream_list_join(t, s, WRITTEN_CLOSED);
936 }
937 }
938}
939
940static void finish_write_common(transport *t, int success) {
941 stream *s;
942
943 lock(t);
944 if (!success) {
945 drop_connection(t);
946 }
947 while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
Craig Tillerc079c112015-04-22 15:23:39 -0700948 s->write_state = WRITE_STATE_SENT_CLOSE;
949 if (!s->cancelled) {
950 maybe_finish_read(t, s);
951 }
ctiller00297df2015-01-12 11:23:09 -0800952 }
953 t->outbuf.count = 0;
954 t->outbuf.length = 0;
955 /* leave the writing flag up on shutdown to prevent further writes in unlock()
956 from starting */
957 t->writing = 0;
Craig Tillerb9eb1802015-03-02 16:41:32 +0000958 if (t->destroying) {
959 gpr_cv_signal(&t->cv);
960 }
ctiller00297df2015-01-12 11:23:09 -0800961 if (!t->reading) {
962 grpc_endpoint_destroy(t->ep);
963 t->ep = NULL;
ctiller00297df2015-01-12 11:23:09 -0800964 unref_transport(t); /* safe because we'll still have the ref for write */
965 }
966 unlock(t);
967
968 unref_transport(t);
969}
970
971static void finish_write(void *tp, grpc_endpoint_cb_status error) {
972 transport *t = tp;
973 finish_write_common(t, error == GRPC_ENDPOINT_CB_OK);
974}
975
976static void perform_write(transport *t, grpc_endpoint *ep) {
977 finalize_outbuf(t);
978
979 GPR_ASSERT(t->outbuf.count > 0);
980
981 switch (grpc_endpoint_write(ep, t->outbuf.slices, t->outbuf.count,
982 finish_write, t)) {
983 case GRPC_ENDPOINT_WRITE_DONE:
984 finish_write_common(t, 1);
985 break;
986 case GRPC_ENDPOINT_WRITE_ERROR:
987 finish_write_common(t, 0);
988 break;
989 case GRPC_ENDPOINT_WRITE_PENDING:
990 break;
991 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800992}
993
994static void maybe_start_some_streams(transport *t) {
995 while (
996 grpc_chttp2_stream_map_size(&t->stream_map) <
997 t->settings[PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
998 stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
999 if (!s) break;
1000
1001 GPR_ASSERT(s->id == 0);
1002 s->id = t->next_stream_id;
1003 t->next_stream_id += 2;
1004 grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
1005 stream_list_join(t, s, WRITABLE);
1006 }
1007}
1008
Craig Tiller50d9db52015-04-23 10:52:14 -07001009static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001010 if (op->send_ops) {
Craig Tillerc079c112015-04-22 15:23:39 -07001011 GPR_ASSERT(s->outgoing_sopb == NULL);
1012 s->send_done_closure.cb = op->on_done_send;
1013 s->send_done_closure.user_data = op->send_user_data;
1014 if (!s->cancelled) {
1015 s->outgoing_sopb = op->send_ops;
1016 if (op->is_last_send && s->write_state == WRITE_STATE_OPEN) {
1017 s->write_state = WRITE_STATE_QUEUED_CLOSE;
1018 }
1019 if (s->id == 0) {
1020 stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
1021 maybe_start_some_streams(t);
1022 } else if (s->outgoing_window > 0) {
1023 stream_list_join(t, s, WRITABLE);
1024 }
1025 } else {
1026 schedule_nuke_sopb(t, op->send_ops);
1027 schedule_cb(t, s->send_done_closure, 0);
1028 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001029 }
1030
1031 if (op->recv_ops) {
Craig Tillerc079c112015-04-22 15:23:39 -07001032 GPR_ASSERT(s->incoming_sopb == NULL);
1033 s->recv_done_closure.cb = op->on_done_recv;
1034 s->recv_done_closure.user_data = op->recv_user_data;
1035 if (!s->cancelled) {
1036 s->incoming_sopb = op->recv_ops;
1037 s->incoming_sopb->nops = 0;
1038 s->publish_state = op->recv_state;
1039 maybe_finish_read(t, s);
1040 maybe_join_window_updates(t, s);
1041 } else {
1042 schedule_cb(t, s->recv_done_closure, 0);
1043 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001044 }
1045
1046 if (op->bind_pollset) {
Craig Tillerc079c112015-04-22 15:23:39 -07001047 add_to_pollset_locked(t, op->bind_pollset);
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001048 }
1049
Craig Tillerc079c112015-04-22 15:23:39 -07001050 if (op->cancel_with_status != GRPC_STATUS_OK) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001051 cancel_stream(
1052 t, s, op->cancel_with_status,
1053 grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), 1);
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001054 }
Craig Tiller50d9db52015-04-23 10:52:14 -07001055}
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001056
Craig Tiller06aeea72015-04-23 10:54:45 -07001057static void perform_op(grpc_transport *gt, grpc_stream *gs,
1058 grpc_transport_op *op) {
Craig Tiller50d9db52015-04-23 10:52:14 -07001059 transport *t = (transport *)gt;
1060 stream *s = (stream *)gs;
1061
1062 lock(t);
1063 perform_op_locked(t, s, op);
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001064 unlock(t);
1065}
1066
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001067static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
1068 void *user_data) {
1069 transport *t = (transport *)gt;
1070 outstanding_ping *p;
1071
1072 lock(t);
1073 if (t->ping_capacity == t->ping_count) {
1074 t->ping_capacity = GPR_MAX(1, t->ping_capacity * 3 / 2);
1075 t->pings =
1076 gpr_realloc(t->pings, sizeof(outstanding_ping) * t->ping_capacity);
1077 }
1078 p = &t->pings[t->ping_count++];
nnoble8f4e42c2014-12-11 16:36:46 -08001079 p->id[0] = (t->ping_counter >> 56) & 0xff;
1080 p->id[1] = (t->ping_counter >> 48) & 0xff;
1081 p->id[2] = (t->ping_counter >> 40) & 0xff;
1082 p->id[3] = (t->ping_counter >> 32) & 0xff;
1083 p->id[4] = (t->ping_counter >> 24) & 0xff;
1084 p->id[5] = (t->ping_counter >> 16) & 0xff;
1085 p->id[6] = (t->ping_counter >> 8) & 0xff;
1086 p->id[7] = t->ping_counter & 0xff;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001087 p->cb = cb;
1088 p->user_data = user_data;
1089 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
1090 unlock(t);
1091}
1092
1093/*
1094 * INPUT PROCESSING
1095 */
1096
ctiller00297df2015-01-12 11:23:09 -08001097static void finalize_cancellations(transport *t) {
1098 stream *s;
1099
1100 while ((s = stream_list_remove_head(t, CANCELLED))) {
1101 s->read_closed = 1;
Craig Tillerc079c112015-04-22 15:23:39 -07001102 s->write_state = WRITE_STATE_SENT_CLOSE;
1103 maybe_finish_read(t, s);
ctiller00297df2015-01-12 11:23:09 -08001104 }
1105}
1106
Craig Tiller9c1043e2015-04-16 16:20:38 -07001107static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
1108 if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
Craig Tiller5c019ae2015-04-17 16:46:53 -07001109 s->incoming_metadata_capacity =
1110 GPR_MAX(8, 2 * s->incoming_metadata_capacity);
1111 s->incoming_metadata =
1112 gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) *
1113 s->incoming_metadata_capacity);
Craig Tiller9c1043e2015-04-16 16:20:38 -07001114 }
1115 s->incoming_metadata[s->incoming_metadata_count++].md = elem;
1116}
1117
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001118static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
1119 grpc_status_code local_status,
1120 grpc_chttp2_error_code error_code,
1121 int send_rst) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001122 int had_outgoing;
Craig Tiller8b433a22015-01-23 14:47:07 -08001123 char buffer[GPR_LTOA_MIN_BUFSIZE];
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001124
1125 if (s) {
1126 /* clear out any unreported input & output: nobody cares anymore */
Craig Tillerc079c112015-04-22 15:23:39 -07001127 had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
Craig Tillercb818ba2015-01-29 17:08:01 -08001128 schedule_nuke_sopb(t, &s->parser.incoming_sopb);
Craig Tillerc079c112015-04-22 15:23:39 -07001129 if (s->outgoing_sopb) {
1130 schedule_nuke_sopb(t, s->outgoing_sopb);
Craig Tiller7abc8d22015-04-23 16:43:55 -07001131 s->outgoing_sopb = NULL;
Craig Tillerc079c112015-04-22 15:23:39 -07001132 schedule_cb(t, s->send_done_closure, 0);
1133 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001134 if (s->cancelled) {
1135 send_rst = 0;
Craig Tiller06aeea72015-04-23 10:54:45 -07001136 } else if (!s->read_closed || s->write_state != WRITE_STATE_SENT_CLOSE ||
1137 had_outgoing) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001138 s->cancelled = 1;
ctiller00297df2015-01-12 11:23:09 -08001139 stream_list_join(t, s, CANCELLED);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001140
Craig Tillera7ed5d92015-01-23 11:30:16 -08001141 gpr_ltoa(local_status, buffer);
Craig Tiller5c019ae2015-04-17 16:46:53 -07001142 add_incoming_metadata(
1143 t, s,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001144 grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
Craig Tillerbd222712015-04-17 16:09:40 -07001145 switch (local_status) {
1146 case GRPC_STATUS_CANCELLED:
Craig Tiller5c019ae2015-04-17 16:46:53 -07001147 add_incoming_metadata(
1148 t, s, grpc_mdelem_from_strings(t->metadata_context,
1149 "grpc-message", "Cancelled"));
Craig Tillerbd222712015-04-17 16:09:40 -07001150 break;
1151 default:
1152 break;
1153 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001154
Craig Tillerc079c112015-04-22 15:23:39 -07001155 maybe_finish_read(t, s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001156 }
1157 }
1158 if (!id) send_rst = 0;
1159 if (send_rst) {
1160 gpr_slice_buffer_add(&t->qbuf,
1161 grpc_chttp2_rst_stream_create(id, error_code));
1162 }
1163}
1164
1165static void cancel_stream_id(transport *t, gpr_uint32 id,
1166 grpc_status_code local_status,
1167 grpc_chttp2_error_code error_code, int send_rst) {
1168 cancel_stream_inner(t, lookup_stream(t, id), id, local_status, error_code,
1169 send_rst);
1170}
1171
1172static void cancel_stream(transport *t, stream *s,
1173 grpc_status_code local_status,
1174 grpc_chttp2_error_code error_code, int send_rst) {
1175 cancel_stream_inner(t, s, s->id, local_status, error_code, send_rst);
1176}
1177
1178static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
1179 cancel_stream(user_data, stream, GRPC_STATUS_UNAVAILABLE,
1180 GRPC_CHTTP2_INTERNAL_ERROR, 0);
1181}
1182
1183static void end_all_the_calls(transport *t) {
1184 grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, t);
1185}
1186
1187static void drop_connection(transport *t) {
1188 if (t->error_state == ERROR_STATE_NONE) {
1189 t->error_state = ERROR_STATE_SEEN;
1190 }
1191 end_all_the_calls(t);
1192}
1193
Craig Tillerc079c112015-04-22 15:23:39 -07001194static void maybe_finish_read(transport *t, stream *s) {
1195 if (s->incoming_sopb) {
1196 stream_list_join(t, s, FINISHED_READ_OP);
1197 }
1198}
1199
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001200static void maybe_join_window_updates(transport *t, stream *s) {
Craig Tillerc079c112015-04-22 15:23:39 -07001201 if (s->incoming_sopb != NULL &&
ctiller493fbcc2014-12-07 15:09:10 -08001202 s->incoming_window <
1203 t->settings[LOCAL_SETTINGS]
1204 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] *
1205 3 / 4) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001206 stream_list_join(t, s, WINDOW_UPDATE);
1207 }
1208}
1209
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001210static grpc_chttp2_parse_error update_incoming_window(transport *t, stream *s) {
1211 if (t->incoming_frame_size > t->incoming_window) {
1212 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1213 t->incoming_frame_size, t->incoming_window);
1214 return GRPC_CHTTP2_CONNECTION_ERROR;
1215 }
1216
1217 if (t->incoming_frame_size > s->incoming_window) {
1218 gpr_log(GPR_ERROR, "frame of size %d overflows incoming window of %d",
1219 t->incoming_frame_size, s->incoming_window);
1220 return GRPC_CHTTP2_CONNECTION_ERROR;
1221 }
1222
1223 t->incoming_window -= t->incoming_frame_size;
1224 s->incoming_window -= t->incoming_frame_size;
1225
1226 /* if the stream incoming window is getting low, schedule an update */
1227 maybe_join_window_updates(t, s);
1228
1229 return GRPC_CHTTP2_PARSE_OK;
1230}
1231
1232static stream *lookup_stream(transport *t, gpr_uint32 id) {
1233 return grpc_chttp2_stream_map_find(&t->stream_map, id);
1234}
1235
1236static grpc_chttp2_parse_error skip_parser(void *parser,
1237 grpc_chttp2_parse_state *st,
1238 gpr_slice slice, int is_last) {
1239 return GRPC_CHTTP2_PARSE_OK;
1240}
1241
1242static void skip_header(void *tp, grpc_mdelem *md) { grpc_mdelem_unref(md); }
1243
1244static int init_skip_frame(transport *t, int is_header) {
1245 if (is_header) {
1246 int is_eoh = t->expect_continuation_stream_id != 0;
1247 t->parser = grpc_chttp2_header_parser_parse;
1248 t->parser_data = &t->hpack_parser;
1249 t->hpack_parser.on_header = skip_header;
1250 t->hpack_parser.on_header_user_data = NULL;
1251 t->hpack_parser.is_boundary = is_eoh;
1252 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1253 } else {
1254 t->parser = skip_parser;
1255 }
1256 return 1;
1257}
1258
1259static void become_skip_parser(transport *t) {
1260 init_skip_frame(t, t->parser == grpc_chttp2_header_parser_parse);
1261}
1262
1263static int init_data_frame_parser(transport *t) {
1264 stream *s = lookup_stream(t, t->incoming_stream_id);
1265 grpc_chttp2_parse_error err = GRPC_CHTTP2_PARSE_OK;
1266 if (!s || s->read_closed) return init_skip_frame(t, 0);
1267 if (err == GRPC_CHTTP2_PARSE_OK) {
1268 err = update_incoming_window(t, s);
1269 }
1270 if (err == GRPC_CHTTP2_PARSE_OK) {
1271 err = grpc_chttp2_data_parser_begin_frame(&s->parser,
1272 t->incoming_frame_flags);
1273 }
1274 switch (err) {
1275 case GRPC_CHTTP2_PARSE_OK:
1276 t->incoming_stream = s;
1277 t->parser = grpc_chttp2_data_parser_parse;
1278 t->parser_data = &s->parser;
1279 return 1;
1280 case GRPC_CHTTP2_STREAM_ERROR:
1281 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1282 GRPC_CHTTP2_INTERNAL_ERROR),
1283 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1284 return init_skip_frame(t, 0);
1285 case GRPC_CHTTP2_CONNECTION_ERROR:
1286 drop_connection(t);
1287 return 0;
1288 }
1289 gpr_log(GPR_ERROR, "should never reach here");
1290 abort();
1291 return 0;
1292}
1293
1294static void free_timeout(void *p) { gpr_free(p); }
1295
1296static void on_header(void *tp, grpc_mdelem *md) {
1297 transport *t = tp;
1298 stream *s = t->incoming_stream;
1299
1300 GPR_ASSERT(s);
Craig Tillerd50e5652015-02-24 16:46:22 -08001301
1302 IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id,
1303 grpc_mdstr_as_c_string(md->key),
1304 grpc_mdstr_as_c_string(md->value)));
1305
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001306 if (md->key == t->str_grpc_timeout) {
1307 gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);
1308 if (!cached_timeout) {
1309 /* not already parsed: parse it now, and store the result away */
1310 cached_timeout = gpr_malloc(sizeof(gpr_timespec));
1311 if (!grpc_chttp2_decode_timeout(grpc_mdstr_as_c_string(md->value),
1312 cached_timeout)) {
1313 gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'",
1314 grpc_mdstr_as_c_string(md->value));
1315 *cached_timeout = gpr_inf_future;
1316 }
1317 grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
1318 }
Craig Tiller9c1043e2015-04-16 16:20:38 -07001319 s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001320 grpc_mdelem_unref(md);
1321 } else {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001322 add_incoming_metadata(t, s, md);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001323 }
Craig Tillerc079c112015-04-22 15:23:39 -07001324 maybe_finish_read(t, s);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001325}
1326
1327static int init_header_frame_parser(transport *t, int is_continuation) {
1328 int is_eoh =
1329 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_HEADERS) != 0;
1330 stream *s;
1331
1332 if (is_eoh) {
1333 t->expect_continuation_stream_id = 0;
1334 } else {
1335 t->expect_continuation_stream_id = t->incoming_stream_id;
1336 }
1337
1338 if (!is_continuation) {
1339 t->header_eof =
1340 (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0;
1341 }
1342
1343 /* could be a new stream or an existing stream */
1344 s = lookup_stream(t, t->incoming_stream_id);
1345 if (!s) {
1346 if (is_continuation) {
1347 gpr_log(GPR_ERROR, "stream disbanded before CONTINUATION received");
1348 return init_skip_frame(t, 1);
1349 }
1350 if (t->is_client) {
1351 if ((t->incoming_stream_id & 1) &&
1352 t->incoming_stream_id < t->next_stream_id) {
1353 /* this is an old (probably cancelled) stream */
1354 } else {
1355 gpr_log(GPR_ERROR, "ignoring new stream creation on client");
1356 }
1357 return init_skip_frame(t, 1);
nnoble0c475f02014-12-05 15:37:39 -08001358 } else if (t->last_incoming_stream_id > t->incoming_stream_id) {
1359 gpr_log(GPR_ERROR,
1360 "ignoring out of order new stream request on server; last stream "
1361 "id=%d, new stream id=%d",
1362 t->last_incoming_stream_id, t->incoming_stream);
1363 return init_skip_frame(t, 1);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001364 }
1365 t->incoming_stream = NULL;
1366 /* if stream is accepted, we set incoming_stream in init_stream */
1367 t->cb->accept_stream(t->cb_user_data, &t->base,
Craig Tiller5c019ae2015-04-17 16:46:53 -07001368 (void *)(gpr_uintptr)t->incoming_stream_id);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001369 s = t->incoming_stream;
1370 if (!s) {
1371 gpr_log(GPR_ERROR, "stream not accepted");
1372 return init_skip_frame(t, 1);
1373 }
1374 } else {
1375 t->incoming_stream = s;
1376 }
1377 if (t->incoming_stream->read_closed) {
1378 gpr_log(GPR_ERROR, "skipping already closed stream header");
1379 t->incoming_stream = NULL;
1380 return init_skip_frame(t, 1);
1381 }
1382 t->parser = grpc_chttp2_header_parser_parse;
1383 t->parser_data = &t->hpack_parser;
1384 t->hpack_parser.on_header = on_header;
1385 t->hpack_parser.on_header_user_data = t;
1386 t->hpack_parser.is_boundary = is_eoh;
1387 t->hpack_parser.is_eof = is_eoh ? t->header_eof : 0;
1388 if (!is_continuation &&
1389 (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_HAS_PRIORITY)) {
1390 grpc_chttp2_hpack_parser_set_has_priority(&t->hpack_parser);
1391 }
1392 return 1;
1393}
1394
1395static int init_window_update_frame_parser(transport *t) {
1396 int ok = GRPC_CHTTP2_PARSE_OK == grpc_chttp2_window_update_parser_begin_frame(
1397 &t->simple_parsers.window_update,
1398 t->incoming_frame_size,
1399 t->incoming_frame_flags);
1400 if (!ok) {
1401 drop_connection(t);
1402 }
1403 t->parser = grpc_chttp2_window_update_parser_parse;
1404 t->parser_data = &t->simple_parsers.window_update;
1405 return ok;
1406}
1407
1408static int init_ping_parser(transport *t) {
1409 int ok = GRPC_CHTTP2_PARSE_OK ==
1410 grpc_chttp2_ping_parser_begin_frame(&t->simple_parsers.ping,
1411 t->incoming_frame_size,
1412 t->incoming_frame_flags);
1413 if (!ok) {
1414 drop_connection(t);
1415 }
1416 t->parser = grpc_chttp2_ping_parser_parse;
1417 t->parser_data = &t->simple_parsers.ping;
1418 return ok;
1419}
1420
nnoble0c475f02014-12-05 15:37:39 -08001421static int init_goaway_parser(transport *t) {
1422 int ok =
1423 GRPC_CHTTP2_PARSE_OK ==
1424 grpc_chttp2_goaway_parser_begin_frame(
1425 &t->goaway_parser, t->incoming_frame_size, t->incoming_frame_flags);
1426 if (!ok) {
1427 drop_connection(t);
1428 }
1429 t->parser = grpc_chttp2_goaway_parser_parse;
1430 t->parser_data = &t->goaway_parser;
1431 return ok;
1432}
1433
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001434static int init_settings_frame_parser(transport *t) {
1435 int ok = GRPC_CHTTP2_PARSE_OK ==
1436 grpc_chttp2_settings_parser_begin_frame(
1437 &t->simple_parsers.settings, t->incoming_frame_size,
1438 t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
1439 if (!ok) {
1440 drop_connection(t);
1441 }
1442 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) {
1443 memcpy(t->settings[ACKED_SETTINGS], t->settings[SENT_SETTINGS],
1444 GRPC_CHTTP2_NUM_SETTINGS * sizeof(gpr_uint32));
1445 }
1446 t->parser = grpc_chttp2_settings_parser_parse;
1447 t->parser_data = &t->simple_parsers.settings;
1448 return ok;
1449}
1450
1451static int init_frame_parser(transport *t) {
1452 if (t->expect_continuation_stream_id != 0) {
1453 if (t->incoming_frame_type != GRPC_CHTTP2_FRAME_CONTINUATION) {
1454 gpr_log(GPR_ERROR, "Expected CONTINUATION frame, got frame type %02x",
1455 t->incoming_frame_type);
1456 return 0;
1457 }
1458 if (t->expect_continuation_stream_id != t->incoming_stream_id) {
1459 gpr_log(GPR_ERROR,
1460 "Expected CONTINUATION frame for stream %08x, got stream %08x",
1461 t->expect_continuation_stream_id, t->incoming_stream_id);
1462 return 0;
1463 }
1464 return init_header_frame_parser(t, 1);
1465 }
1466 switch (t->incoming_frame_type) {
1467 case GRPC_CHTTP2_FRAME_DATA:
1468 return init_data_frame_parser(t);
1469 case GRPC_CHTTP2_FRAME_HEADER:
1470 return init_header_frame_parser(t, 0);
1471 case GRPC_CHTTP2_FRAME_CONTINUATION:
1472 gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
1473 return 0;
1474 case GRPC_CHTTP2_FRAME_RST_STREAM:
1475 /* TODO(ctiller): actually parse the reason */
1476 cancel_stream_id(
1477 t, t->incoming_stream_id,
1478 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
1479 GRPC_CHTTP2_CANCEL, 0);
1480 return init_skip_frame(t, 0);
1481 case GRPC_CHTTP2_FRAME_SETTINGS:
1482 return init_settings_frame_parser(t);
1483 case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
1484 return init_window_update_frame_parser(t);
1485 case GRPC_CHTTP2_FRAME_PING:
1486 return init_ping_parser(t);
nnoble0c475f02014-12-05 15:37:39 -08001487 case GRPC_CHTTP2_FRAME_GOAWAY:
1488 return init_goaway_parser(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001489 default:
1490 gpr_log(GPR_ERROR, "Unknown frame type %02x", t->incoming_frame_type);
1491 return init_skip_frame(t, 0);
1492 }
1493}
1494
Craig Tiller84b88842015-04-20 08:47:52 -07001495static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
1496 return window + window_update < MAX_WINDOW;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001497}
1498
Craig Tillerbd222712015-04-17 16:09:40 -07001499static void add_metadata_batch(transport *t, stream *s) {
Craig Tiller9c1043e2015-04-16 16:20:38 -07001500 grpc_metadata_batch b;
Craig Tiller9c1043e2015-04-16 16:20:38 -07001501 size_t i;
1502
1503 b.list.head = &s->incoming_metadata[0];
1504 b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
1505 b.garbage.head = b.garbage.tail = NULL;
1506 b.deadline = s->incoming_deadline;
1507
1508 for (i = 1; i < s->incoming_metadata_count; i++) {
1509 s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
1510 s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
1511 }
1512 s->incoming_metadata[0].prev = NULL;
1513 s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
1514
1515 grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
Craig Tillerfbf5be22015-04-22 16:17:09 -07001516 /* TODO(ctiller): don't leak incoming_metadata */
Craig Tiller9c1043e2015-04-16 16:20:38 -07001517
1518 /* reset */
1519 s->incoming_deadline = gpr_inf_future;
1520 s->incoming_metadata = NULL;
1521 s->incoming_metadata_count = 0;
1522 s->incoming_metadata_capacity = 0;
1523}
1524
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001525static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
1526 grpc_chttp2_parse_state st;
1527 size_t i;
1528 memset(&st, 0, sizeof(st));
1529 switch (t->parser(t->parser_data, &st, slice, is_last)) {
1530 case GRPC_CHTTP2_PARSE_OK:
1531 if (st.end_of_stream) {
1532 t->incoming_stream->read_closed = 1;
Craig Tillerc079c112015-04-22 15:23:39 -07001533 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001534 }
1535 if (st.need_flush_reads) {
Craig Tillerc079c112015-04-22 15:23:39 -07001536 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001537 }
1538 if (st.metadata_boundary) {
Craig Tillerbd222712015-04-17 16:09:40 -07001539 add_metadata_batch(t, t->incoming_stream);
Craig Tillerc079c112015-04-22 15:23:39 -07001540 maybe_finish_read(t, t->incoming_stream);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001541 }
1542 if (st.ack_settings) {
1543 gpr_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
1544 maybe_start_some_streams(t);
1545 }
1546 if (st.send_ping_ack) {
1547 gpr_slice_buffer_add(
1548 &t->qbuf,
1549 grpc_chttp2_ping_create(1, t->simple_parsers.ping.opaque_8bytes));
1550 }
nnoble0c475f02014-12-05 15:37:39 -08001551 if (st.goaway) {
1552 if (t->num_pending_goaways == t->cap_pending_goaways) {
1553 t->cap_pending_goaways = GPR_MAX(1, t->cap_pending_goaways * 2);
1554 t->pending_goaways =
1555 gpr_realloc(t->pending_goaways,
1556 sizeof(pending_goaway) * t->cap_pending_goaways);
1557 }
1558 t->pending_goaways[t->num_pending_goaways].status =
1559 grpc_chttp2_http2_error_to_grpc_status(st.goaway_error);
1560 t->pending_goaways[t->num_pending_goaways].debug = st.goaway_text;
1561 t->num_pending_goaways++;
1562 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001563 if (st.process_ping_reply) {
1564 for (i = 0; i < t->ping_count; i++) {
1565 if (0 ==
1566 memcmp(t->pings[i].id, t->simple_parsers.ping.opaque_8bytes, 8)) {
1567 t->pings[i].cb(t->pings[i].user_data);
1568 memmove(&t->pings[i], &t->pings[i + 1],
1569 (t->ping_count - i - 1) * sizeof(outstanding_ping));
1570 t->ping_count--;
1571 break;
1572 }
1573 }
1574 }
Yang Gaof1021032015-04-18 00:10:29 -07001575 if (st.initial_window_update) {
1576 for (i = 0; i < t->stream_map.count; i++) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001577 stream *s = (stream *)(t->stream_map.values[i]);
Craig Tiller84b88842015-04-20 08:47:52 -07001578 int was_window_empty = s->outgoing_window <= 0;
1579 s->outgoing_window += st.initial_window_update;
Craig Tiller06aeea72015-04-23 10:54:45 -07001580 if (was_window_empty && s->outgoing_window > 0 && s->outgoing_sopb &&
1581 s->outgoing_sopb->nops > 0) {
Craig Tiller84b88842015-04-20 08:47:52 -07001582 stream_list_join(t, s, WRITABLE);
Yang Gaof1021032015-04-18 00:10:29 -07001583 }
1584 }
1585 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001586 if (st.window_update) {
1587 if (t->incoming_stream_id) {
1588 /* if there was a stream id, this is for some stream */
1589 stream *s = lookup_stream(t, t->incoming_stream_id);
1590 if (s) {
Craig Tiller84b88842015-04-20 08:47:52 -07001591 int was_window_empty = s->outgoing_window <= 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001592 if (!is_window_update_legal(st.window_update, s->outgoing_window)) {
1593 cancel_stream(t, s, grpc_chttp2_http2_error_to_grpc_status(
1594 GRPC_CHTTP2_FLOW_CONTROL_ERROR),
1595 GRPC_CHTTP2_FLOW_CONTROL_ERROR, 1);
1596 } else {
1597 s->outgoing_window += st.window_update;
1598 /* if this window update makes outgoing ops writable again,
1599 flag that */
Craig Tiller06aeea72015-04-23 10:54:45 -07001600 if (was_window_empty && s->outgoing_sopb &&
1601 s->outgoing_sopb->nops > 0) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001602 stream_list_join(t, s, WRITABLE);
1603 }
1604 }
1605 }
1606 } else {
1607 /* transport level window update */
1608 if (!is_window_update_legal(st.window_update, t->outgoing_window)) {
1609 drop_connection(t);
1610 } else {
1611 t->outgoing_window += st.window_update;
1612 }
1613 }
1614 }
1615 return 1;
1616 case GRPC_CHTTP2_STREAM_ERROR:
1617 become_skip_parser(t);
1618 cancel_stream_id(
1619 t, t->incoming_stream_id,
1620 grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_INTERNAL_ERROR),
1621 GRPC_CHTTP2_INTERNAL_ERROR, 1);
1622 return 1;
1623 case GRPC_CHTTP2_CONNECTION_ERROR:
1624 drop_connection(t);
1625 return 0;
1626 }
1627 gpr_log(GPR_ERROR, "should never reach here");
1628 abort();
1629 return 0;
1630}
1631
1632static int process_read(transport *t, gpr_slice slice) {
1633 gpr_uint8 *beg = GPR_SLICE_START_PTR(slice);
1634 gpr_uint8 *end = GPR_SLICE_END_PTR(slice);
1635 gpr_uint8 *cur = beg;
1636
1637 if (cur == end) return 1;
1638
1639 switch (t->deframe_state) {
1640 case DTS_CLIENT_PREFIX_0:
1641 case DTS_CLIENT_PREFIX_1:
1642 case DTS_CLIENT_PREFIX_2:
1643 case DTS_CLIENT_PREFIX_3:
1644 case DTS_CLIENT_PREFIX_4:
1645 case DTS_CLIENT_PREFIX_5:
1646 case DTS_CLIENT_PREFIX_6:
1647 case DTS_CLIENT_PREFIX_7:
1648 case DTS_CLIENT_PREFIX_8:
1649 case DTS_CLIENT_PREFIX_9:
1650 case DTS_CLIENT_PREFIX_10:
1651 case DTS_CLIENT_PREFIX_11:
1652 case DTS_CLIENT_PREFIX_12:
1653 case DTS_CLIENT_PREFIX_13:
1654 case DTS_CLIENT_PREFIX_14:
1655 case DTS_CLIENT_PREFIX_15:
1656 case DTS_CLIENT_PREFIX_16:
1657 case DTS_CLIENT_PREFIX_17:
1658 case DTS_CLIENT_PREFIX_18:
1659 case DTS_CLIENT_PREFIX_19:
1660 case DTS_CLIENT_PREFIX_20:
1661 case DTS_CLIENT_PREFIX_21:
1662 case DTS_CLIENT_PREFIX_22:
1663 case DTS_CLIENT_PREFIX_23:
1664 while (cur != end && t->deframe_state != DTS_FH_0) {
1665 if (*cur != CLIENT_CONNECT_STRING[t->deframe_state]) {
1666 gpr_log(GPR_ERROR,
1667 "Connect string mismatch: expected '%c' (%d) got '%c' (%d) "
1668 "at byte %d",
1669 CLIENT_CONNECT_STRING[t->deframe_state],
Craig Tiller5c019ae2015-04-17 16:46:53 -07001670 (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur,
1671 (int)*cur, t->deframe_state);
Craig Tiller5246e7a2015-01-19 14:59:08 -08001672 drop_connection(t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001673 return 0;
1674 }
1675 ++cur;
1676 ++t->deframe_state;
1677 }
1678 if (cur == end) {
1679 return 1;
1680 }
1681 /* fallthrough */
1682 dts_fh_0:
1683 case DTS_FH_0:
1684 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001685 t->incoming_frame_size = ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001686 if (++cur == end) {
1687 t->deframe_state = DTS_FH_1;
1688 return 1;
1689 }
1690 /* fallthrough */
1691 case DTS_FH_1:
1692 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001693 t->incoming_frame_size |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001694 if (++cur == end) {
1695 t->deframe_state = DTS_FH_2;
1696 return 1;
1697 }
1698 /* fallthrough */
1699 case DTS_FH_2:
1700 GPR_ASSERT(cur < end);
1701 t->incoming_frame_size |= *cur;
1702 if (++cur == end) {
1703 t->deframe_state = DTS_FH_3;
1704 return 1;
1705 }
1706 /* fallthrough */
1707 case DTS_FH_3:
1708 GPR_ASSERT(cur < end);
1709 t->incoming_frame_type = *cur;
1710 if (++cur == end) {
1711 t->deframe_state = DTS_FH_4;
1712 return 1;
1713 }
1714 /* fallthrough */
1715 case DTS_FH_4:
1716 GPR_ASSERT(cur < end);
1717 t->incoming_frame_flags = *cur;
1718 if (++cur == end) {
1719 t->deframe_state = DTS_FH_5;
1720 return 1;
1721 }
1722 /* fallthrough */
1723 case DTS_FH_5:
1724 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001725 t->incoming_stream_id = (((gpr_uint32)*cur) << 24) & 0x7f;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001726 if (++cur == end) {
1727 t->deframe_state = DTS_FH_6;
1728 return 1;
1729 }
1730 /* fallthrough */
1731 case DTS_FH_6:
1732 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001733 t->incoming_stream_id |= ((gpr_uint32)*cur) << 16;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001734 if (++cur == end) {
1735 t->deframe_state = DTS_FH_7;
1736 return 1;
1737 }
1738 /* fallthrough */
1739 case DTS_FH_7:
1740 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001741 t->incoming_stream_id |= ((gpr_uint32)*cur) << 8;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001742 if (++cur == end) {
1743 t->deframe_state = DTS_FH_8;
1744 return 1;
1745 }
1746 /* fallthrough */
1747 case DTS_FH_8:
1748 GPR_ASSERT(cur < end);
Craig Tillercb818ba2015-01-29 17:08:01 -08001749 t->incoming_stream_id |= ((gpr_uint32)*cur);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001750 t->deframe_state = DTS_FRAME;
1751 if (!init_frame_parser(t)) {
1752 return 0;
1753 }
Tatsuhiro Tsujikawa1cbf8d72015-03-13 23:59:40 +09001754 /* t->last_incoming_stream_id is used as last-stream-id when
1755 sending GOAWAY frame.
1756 https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.8
1757 says that last-stream-id is peer-initiated stream ID. So,
1758 since we don't have server pushed streams, client should send
1759 GOAWAY last-stream-id=0 in this case. */
Tatsuhiro Tsujikawad11f6102015-03-12 22:57:22 +09001760 if (!t->is_client) {
1761 t->last_incoming_stream_id = t->incoming_stream_id;
1762 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001763 if (t->incoming_frame_size == 0) {
1764 if (!parse_frame_slice(t, gpr_empty_slice(), 1)) {
1765 return 0;
1766 }
1767 if (++cur == end) {
1768 t->deframe_state = DTS_FH_0;
1769 return 1;
1770 }
1771 goto dts_fh_0; /* loop */
1772 }
1773 if (++cur == end) {
1774 return 1;
1775 }
1776 /* fallthrough */
1777 case DTS_FRAME:
1778 GPR_ASSERT(cur < end);
Craig Tiller54f9a652015-02-19 21:41:20 -08001779 if ((gpr_uint32)(end - cur) == t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001780 if (!parse_frame_slice(
1781 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 1)) {
1782 return 0;
1783 }
1784 t->deframe_state = DTS_FH_0;
1785 return 1;
Craig Tiller0c0b60c2015-01-21 15:49:28 -08001786 } else if ((gpr_uint32)(end - cur) > t->incoming_frame_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001787 if (!parse_frame_slice(
1788 t, gpr_slice_sub_no_ref(slice, cur - beg,
1789 cur + t->incoming_frame_size - beg),
1790 1)) {
1791 return 0;
1792 }
1793 cur += t->incoming_frame_size;
1794 goto dts_fh_0; /* loop */
1795 } else {
1796 if (!parse_frame_slice(
1797 t, gpr_slice_sub_no_ref(slice, cur - beg, end - beg), 0)) {
1798 return 0;
1799 }
1800 t->incoming_frame_size -= (end - cur);
1801 return 1;
1802 }
1803 gpr_log(GPR_ERROR, "should never reach here");
1804 abort();
1805 }
1806
1807 gpr_log(GPR_ERROR, "should never reach here");
1808 abort();
Nicolas "Pixel" Noble7f13eb22015-04-01 20:57:33 -07001809
1810 return 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001811}
1812
1813/* tcp read callback */
1814static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
1815 grpc_endpoint_cb_status error) {
1816 transport *t = tp;
1817 size_t i;
1818 int keep_reading = 0;
1819
1820 switch (error) {
1821 case GRPC_ENDPOINT_CB_SHUTDOWN:
1822 case GRPC_ENDPOINT_CB_EOF:
1823 case GRPC_ENDPOINT_CB_ERROR:
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001824 lock(t);
1825 drop_connection(t);
1826 t->reading = 0;
1827 if (!t->writing && t->ep) {
1828 grpc_endpoint_destroy(t->ep);
1829 t->ep = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001830 unref_transport(t); /* safe as we still have a ref for read */
1831 }
1832 unlock(t);
1833 unref_transport(t);
1834 break;
1835 case GRPC_ENDPOINT_CB_OK:
1836 lock(t);
1837 for (i = 0; i < nslices && process_read(t, slices[i]); i++)
1838 ;
1839 unlock(t);
1840 keep_reading = 1;
1841 break;
1842 }
1843
1844 for (i = 0; i < nslices; i++) gpr_slice_unref(slices[i]);
1845
1846 if (keep_reading) {
ctiller58393c22015-01-07 14:03:30 -08001847 grpc_endpoint_notify_on_read(t->ep, recv_data, t);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001848 }
1849}
1850
1851/*
1852 * CALLBACK LOOP
1853 */
1854
1855static grpc_stream_state compute_state(gpr_uint8 write_closed,
1856 gpr_uint8 read_closed) {
1857 if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
1858 if (write_closed) return GRPC_STREAM_SEND_CLOSED;
1859 if (read_closed) return GRPC_STREAM_RECV_CLOSED;
1860 return GRPC_STREAM_OPEN;
1861}
1862
Craig Tillerc079c112015-04-22 15:23:39 -07001863static void finish_reads(transport *t) {
1864 stream *s;
1865
1866 while ((s = stream_list_remove_head(t, FINISHED_READ_OP)) != NULL) {
1867 int publish = 0;
1868 GPR_ASSERT(s->incoming_sopb);
Craig Tiller06aeea72015-04-23 10:54:45 -07001869 *s->publish_state =
1870 compute_state(s->write_state == WRITE_STATE_SENT_CLOSE, s->read_closed);
Craig Tillerc079c112015-04-22 15:23:39 -07001871 if (*s->publish_state != s->published_state) {
1872 s->published_state = *s->publish_state;
1873 publish = 1;
1874 }
1875 if (s->parser.incoming_sopb.nops > 0) {
1876 grpc_sopb_swap(s->incoming_sopb, &s->parser.incoming_sopb);
1877 publish = 1;
1878 }
1879 if (publish) {
Craig Tiller7e8489a2015-04-23 12:41:16 -07001880 s->incoming_sopb = NULL;
Craig Tillerc079c112015-04-22 15:23:39 -07001881 schedule_cb(t, s->recv_done_closure, 1);
1882 }
1883 }
1884}
1885
1886static void schedule_cb(transport *t, op_closure closure, int success) {
1887 if (t->pending_callbacks.capacity == t->pending_callbacks.count) {
Craig Tiller06aeea72015-04-23 10:54:45 -07001888 t->pending_callbacks.capacity =
1889 GPR_MAX(t->pending_callbacks.capacity * 2, 8);
1890 t->pending_callbacks.callbacks =
1891 gpr_realloc(t->pending_callbacks.callbacks,
1892 t->pending_callbacks.capacity *
1893 sizeof(*t->pending_callbacks.callbacks));
Craig Tillerc079c112015-04-22 15:23:39 -07001894 }
1895 closure.success = success;
1896 t->pending_callbacks.callbacks[t->pending_callbacks.count++] = closure;
1897}
1898
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001899static int prepare_callbacks(transport *t) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001900 op_closure_array temp = t->pending_callbacks;
1901 t->pending_callbacks = t->executing_callbacks;
1902 t->executing_callbacks = temp;
1903 return t->executing_callbacks.count > 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001904}
1905
Craig Tillerd1345de2015-02-24 21:55:20 -08001906static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001907 size_t i;
1908 for (i = 0; i < t->executing_callbacks.count; i++) {
1909 op_closure c = t->executing_callbacks.callbacks[i];
Craig Tillerc079c112015-04-22 15:23:39 -07001910 c.cb(c.user_data, c.success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001911 }
Craig Tillerbe18b8d2015-04-22 14:00:47 -07001912 t->executing_callbacks.count = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001913}
1914
Craig Tiller748fe3f2015-03-02 07:48:50 -08001915static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb) {
1916 cb->closed(t->cb_user_data, &t->base);
1917}
1918
Craig Tillerc079c112015-04-22 15:23:39 -07001919/*
1920 * POLLSET STUFF
1921 */
1922
1923static void add_to_pollset_locked(transport *t, grpc_pollset *pollset) {
ctillerd79b4862014-12-17 16:36:59 -08001924 if (t->ep) {
1925 grpc_endpoint_add_to_pollset(t->ep, pollset);
1926 }
Craig Tillerc079c112015-04-22 15:23:39 -07001927}
1928
1929static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
1930 transport *t = (transport *)gt;
1931 lock(t);
1932 add_to_pollset_locked(t, pollset);
ctillerd79b4862014-12-17 16:36:59 -08001933 unlock(t);
1934}
1935
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001936/*
1937 * INTEGRATION GLUE
1938 */
1939
1940static const grpc_transport_vtable vtable = {
Craig Tiller06aeea72015-04-23 10:54:45 -07001941 sizeof(stream), init_stream, perform_op,
1942 add_to_pollset, destroy_stream, goaway,
1943 close_transport, send_ping, destroy_transport};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001944
1945void grpc_create_chttp2_transport(grpc_transport_setup_callback setup,
1946 void *arg,
1947 const grpc_channel_args *channel_args,
1948 grpc_endpoint *ep, gpr_slice *slices,
1949 size_t nslices, grpc_mdctx *mdctx,
1950 int is_client) {
1951 transport *t = gpr_malloc(sizeof(transport));
Nicolas Noble5ea99bb2015-02-04 14:13:09 -08001952 init_transport(t, setup, arg, channel_args, ep, slices, nslices, mdctx,
1953 is_client);
Craig Tiller190d3602015-02-18 09:23:38 -08001954}