blob: 5e8dea548b93ed3f690d404dfc637a8b424f328a [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
lryane4bd1c72014-09-08 14:03:35 -07003import com.google.net.stubby.Metadata;
lryanf8bbc122014-06-03 14:22:33 -07004import com.google.net.stubby.NoOpRequest;
ejona07d3f6a2014-05-14 11:26:57 -07005import com.google.net.stubby.Operation;
6import com.google.net.stubby.Operation.Phase;
7import com.google.net.stubby.Request;
8import com.google.net.stubby.RequestRegistry;
lryanf8bbc122014-06-03 14:22:33 -07009import com.google.net.stubby.Response;
ejona07d3f6a2014-05-14 11:26:57 -070010import com.google.net.stubby.Session;
11import com.google.net.stubby.Status;
12import com.google.net.stubby.transport.MessageFramer;
ejona07d3f6a2014-05-14 11:26:57 -070013import com.google.net.stubby.transport.Transport.Code;
14
15import io.netty.buffer.ByteBuf;
lryan0cf7d4c2014-06-11 11:50:49 -070016import io.netty.channel.ChannelFuture;
ejona07d3f6a2014-05-14 11:26:57 -070017import io.netty.channel.ChannelHandlerContext;
nathanmittlerfc7a0522014-09-18 16:10:01 -070018import io.netty.handler.codec.AsciiString;
lryan0cf7d4c2014-06-11 11:50:49 -070019import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
lryan03599072014-07-07 18:44:00 -070020import io.netty.handler.codec.http2.DefaultHttp2Connection;
lryan03599072014-07-07 18:44:00 -070021import io.netty.handler.codec.http2.Http2Connection;
lryan0cf7d4c2014-06-11 11:50:49 -070022import io.netty.handler.codec.http2.Http2Error;
23import io.netty.handler.codec.http2.Http2Exception;
24import io.netty.handler.codec.http2.Http2Headers;
25import io.netty.handler.codec.http2.Http2Settings;
ejona07d3f6a2014-05-14 11:26:57 -070026
nathanmittlerfc7a0522014-09-18 16:10:01 -070027import java.util.Map;
28
ejona07d3f6a2014-05-14 11:26:57 -070029/**
30 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
31 * request-response dialog
32 */
lryan0cf7d4c2014-06-11 11:50:49 -070033public class Http2Codec extends AbstractHttp2ConnectionHandler {
lryan0cf7d4c2014-06-11 11:50:49 -070034 public static final int PADDING = 0;
ejona07d3f6a2014-05-14 11:26:57 -070035 private final RequestRegistry requestRegistry;
36 private final Session session;
lryan0cf7d4c2014-06-11 11:50:49 -070037 private Http2Codec.Http2Writer http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070038
39 /**
40 * Constructor used by servers, takes a session which will receive operation events.
41 */
42 public Http2Codec(Session session, RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070043 this(new DefaultHttp2Connection(true), session, requestRegistry);
ejona07d3f6a2014-05-14 11:26:57 -070044 }
45
46 /**
47 * Constructor used by clients to send operations to a remote server
48 */
49 public Http2Codec(RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070050 this(new DefaultHttp2Connection(false), null, requestRegistry);
lryan03599072014-07-07 18:44:00 -070051 }
52
53 /**
54 * Constructor used by servers, takes a session which will receive operation events.
55 */
nathanmittlere081a222014-08-27 14:00:15 -070056 private Http2Codec(Http2Connection connection, Session session, RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070057 super(connection);
lryan03599072014-07-07 18:44:00 -070058 this.session = session;
ejona07d3f6a2014-05-14 11:26:57 -070059 this.requestRegistry = requestRegistry;
60 }
61
62 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070063 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
64 http2Writer = new Http2Writer(ctx);
65 }
ejona07d3f6a2014-05-14 11:26:57 -070066
lryan0cf7d4c2014-06-11 11:50:49 -070067 public Http2Writer getWriter() {
68 return http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070069 }
70
71 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070072 public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
nathanmittlere081a222014-08-27 14:00:15 -070073 boolean endOfStream) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -070074 Request request = requestRegistry.lookup(streamId);
75 if (request == null) {
76 // Stream may have been terminated already or this is just plain spurious
nathanmittlere081a222014-08-27 14:00:15 -070077 throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
ejona07d3f6a2014-05-14 11:26:57 -070078 }
nathanmittlerbf8870b2014-07-16 13:28:33 -070079 Operation operation = isClient() ? request.getResponse() : request;
ejona07d3f6a2014-05-14 11:26:57 -070080 try {
lryan0cf7d4c2014-06-11 11:50:49 -070081 ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
82 deframer.deframe(data, operation);
83 if (endOfStream) {
84 finish(operation);
ejona07d3f6a2014-05-14 11:26:57 -070085 }
86 } catch (Throwable e) {
lryan0cf7d4c2014-06-11 11:50:49 -070087 // TODO(user): Need to disambiguate between stream corruption as well as client/server
88 // generated errors. For stream corruption we always just send reset stream. For
89 // clients we will also generally reset-stream on error, servers may send a more detailed
90 // status.
lryanf8bbc122014-06-03 14:22:33 -070091 Status status = Status.fromThrowable(e);
lryan0cf7d4c2014-06-11 11:50:49 -070092 closeWithError(request, status);
lryanf8bbc122014-06-03 14:22:33 -070093 }
94 }
95
lryan0cf7d4c2014-06-11 11:50:49 -070096 @Override
nathanmittlere081a222014-08-27 14:00:15 -070097 public void onHeadersRead(ChannelHandlerContext ctx,
98 int streamId,
99 Http2Headers headers,
100 int streamDependency,
101 short weight,
102 boolean exclusive,
103 int padding,
104 boolean endStream) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700105 Request operation = requestRegistry.lookup(streamId);
106 if (operation == null) {
nathanmittlerbf8870b2014-07-16 13:28:33 -0700107 if (isClient()) {
lryan0cf7d4c2014-06-11 11:50:49 -0700108 // For clients an operation must already exist in the registry
109 throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
110 } else {
111 operation = serverStart(ctx, streamId, headers);
112 if (operation == null) {
113 closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
114 new Status(Code.NOT_FOUND));
115 }
116 }
117 }
118 if (endStream) {
nathanmittlerbf8870b2014-07-16 13:28:33 -0700119 finish(isClient() ? operation.getResponse() : operation);
lryan0cf7d4c2014-06-11 11:50:49 -0700120 }
121 }
122
123 @Override
lryan0cf7d4c2014-06-11 11:50:49 -0700124 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
nathanmittlere081a222014-08-27 14:00:15 -0700125 short weight, boolean exclusive) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700126 // TODO
127 }
128
129 @Override
130 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
131 throws Http2Exception {
132 Request request = requestRegistry.lookup(streamId);
133 if (request != null) {
134 closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
135 requestRegistry.remove(streamId);
136 }
137 }
138
139 @Override
140 public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
141 // TOOD
142 }
143
144 @Override
145 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
146 throws Http2Exception {
147 // TOOD
148 }
149
150 @Override
151 public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
152 // TODO
153 }
154
155 @Override
156 public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
157 // TODO
158 }
159
160 @Override
161 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
nathanmittlere081a222014-08-27 14:00:15 -0700162 Http2Headers headers, int padding) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700163 // TODO
164 }
165
166 @Override
167 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
nathanmittlere081a222014-08-27 14:00:15 -0700168 ByteBuf debugData) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700169 // TODO
170 }
171
172 @Override
173 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
174 throws Http2Exception {
175 // TODO
176 }
177
nathanmittlerbf8870b2014-07-16 13:28:33 -0700178 private boolean isClient() {
179 return !connection().isServer();
lryan0cf7d4c2014-06-11 11:50:49 -0700180 }
lryanf8bbc122014-06-03 14:22:33 -0700181
182 /**
183 * Closes the request and its associated response with an internal error.
184 */
185 private void closeWithError(Request request, Status status) {
186 try {
187 request.close(status);
188 request.getResponse().close(status);
189 } finally {
190 requestRegistry.remove(request.getId());
lryan0cf7d4c2014-06-11 11:50:49 -0700191 disposeDeframer(request);
ejona07d3f6a2014-05-14 11:26:57 -0700192 }
193 }
194
195 /**
lryanf8bbc122014-06-03 14:22:33 -0700196 * Create an HTTP2 response handler
ejona07d3f6a2014-05-14 11:26:57 -0700197 */
lryan0cf7d4c2014-06-11 11:50:49 -0700198 private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
199 return Http2Response.builder(streamId, writer, new MessageFramer(4096));
ejona07d3f6a2014-05-14 11:26:57 -0700200 }
201
202 /**
203 * Start the Request operation on the server
204 */
lryan0cf7d4c2014-06-11 11:50:49 -0700205 private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
nathanmittlerfc7a0522014-09-18 16:10:01 -0700206 if (!Http2Session.PROTORPC.equals(headers.get(Http2Session.CONTENT_TYPE))) {
ejona07d3f6a2014-05-14 11:26:57 -0700207 return null;
208 }
209 // Use Path to specify the operation
210 String operationName =
nathanmittlerfc7a0522014-09-18 16:10:01 -0700211 normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()).toString());
ejona07d3f6a2014-05-14 11:26:57 -0700212 if (operationName == null) {
213 return null;
214 }
nathanmittlerfc7a0522014-09-18 16:10:01 -0700215
216 // The Netty AsciiString class is really just a wrapper around a byte[] and supports
217 // arbitrary binary data, not just ASCII.
218 byte[][] headerValues = new byte[headers.size() * 2][];
219 int i = 0;
220 for (Map.Entry<AsciiString, AsciiString> entry : headers) {
221 headerValues[i++] = entry.getKey().array();
222 headerValues[i++] = entry.getValue().array();
223 }
224 Metadata.Headers grpcHeaders = new Metadata.Headers(headerValues);
225
ejona07d3f6a2014-05-14 11:26:57 -0700226 // Create the operation and bind a HTTP2 response operation
lryane4bd1c72014-09-08 14:03:35 -0700227 Request op = session.startRequest(operationName, grpcHeaders,
nathanmittlere081a222014-08-27 14:00:15 -0700228 createResponse(new Http2Writer(ctx), streamId));
ejona07d3f6a2014-05-14 11:26:57 -0700229 if (op == null) {
230 return null;
231 }
232 requestRegistry.register(op);
ejona07d3f6a2014-05-14 11:26:57 -0700233 return op;
234 }
235
236 // TODO(user): This needs proper namespacing support, this is currently just a hack
237 private static String normalizeOperationName(String path) {
238 return path.substring(1);
239 }
240
ejona07d3f6a2014-05-14 11:26:57 -0700241 /**
242 * Called when a HTTP2 stream is closed.
243 */
244 private void finish(Operation operation) {
245 disposeDeframer(operation);
246 requestRegistry.remove(operation.getId());
247 if (operation.getPhase() != Phase.CLOSED) {
248 operation.close(Status.OK);
249 }
250 }
251
lryan0cf7d4c2014-06-11 11:50:49 -0700252 public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ejona07d3f6a2014-05-14 11:26:57 -0700253 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
254 if (deframer == null) {
lryan0cf7d4c2014-06-11 11:50:49 -0700255 deframer = new ByteBufDeframer(ctx.alloc());
ejona07d3f6a2014-05-14 11:26:57 -0700256 operation.put(ByteBufDeframer.class, deframer);
257 }
258 return deframer;
259 }
260
261 public void disposeDeframer(Operation operation) {
262 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
263 if (deframer != null) {
264 deframer.dispose();
265 }
266 }
lryan0cf7d4c2014-06-11 11:50:49 -0700267
268 public class Http2Writer {
269 private final ChannelHandlerContext ctx;
270
271 public Http2Writer(ChannelHandlerContext ctx) {
272 this.ctx = ctx;
273 }
274
nathanmittler42af07c2014-08-19 12:26:11 -0700275 public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) {
nathanmittlere081a222014-08-27 14:00:15 -0700276 return Http2Codec.this.writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise());
277 }
278
279 public ChannelFuture writeHeaders(int streamId, Http2Headers headers, boolean endStream) {
280
281 return Http2Codec.this.writeHeaders(ctx,
282 streamId,
283 headers,
284 PADDING,
285 endStream,
286 ctx.newPromise());
lryan0cf7d4c2014-06-11 11:50:49 -0700287 }
288
289 public ChannelFuture writeHeaders(int streamId,
nathanmittlere081a222014-08-27 14:00:15 -0700290 Http2Headers headers,
291 int streamDependency,
292 short weight,
293 boolean exclusive,
294 boolean endStream) {
295 return Http2Codec.this.writeHeaders(ctx,
296 streamId,
297 headers,
298 streamDependency,
299 weight,
300 exclusive,
301 PADDING,
302 endStream,
303 ctx.newPromise());
lryan0cf7d4c2014-06-11 11:50:49 -0700304 }
305
306 public ChannelFuture writeRstStream(int streamId, long errorCode) {
nathanmittlere081a222014-08-27 14:00:15 -0700307 return Http2Codec.this.writeRstStream(ctx, streamId, errorCode, ctx.newPromise());
lryan0cf7d4c2014-06-11 11:50:49 -0700308 }
309 }
ejona07d3f6a2014-05-14 11:26:57 -0700310}