blob: 479a66c9c2f1cc7d93f83d8dace1501ff8d23126 [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
lryanf8bbc122014-06-03 14:22:33 -07003import com.google.net.stubby.NoOpRequest;
ejona07d3f6a2014-05-14 11:26:57 -07004import com.google.net.stubby.Operation;
5import com.google.net.stubby.Operation.Phase;
6import com.google.net.stubby.Request;
7import com.google.net.stubby.RequestRegistry;
lryanf8bbc122014-06-03 14:22:33 -07008import com.google.net.stubby.Response;
ejona07d3f6a2014-05-14 11:26:57 -07009import com.google.net.stubby.Session;
10import com.google.net.stubby.Status;
11import com.google.net.stubby.transport.MessageFramer;
ejona07d3f6a2014-05-14 11:26:57 -070012import com.google.net.stubby.transport.Transport.Code;
13
14import io.netty.buffer.ByteBuf;
lryan0cf7d4c2014-06-11 11:50:49 -070015import io.netty.channel.ChannelFuture;
ejona07d3f6a2014-05-14 11:26:57 -070016import io.netty.channel.ChannelHandlerContext;
lryan03599072014-07-07 18:44:00 -070017import io.netty.channel.ChannelPromise;
lryan0cf7d4c2014-06-11 11:50:49 -070018import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
lryan03599072014-07-07 18:44:00 -070019import io.netty.handler.codec.http2.DefaultHttp2Connection;
20import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
21import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
22import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
23import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
24import io.netty.handler.codec.http2.Http2Connection;
lryan0cf7d4c2014-06-11 11:50:49 -070025import io.netty.handler.codec.http2.Http2Error;
26import io.netty.handler.codec.http2.Http2Exception;
27import io.netty.handler.codec.http2.Http2Headers;
28import io.netty.handler.codec.http2.Http2Settings;
ejona07d3f6a2014-05-14 11:26:57 -070029
30/**
31 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
32 * request-response dialog
33 */
lryan0cf7d4c2014-06-11 11:50:49 -070034public class Http2Codec extends AbstractHttp2ConnectionHandler {
ejona07d3f6a2014-05-14 11:26:57 -070035
lryan0cf7d4c2014-06-11 11:50:49 -070036 public static final int PADDING = 0;
ejona07d3f6a2014-05-14 11:26:57 -070037 private final boolean client;
38 private final RequestRegistry requestRegistry;
39 private final Session session;
lryan0cf7d4c2014-06-11 11:50:49 -070040 private Http2Codec.Http2Writer http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070041
42 /**
43 * Constructor used by servers, takes a session which will receive operation events.
44 */
45 public Http2Codec(Session session, RequestRegistry requestRegistry) {
lryan03599072014-07-07 18:44:00 -070046 this(new DefaultHttp2Connection(true, false), false, session, requestRegistry);
ejona07d3f6a2014-05-14 11:26:57 -070047 }
48
49 /**
50 * Constructor used by clients to send operations to a remote server
51 */
52 public Http2Codec(RequestRegistry requestRegistry) {
lryan03599072014-07-07 18:44:00 -070053 this(new DefaultHttp2Connection(false, false), true, null, requestRegistry);
54 }
55
56 /**
57 * Constructor used by servers, takes a session which will receive operation events.
58 */
59 private Http2Codec(Http2Connection connection, boolean client,
60 Session session,
61 RequestRegistry requestRegistry) {
62 super(connection, new DefaultHttp2FrameReader(),
63 new SuppressCompressionSettingsWriter(), new DefaultHttp2InboundFlowController(connection),
64 new DefaultHttp2OutboundFlowController(connection));
65 // TODO(user): Use connection.isServer when not private in base class
66 this.client = client;
67 this.session = session;
ejona07d3f6a2014-05-14 11:26:57 -070068 this.requestRegistry = requestRegistry;
69 }
70
71 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070072 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
73 http2Writer = new Http2Writer(ctx);
74 }
ejona07d3f6a2014-05-14 11:26:57 -070075
lryan0cf7d4c2014-06-11 11:50:49 -070076 public Http2Writer getWriter() {
77 return http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070078 }
79
80 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070081 public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
82 boolean endOfStream, boolean endOfSegment, boolean compressed)
83 throws Http2Exception {
84 Request request = requestRegistry.lookup(streamId);
85 if (request == null) {
86 // Stream may have been terminated already or this is just plain spurious
87 throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
ejona07d3f6a2014-05-14 11:26:57 -070088 }
lryan0cf7d4c2014-06-11 11:50:49 -070089 Operation operation = client ? request.getResponse() : request;
ejona07d3f6a2014-05-14 11:26:57 -070090 try {
lryan0cf7d4c2014-06-11 11:50:49 -070091 ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
92 deframer.deframe(data, operation);
93 if (endOfStream) {
94 finish(operation);
ejona07d3f6a2014-05-14 11:26:57 -070095 }
96 } catch (Throwable e) {
lryan0cf7d4c2014-06-11 11:50:49 -070097 // TODO(user): Need to disambiguate between stream corruption as well as client/server
98 // generated errors. For stream corruption we always just send reset stream. For
99 // clients we will also generally reset-stream on error, servers may send a more detailed
100 // status.
lryanf8bbc122014-06-03 14:22:33 -0700101 Status status = Status.fromThrowable(e);
lryan0cf7d4c2014-06-11 11:50:49 -0700102 closeWithError(request, status);
lryanf8bbc122014-06-03 14:22:33 -0700103 }
104 }
105
lryan0cf7d4c2014-06-11 11:50:49 -0700106 @Override
107 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
nathanmittler56c1d8c2014-06-23 08:57:25 -0700108 int streamDependency, short weight, boolean exclusive, int padding,
109 boolean endStream, boolean endSegment) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700110 Request operation = requestRegistry.lookup(streamId);
111 if (operation == null) {
112 if (client) {
113 // For clients an operation must already exist in the registry
114 throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
115 } else {
116 operation = serverStart(ctx, streamId, headers);
117 if (operation == null) {
118 closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
119 new Status(Code.NOT_FOUND));
120 }
121 }
122 }
123 if (endStream) {
124 finish(client ? operation.getResponse() : operation);
125 }
126 }
127
128 @Override
lryan0cf7d4c2014-06-11 11:50:49 -0700129 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
130 short weight, boolean exclusive) throws Http2Exception {
131 // TODO
132 }
133
134 @Override
135 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
136 throws Http2Exception {
137 Request request = requestRegistry.lookup(streamId);
138 if (request != null) {
139 closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
140 requestRegistry.remove(streamId);
141 }
142 }
143
144 @Override
145 public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
146 // TOOD
147 }
148
149 @Override
150 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
151 throws Http2Exception {
152 // TOOD
153 }
154
155 @Override
156 public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
157 // TODO
158 }
159
160 @Override
161 public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
162 // TODO
163 }
164
165 @Override
166 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
167 Http2Headers headers, int padding) throws Http2Exception {
168 // TODO
169 }
170
171 @Override
172 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
173 ByteBuf debugData) throws Http2Exception {
174 // TODO
175 }
176
177 @Override
178 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
179 throws Http2Exception {
180 // TODO
181 }
182
183 @Override
184 public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port,
185 ByteBuf protocolId, String host, String origin) throws Http2Exception {
186 // TODO
187 }
188
189 @Override
190 public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
191 // TODO
192 }
lryanf8bbc122014-06-03 14:22:33 -0700193
194 /**
195 * Closes the request and its associated response with an internal error.
196 */
197 private void closeWithError(Request request, Status status) {
198 try {
199 request.close(status);
200 request.getResponse().close(status);
201 } finally {
202 requestRegistry.remove(request.getId());
lryan0cf7d4c2014-06-11 11:50:49 -0700203 disposeDeframer(request);
ejona07d3f6a2014-05-14 11:26:57 -0700204 }
205 }
206
207 /**
lryanf8bbc122014-06-03 14:22:33 -0700208 * Create an HTTP2 response handler
ejona07d3f6a2014-05-14 11:26:57 -0700209 */
lryan0cf7d4c2014-06-11 11:50:49 -0700210 private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
211 return Http2Response.builder(streamId, writer, new MessageFramer(4096));
ejona07d3f6a2014-05-14 11:26:57 -0700212 }
213
214 /**
215 * Start the Request operation on the server
216 */
lryan0cf7d4c2014-06-11 11:50:49 -0700217 private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
218 if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
ejona07d3f6a2014-05-14 11:26:57 -0700219 return null;
220 }
221 // Use Path to specify the operation
222 String operationName =
lryan0cf7d4c2014-06-11 11:50:49 -0700223 normalizeOperationName(headers.get(Http2Headers.HttpName.PATH.value()));
ejona07d3f6a2014-05-14 11:26:57 -0700224 if (operationName == null) {
225 return null;
226 }
227 // Create the operation and bind a HTTP2 response operation
lryan0cf7d4c2014-06-11 11:50:49 -0700228 Request op = session.startRequest(operationName, createResponse(new Http2Writer(ctx),
229 streamId));
ejona07d3f6a2014-05-14 11:26:57 -0700230 if (op == null) {
231 return null;
232 }
233 requestRegistry.register(op);
ejona07d3f6a2014-05-14 11:26:57 -0700234 return op;
235 }
236
237 // TODO(user): This needs proper namespacing support, this is currently just a hack
238 private static String normalizeOperationName(String path) {
239 return path.substring(1);
240 }
241
ejona07d3f6a2014-05-14 11:26:57 -0700242 /**
243 * Called when a HTTP2 stream is closed.
244 */
245 private void finish(Operation operation) {
246 disposeDeframer(operation);
247 requestRegistry.remove(operation.getId());
248 if (operation.getPhase() != Phase.CLOSED) {
249 operation.close(Status.OK);
250 }
251 }
252
lryan0cf7d4c2014-06-11 11:50:49 -0700253 public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ejona07d3f6a2014-05-14 11:26:57 -0700254 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
255 if (deframer == null) {
lryan0cf7d4c2014-06-11 11:50:49 -0700256 deframer = new ByteBufDeframer(ctx.alloc());
ejona07d3f6a2014-05-14 11:26:57 -0700257 operation.put(ByteBufDeframer.class, deframer);
258 }
259 return deframer;
260 }
261
262 public void disposeDeframer(Operation operation) {
263 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
264 if (deframer != null) {
265 deframer.dispose();
266 }
267 }
lryan0cf7d4c2014-06-11 11:50:49 -0700268
lryan03599072014-07-07 18:44:00 -0700269 // TODO(user): Remove this once fixes are done in netty too
270 private static class SuppressCompressionSettingsWriter extends DefaultHttp2FrameWriter {
271 @Override
272 public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
273 Http2Settings settings) {
274 Http2Settings newSettings = new Http2Settings();
275 if (settings.hasInitialWindowSize()) {
276 newSettings.initialWindowSize(settings.initialWindowSize());
277 }
278 if (settings.hasMaxConcurrentStreams()) {
279 newSettings.maxConcurrentStreams(settings.maxConcurrentStreams());
280 }
281 if (settings.hasMaxHeaderTableSize()) {
282 newSettings.maxHeaderTableSize(settings.maxHeaderTableSize());
283 }
284 if (settings.hasPushEnabled()) {
285 newSettings.pushEnabled(settings.pushEnabled());
286 }
287 return super.writeSettings(ctx, promise, newSettings);
288 }
289 }
290
lryan0cf7d4c2014-06-11 11:50:49 -0700291 public class Http2Writer {
292 private final ChannelHandlerContext ctx;
293
294 public Http2Writer(ChannelHandlerContext ctx) {
295 this.ctx = ctx;
296 }
297
298 public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream,
299 boolean endSegment, boolean compressed) {
300 return Http2Codec.this.writeData(ctx, ctx.newPromise(),
301 streamId, data, PADDING, endStream, endSegment, compressed);
302 }
303
304 public ChannelFuture writeHeaders(int streamId,
305 Http2Headers headers,
306 boolean endStream, boolean endSegment) {
307
308 return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
309 headers, PADDING, endStream, endSegment);
310 }
311
312 public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency,
313 short weight, boolean exclusive,
314 boolean endStream, boolean endSegment) {
315 return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
316 headers, streamDependency, weight, exclusive, PADDING, endStream, endSegment);
317 }
318
319 public ChannelFuture writeRstStream(int streamId, long errorCode) {
320 return Http2Codec.this.writeRstStream(ctx, ctx.newPromise(),
321 streamId,
322 errorCode);
323 }
324 }
ejona07d3f6a2014-05-14 11:26:57 -0700325}