blob: 873a880adf75a1d93b6c1fb28593624c7c493fc2 [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
lryane4bd1c72014-09-08 14:03:35 -07003import com.google.net.stubby.Metadata;
lryanf8bbc122014-06-03 14:22:33 -07004import com.google.net.stubby.NoOpRequest;
ejona07d3f6a2014-05-14 11:26:57 -07005import com.google.net.stubby.Operation;
6import com.google.net.stubby.Operation.Phase;
7import com.google.net.stubby.Request;
8import com.google.net.stubby.RequestRegistry;
lryanf8bbc122014-06-03 14:22:33 -07009import com.google.net.stubby.Response;
ejona07d3f6a2014-05-14 11:26:57 -070010import com.google.net.stubby.Session;
11import com.google.net.stubby.Status;
12import com.google.net.stubby.transport.MessageFramer;
ejona07d3f6a2014-05-14 11:26:57 -070013import com.google.net.stubby.transport.Transport.Code;
14
15import io.netty.buffer.ByteBuf;
lryan0cf7d4c2014-06-11 11:50:49 -070016import io.netty.channel.ChannelFuture;
ejona07d3f6a2014-05-14 11:26:57 -070017import io.netty.channel.ChannelHandlerContext;
lryan0cf7d4c2014-06-11 11:50:49 -070018import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
lryan03599072014-07-07 18:44:00 -070019import io.netty.handler.codec.http2.DefaultHttp2Connection;
lryan03599072014-07-07 18:44:00 -070020import io.netty.handler.codec.http2.Http2Connection;
lryan0cf7d4c2014-06-11 11:50:49 -070021import io.netty.handler.codec.http2.Http2Error;
22import io.netty.handler.codec.http2.Http2Exception;
23import io.netty.handler.codec.http2.Http2Headers;
24import io.netty.handler.codec.http2.Http2Settings;
ejona07d3f6a2014-05-14 11:26:57 -070025
26/**
27 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
28 * request-response dialog
29 */
lryan0cf7d4c2014-06-11 11:50:49 -070030public class Http2Codec extends AbstractHttp2ConnectionHandler {
ejona07d3f6a2014-05-14 11:26:57 -070031
lryan0cf7d4c2014-06-11 11:50:49 -070032 public static final int PADDING = 0;
ejona07d3f6a2014-05-14 11:26:57 -070033 private final RequestRegistry requestRegistry;
34 private final Session session;
lryan0cf7d4c2014-06-11 11:50:49 -070035 private Http2Codec.Http2Writer http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070036
37 /**
38 * Constructor used by servers, takes a session which will receive operation events.
39 */
40 public Http2Codec(Session session, RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070041 this(new DefaultHttp2Connection(true), session, requestRegistry);
ejona07d3f6a2014-05-14 11:26:57 -070042 }
43
44 /**
45 * Constructor used by clients to send operations to a remote server
46 */
47 public Http2Codec(RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070048 this(new DefaultHttp2Connection(false), null, requestRegistry);
lryan03599072014-07-07 18:44:00 -070049 }
50
51 /**
52 * Constructor used by servers, takes a session which will receive operation events.
53 */
nathanmittlere081a222014-08-27 14:00:15 -070054 private Http2Codec(Http2Connection connection, Session session, RequestRegistry requestRegistry) {
nathanmittlerbf8870b2014-07-16 13:28:33 -070055 super(connection);
lryan03599072014-07-07 18:44:00 -070056 this.session = session;
ejona07d3f6a2014-05-14 11:26:57 -070057 this.requestRegistry = requestRegistry;
58 }
59
60 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070061 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
62 http2Writer = new Http2Writer(ctx);
63 }
ejona07d3f6a2014-05-14 11:26:57 -070064
lryan0cf7d4c2014-06-11 11:50:49 -070065 public Http2Writer getWriter() {
66 return http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070067 }
68
69 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070070 public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
nathanmittlere081a222014-08-27 14:00:15 -070071 boolean endOfStream) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -070072 Request request = requestRegistry.lookup(streamId);
73 if (request == null) {
74 // Stream may have been terminated already or this is just plain spurious
nathanmittlere081a222014-08-27 14:00:15 -070075 throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
ejona07d3f6a2014-05-14 11:26:57 -070076 }
nathanmittlerbf8870b2014-07-16 13:28:33 -070077 Operation operation = isClient() ? request.getResponse() : request;
ejona07d3f6a2014-05-14 11:26:57 -070078 try {
lryan0cf7d4c2014-06-11 11:50:49 -070079 ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
80 deframer.deframe(data, operation);
81 if (endOfStream) {
82 finish(operation);
ejona07d3f6a2014-05-14 11:26:57 -070083 }
84 } catch (Throwable e) {
lryan0cf7d4c2014-06-11 11:50:49 -070085 // TODO(user): Need to disambiguate between stream corruption as well as client/server
86 // generated errors. For stream corruption we always just send reset stream. For
87 // clients we will also generally reset-stream on error, servers may send a more detailed
88 // status.
lryanf8bbc122014-06-03 14:22:33 -070089 Status status = Status.fromThrowable(e);
lryan0cf7d4c2014-06-11 11:50:49 -070090 closeWithError(request, status);
lryanf8bbc122014-06-03 14:22:33 -070091 }
92 }
93
lryan0cf7d4c2014-06-11 11:50:49 -070094 @Override
nathanmittlere081a222014-08-27 14:00:15 -070095 public void onHeadersRead(ChannelHandlerContext ctx,
96 int streamId,
97 Http2Headers headers,
98 int streamDependency,
99 short weight,
100 boolean exclusive,
101 int padding,
102 boolean endStream) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700103 Request operation = requestRegistry.lookup(streamId);
104 if (operation == null) {
nathanmittlerbf8870b2014-07-16 13:28:33 -0700105 if (isClient()) {
lryan0cf7d4c2014-06-11 11:50:49 -0700106 // For clients an operation must already exist in the registry
107 throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
108 } else {
109 operation = serverStart(ctx, streamId, headers);
110 if (operation == null) {
111 closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
112 new Status(Code.NOT_FOUND));
113 }
114 }
115 }
116 if (endStream) {
nathanmittlerbf8870b2014-07-16 13:28:33 -0700117 finish(isClient() ? operation.getResponse() : operation);
lryan0cf7d4c2014-06-11 11:50:49 -0700118 }
119 }
120
121 @Override
lryan0cf7d4c2014-06-11 11:50:49 -0700122 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
nathanmittlere081a222014-08-27 14:00:15 -0700123 short weight, boolean exclusive) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700124 // TODO
125 }
126
127 @Override
128 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
129 throws Http2Exception {
130 Request request = requestRegistry.lookup(streamId);
131 if (request != null) {
132 closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
133 requestRegistry.remove(streamId);
134 }
135 }
136
137 @Override
138 public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
139 // TOOD
140 }
141
142 @Override
143 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
144 throws Http2Exception {
145 // TOOD
146 }
147
148 @Override
149 public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
150 // TODO
151 }
152
153 @Override
154 public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
155 // TODO
156 }
157
158 @Override
159 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
nathanmittlere081a222014-08-27 14:00:15 -0700160 Http2Headers headers, int padding) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700161 // TODO
162 }
163
164 @Override
165 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
nathanmittlere081a222014-08-27 14:00:15 -0700166 ByteBuf debugData) throws Http2Exception {
lryan0cf7d4c2014-06-11 11:50:49 -0700167 // TODO
168 }
169
170 @Override
171 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
172 throws Http2Exception {
173 // TODO
174 }
175
nathanmittlerbf8870b2014-07-16 13:28:33 -0700176 private boolean isClient() {
177 return !connection().isServer();
lryan0cf7d4c2014-06-11 11:50:49 -0700178 }
lryanf8bbc122014-06-03 14:22:33 -0700179
180 /**
181 * Closes the request and its associated response with an internal error.
182 */
183 private void closeWithError(Request request, Status status) {
184 try {
185 request.close(status);
186 request.getResponse().close(status);
187 } finally {
188 requestRegistry.remove(request.getId());
lryan0cf7d4c2014-06-11 11:50:49 -0700189 disposeDeframer(request);
ejona07d3f6a2014-05-14 11:26:57 -0700190 }
191 }
192
193 /**
lryanf8bbc122014-06-03 14:22:33 -0700194 * Create an HTTP2 response handler
ejona07d3f6a2014-05-14 11:26:57 -0700195 */
lryan0cf7d4c2014-06-11 11:50:49 -0700196 private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
197 return Http2Response.builder(streamId, writer, new MessageFramer(4096));
ejona07d3f6a2014-05-14 11:26:57 -0700198 }
199
200 /**
201 * Start the Request operation on the server
202 */
lryan0cf7d4c2014-06-11 11:50:49 -0700203 private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
204 if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
ejona07d3f6a2014-05-14 11:26:57 -0700205 return null;
206 }
207 // Use Path to specify the operation
208 String operationName =
nathanmittler42af07c2014-08-19 12:26:11 -0700209 normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()));
ejona07d3f6a2014-05-14 11:26:57 -0700210 if (operationName == null) {
211 return null;
212 }
lryane4bd1c72014-09-08 14:03:35 -0700213 Metadata.Headers grpcHeaders = new Metadata.Headers(headers);
ejona07d3f6a2014-05-14 11:26:57 -0700214 // Create the operation and bind a HTTP2 response operation
lryane4bd1c72014-09-08 14:03:35 -0700215 Request op = session.startRequest(operationName, grpcHeaders,
nathanmittlere081a222014-08-27 14:00:15 -0700216 createResponse(new Http2Writer(ctx), streamId));
ejona07d3f6a2014-05-14 11:26:57 -0700217 if (op == null) {
218 return null;
219 }
220 requestRegistry.register(op);
ejona07d3f6a2014-05-14 11:26:57 -0700221 return op;
222 }
223
224 // TODO(user): This needs proper namespacing support, this is currently just a hack
225 private static String normalizeOperationName(String path) {
226 return path.substring(1);
227 }
228
ejona07d3f6a2014-05-14 11:26:57 -0700229 /**
230 * Called when a HTTP2 stream is closed.
231 */
232 private void finish(Operation operation) {
233 disposeDeframer(operation);
234 requestRegistry.remove(operation.getId());
235 if (operation.getPhase() != Phase.CLOSED) {
236 operation.close(Status.OK);
237 }
238 }
239
lryan0cf7d4c2014-06-11 11:50:49 -0700240 public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ejona07d3f6a2014-05-14 11:26:57 -0700241 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
242 if (deframer == null) {
lryan0cf7d4c2014-06-11 11:50:49 -0700243 deframer = new ByteBufDeframer(ctx.alloc());
ejona07d3f6a2014-05-14 11:26:57 -0700244 operation.put(ByteBufDeframer.class, deframer);
245 }
246 return deframer;
247 }
248
249 public void disposeDeframer(Operation operation) {
250 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
251 if (deframer != null) {
252 deframer.dispose();
253 }
254 }
lryan0cf7d4c2014-06-11 11:50:49 -0700255
256 public class Http2Writer {
257 private final ChannelHandlerContext ctx;
258
259 public Http2Writer(ChannelHandlerContext ctx) {
260 this.ctx = ctx;
261 }
262
nathanmittler42af07c2014-08-19 12:26:11 -0700263 public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) {
nathanmittlere081a222014-08-27 14:00:15 -0700264 return Http2Codec.this.writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise());
265 }
266
267 public ChannelFuture writeHeaders(int streamId, Http2Headers headers, boolean endStream) {
268
269 return Http2Codec.this.writeHeaders(ctx,
270 streamId,
271 headers,
272 PADDING,
273 endStream,
274 ctx.newPromise());
lryan0cf7d4c2014-06-11 11:50:49 -0700275 }
276
277 public ChannelFuture writeHeaders(int streamId,
nathanmittlere081a222014-08-27 14:00:15 -0700278 Http2Headers headers,
279 int streamDependency,
280 short weight,
281 boolean exclusive,
282 boolean endStream) {
283 return Http2Codec.this.writeHeaders(ctx,
284 streamId,
285 headers,
286 streamDependency,
287 weight,
288 exclusive,
289 PADDING,
290 endStream,
291 ctx.newPromise());
lryan0cf7d4c2014-06-11 11:50:49 -0700292 }
293
294 public ChannelFuture writeRstStream(int streamId, long errorCode) {
nathanmittlere081a222014-08-27 14:00:15 -0700295 return Http2Codec.this.writeRstStream(ctx, streamId, errorCode, ctx.newPromise());
lryan0cf7d4c2014-06-11 11:50:49 -0700296 }
297 }
ejona07d3f6a2014-05-14 11:26:57 -0700298}