blob: ab7884a87750b50c965eaa51c762eb01b50dd04f [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
lryan34650402014-07-14 13:39:39 -07003import com.google.common.collect.ImmutableMap;
lryanf8bbc122014-06-03 14:22:33 -07004import com.google.net.stubby.NoOpRequest;
ejona07d3f6a2014-05-14 11:26:57 -07005import com.google.net.stubby.Operation;
6import com.google.net.stubby.Operation.Phase;
7import com.google.net.stubby.Request;
8import com.google.net.stubby.RequestRegistry;
lryanf8bbc122014-06-03 14:22:33 -07009import com.google.net.stubby.Response;
ejona07d3f6a2014-05-14 11:26:57 -070010import com.google.net.stubby.Session;
11import com.google.net.stubby.Status;
12import com.google.net.stubby.transport.MessageFramer;
ejona07d3f6a2014-05-14 11:26:57 -070013import com.google.net.stubby.transport.Transport.Code;
14
15import io.netty.buffer.ByteBuf;
lryan0cf7d4c2014-06-11 11:50:49 -070016import io.netty.channel.ChannelFuture;
ejona07d3f6a2014-05-14 11:26:57 -070017import io.netty.channel.ChannelHandlerContext;
lryan0cf7d4c2014-06-11 11:50:49 -070018import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
lryan03599072014-07-07 18:44:00 -070019import io.netty.handler.codec.http2.DefaultHttp2Connection;
lryan03599072014-07-07 18:44:00 -070020import io.netty.handler.codec.http2.Http2Connection;
lryan0cf7d4c2014-06-11 11:50:49 -070021import io.netty.handler.codec.http2.Http2Error;
22import io.netty.handler.codec.http2.Http2Exception;
23import io.netty.handler.codec.http2.Http2Headers;
24import io.netty.handler.codec.http2.Http2Settings;
ejona07d3f6a2014-05-14 11:26:57 -070025
nathanmittlerbf8870b2014-07-16 13:28:33 -070026import java.util.Map;
27
ejona07d3f6a2014-05-14 11:26:57 -070028/**
29 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
30 * request-response dialog
31 */
lryan0cf7d4c2014-06-11 11:50:49 -070032public class Http2Codec extends AbstractHttp2ConnectionHandler {
ejona07d3f6a2014-05-14 11:26:57 -070033
lryan0cf7d4c2014-06-11 11:50:49 -070034 public static final int PADDING = 0;
ejona07d3f6a2014-05-14 11:26:57 -070035 private final RequestRegistry requestRegistry;
36 private final Session session;
lryan0cf7d4c2014-06-11 11:50:49 -070037 private Http2Codec.Http2Writer http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070038
39 /**
40 * Constructor used by servers, takes a session which will receive operation events.
41 */
42 public Http2Codec(Session session, RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070043 this(new DefaultHttp2Connection(true), session, requestRegistry);
ejona07d3f6a2014-05-14 11:26:57 -070044 }
45
46 /**
47 * Constructor used by clients to send operations to a remote server
48 */
49 public Http2Codec(RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070050 this(new DefaultHttp2Connection(false), null, requestRegistry);
lryan03599072014-07-07 18:44:00 -070051 }
52
53 /**
54 * Constructor used by servers, takes a session which will receive operation events.
55 */
nathanmittlerbf8870b2014-07-16 13:28:33 -070056 private Http2Codec(Http2Connection connection,
lryan03599072014-07-07 18:44:00 -070057 Session session,
58 RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070059 super(connection);
lryan03599072014-07-07 18:44:00 -070060 this.session = session;
ejona07d3f6a2014-05-14 11:26:57 -070061 this.requestRegistry = requestRegistry;
62 }
63
64 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070065 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
66 http2Writer = new Http2Writer(ctx);
67 }
ejona07d3f6a2014-05-14 11:26:57 -070068
lryan0cf7d4c2014-06-11 11:50:49 -070069 public Http2Writer getWriter() {
70 return http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070071 }
72
73 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070074 public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
nathanmittler42af07c2014-08-19 12:26:11 -070075 boolean endOfStream)
lryan0cf7d4c2014-06-11 11:50:49 -070076 throws Http2Exception {
77 Request request = requestRegistry.lookup(streamId);
78 if (request == null) {
79 // Stream may have been terminated already or this is just plain spurious
80 throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
ejona07d3f6a2014-05-14 11:26:57 -070081 }
nathanmittlerbf8870b2014-07-16 13:28:33 -070082 Operation operation = isClient() ? request.getResponse() : request;
ejona07d3f6a2014-05-14 11:26:57 -070083 try {
lryan0cf7d4c2014-06-11 11:50:49 -070084 ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
85 deframer.deframe(data, operation);
86 if (endOfStream) {
87 finish(operation);
ejona07d3f6a2014-05-14 11:26:57 -070088 }
89 } catch (Throwable e) {
lryan0cf7d4c2014-06-11 11:50:49 -070090 // TODO(user): Need to disambiguate between stream corruption as well as client/server
91 // generated errors. For stream corruption we always just send reset stream. For
92 // clients we will also generally reset-stream on error, servers may send a more detailed
93 // status.
lryanf8bbc122014-06-03 14:22:33 -070094 Status status = Status.fromThrowable(e);
lryan0cf7d4c2014-06-11 11:50:49 -070095 closeWithError(request, status);
lryanf8bbc122014-06-03 14:22:33 -070096 }
97 }
98
lryan0cf7d4c2014-06-11 11:50:49 -070099 @Override
100 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
nathanmittler56c1d8c2014-06-23 08:57:25 -0700101 int streamDependency, short weight, boolean exclusive, int padding,
nathanmittler42af07c2014-08-19 12:26:11 -0700102 boolean endStream) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700103 Request operation = requestRegistry.lookup(streamId);
104 if (operation == null) {
nathanmittlerbf8870b2014-07-16 13:28:33 -0700105 if (isClient()) {
lryan0cf7d4c2014-06-11 11:50:49 -0700106 // For clients an operation must already exist in the registry
107 throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
108 } else {
109 operation = serverStart(ctx, streamId, headers);
110 if (operation == null) {
111 closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
112 new Status(Code.NOT_FOUND));
113 }
114 }
115 }
116 if (endStream) {
nathanmittlerbf8870b2014-07-16 13:28:33 -0700117 finish(isClient() ? operation.getResponse() : operation);
lryan0cf7d4c2014-06-11 11:50:49 -0700118 }
119 }
120
121 @Override
lryan0cf7d4c2014-06-11 11:50:49 -0700122 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
123 short weight, boolean exclusive) throws Http2Exception {
124 // TODO
125 }
126
127 @Override
128 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
129 throws Http2Exception {
130 Request request = requestRegistry.lookup(streamId);
131 if (request != null) {
132 closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
133 requestRegistry.remove(streamId);
134 }
135 }
136
137 @Override
138 public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
139 // TOOD
140 }
141
142 @Override
143 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
144 throws Http2Exception {
145 // TOOD
146 }
147
148 @Override
149 public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
150 // TODO
151 }
152
153 @Override
154 public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
155 // TODO
156 }
157
158 @Override
159 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
160 Http2Headers headers, int padding) throws Http2Exception {
161 // TODO
162 }
163
164 @Override
165 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
166 ByteBuf debugData) throws Http2Exception {
167 // TODO
168 }
169
170 @Override
171 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
172 throws Http2Exception {
173 // TODO
174 }
175
nathanmittlerbf8870b2014-07-16 13:28:33 -0700176 private boolean isClient() {
177 return !connection().isServer();
lryan0cf7d4c2014-06-11 11:50:49 -0700178 }
lryanf8bbc122014-06-03 14:22:33 -0700179
180 /**
181 * Closes the request and its associated response with an internal error.
182 */
183 private void closeWithError(Request request, Status status) {
184 try {
185 request.close(status);
186 request.getResponse().close(status);
187 } finally {
188 requestRegistry.remove(request.getId());
lryan0cf7d4c2014-06-11 11:50:49 -0700189 disposeDeframer(request);
ejona07d3f6a2014-05-14 11:26:57 -0700190 }
191 }
192
193 /**
lryanf8bbc122014-06-03 14:22:33 -0700194 * Create an HTTP2 response handler
ejona07d3f6a2014-05-14 11:26:57 -0700195 */
lryan0cf7d4c2014-06-11 11:50:49 -0700196 private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
197 return Http2Response.builder(streamId, writer, new MessageFramer(4096));
ejona07d3f6a2014-05-14 11:26:57 -0700198 }
199
200 /**
201 * Start the Request operation on the server
202 */
lryan0cf7d4c2014-06-11 11:50:49 -0700203 private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
204 if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
ejona07d3f6a2014-05-14 11:26:57 -0700205 return null;
206 }
207 // Use Path to specify the operation
208 String operationName =
nathanmittler42af07c2014-08-19 12:26:11 -0700209 normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()));
ejona07d3f6a2014-05-14 11:26:57 -0700210 if (operationName == null) {
211 return null;
212 }
lryan34650402014-07-14 13:39:39 -0700213 ImmutableMap.Builder<String, String> headerMap = ImmutableMap.builder();
214 for (Map.Entry<String, String> header : headers) {
215 headerMap.put(header);
216 }
ejona07d3f6a2014-05-14 11:26:57 -0700217 // Create the operation and bind a HTTP2 response operation
lryan34650402014-07-14 13:39:39 -0700218 Request op = session.startRequest(operationName, headerMap.build(),
219 createResponse(new Http2Writer(ctx),
lryan0cf7d4c2014-06-11 11:50:49 -0700220 streamId));
ejona07d3f6a2014-05-14 11:26:57 -0700221 if (op == null) {
222 return null;
223 }
224 requestRegistry.register(op);
ejona07d3f6a2014-05-14 11:26:57 -0700225 return op;
226 }
227
228 // TODO(user): This needs proper namespacing support, this is currently just a hack
229 private static String normalizeOperationName(String path) {
230 return path.substring(1);
231 }
232
ejona07d3f6a2014-05-14 11:26:57 -0700233 /**
234 * Called when a HTTP2 stream is closed.
235 */
236 private void finish(Operation operation) {
237 disposeDeframer(operation);
238 requestRegistry.remove(operation.getId());
239 if (operation.getPhase() != Phase.CLOSED) {
240 operation.close(Status.OK);
241 }
242 }
243
lryan0cf7d4c2014-06-11 11:50:49 -0700244 public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ejona07d3f6a2014-05-14 11:26:57 -0700245 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
246 if (deframer == null) {
lryan0cf7d4c2014-06-11 11:50:49 -0700247 deframer = new ByteBufDeframer(ctx.alloc());
ejona07d3f6a2014-05-14 11:26:57 -0700248 operation.put(ByteBufDeframer.class, deframer);
249 }
250 return deframer;
251 }
252
253 public void disposeDeframer(Operation operation) {
254 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
255 if (deframer != null) {
256 deframer.dispose();
257 }
258 }
lryan0cf7d4c2014-06-11 11:50:49 -0700259
260 public class Http2Writer {
261 private final ChannelHandlerContext ctx;
262
263 public Http2Writer(ChannelHandlerContext ctx) {
264 this.ctx = ctx;
265 }
266
nathanmittler42af07c2014-08-19 12:26:11 -0700267 public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) {
lryan0cf7d4c2014-06-11 11:50:49 -0700268 return Http2Codec.this.writeData(ctx, ctx.newPromise(),
nathanmittler42af07c2014-08-19 12:26:11 -0700269 streamId, data, PADDING, endStream);
lryan0cf7d4c2014-06-11 11:50:49 -0700270 }
271
272 public ChannelFuture writeHeaders(int streamId,
273 Http2Headers headers,
nathanmittler42af07c2014-08-19 12:26:11 -0700274 boolean endStream) {
lryan0cf7d4c2014-06-11 11:50:49 -0700275
276 return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
nathanmittler42af07c2014-08-19 12:26:11 -0700277 headers, PADDING, endStream);
lryan0cf7d4c2014-06-11 11:50:49 -0700278 }
279
280 public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency,
281 short weight, boolean exclusive,
nathanmittler42af07c2014-08-19 12:26:11 -0700282 boolean endStream) {
lryan0cf7d4c2014-06-11 11:50:49 -0700283 return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
nathanmittler42af07c2014-08-19 12:26:11 -0700284 headers, streamDependency, weight, exclusive, PADDING, endStream);
lryan0cf7d4c2014-06-11 11:50:49 -0700285 }
286
287 public ChannelFuture writeRstStream(int streamId, long errorCode) {
288 return Http2Codec.this.writeRstStream(ctx, ctx.newPromise(),
289 streamId,
290 errorCode);
291 }
292 }
ejona07d3f6a2014-05-14 11:26:57 -0700293}