blob: 221c1703aa0321ce6c26177c8f1ec06234fe00d0 [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
lryanf8bbc122014-06-03 14:22:33 -07003import com.google.net.stubby.NoOpRequest;
ejona07d3f6a2014-05-14 11:26:57 -07004import com.google.net.stubby.Operation;
5import com.google.net.stubby.Operation.Phase;
6import com.google.net.stubby.Request;
7import com.google.net.stubby.RequestRegistry;
lryanf8bbc122014-06-03 14:22:33 -07008import com.google.net.stubby.Response;
ejona07d3f6a2014-05-14 11:26:57 -07009import com.google.net.stubby.Session;
10import com.google.net.stubby.Status;
11import com.google.net.stubby.transport.MessageFramer;
ejona07d3f6a2014-05-14 11:26:57 -070012import com.google.net.stubby.transport.Transport.Code;
13
14import io.netty.buffer.ByteBuf;
lryan0cf7d4c2014-06-11 11:50:49 -070015import io.netty.channel.ChannelFuture;
ejona07d3f6a2014-05-14 11:26:57 -070016import io.netty.channel.ChannelHandlerContext;
lryan0cf7d4c2014-06-11 11:50:49 -070017import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
18import io.netty.handler.codec.http2.Http2Error;
19import io.netty.handler.codec.http2.Http2Exception;
20import io.netty.handler.codec.http2.Http2Headers;
21import io.netty.handler.codec.http2.Http2Settings;
ejona07d3f6a2014-05-14 11:26:57 -070022
/**
 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
 * request-response dialog.
 */
lryan0cf7d4c2014-06-11 11:50:49 -070027public class Http2Codec extends AbstractHttp2ConnectionHandler {
ejona07d3f6a2014-05-14 11:26:57 -070028
lryan0cf7d4c2014-06-11 11:50:49 -070029 public static final int PADDING = 0;
ejona07d3f6a2014-05-14 11:26:57 -070030 private final boolean client;
31 private final RequestRegistry requestRegistry;
32 private final Session session;
lryan0cf7d4c2014-06-11 11:50:49 -070033 private Http2Codec.Http2Writer http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070034
/**
 * Builds the server-side flavour of the codec.
 *
 * @param session used to start a request for each new inbound stream
 * @param requestRegistry registry in which requests are looked up by stream id
 */
public Http2Codec(Session session, RequestRegistry requestRegistry) {
  // NOTE(review): first argument is presumably the "server" flag of the base handler — confirm.
  super(true, true);
  this.requestRegistry = requestRegistry;
  this.session = session;
  // TODO(user): Use connection.isServer when not private in base class
  this.client = false;
}
45
/**
 * Builds the client-side flavour of the codec, used to send operations to a remote server.
 * No {@link Session} is kept in this mode.
 *
 * @param requestRegistry registry in which requests are looked up by stream id
 */
public Http2Codec(RequestRegistry requestRegistry) {
  super(false, true);
  this.requestRegistry = requestRegistry;
  this.session = null;
  this.client = true;
}
55
56 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070057 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
58 http2Writer = new Http2Writer(ctx);
59 }
ejona07d3f6a2014-05-14 11:26:57 -070060
lryan0cf7d4c2014-06-11 11:50:49 -070061 public Http2Writer getWriter() {
62 return http2Writer;
ejona07d3f6a2014-05-14 11:26:57 -070063 }
64
65 @Override
lryan0cf7d4c2014-06-11 11:50:49 -070066 public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
67 boolean endOfStream, boolean endOfSegment, boolean compressed)
68 throws Http2Exception {
69 Request request = requestRegistry.lookup(streamId);
70 if (request == null) {
71 // Stream may have been terminated already or this is just plain spurious
72 throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
ejona07d3f6a2014-05-14 11:26:57 -070073 }
lryan0cf7d4c2014-06-11 11:50:49 -070074 Operation operation = client ? request.getResponse() : request;
ejona07d3f6a2014-05-14 11:26:57 -070075 try {
lryan0cf7d4c2014-06-11 11:50:49 -070076 ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
77 deframer.deframe(data, operation);
78 if (endOfStream) {
79 finish(operation);
ejona07d3f6a2014-05-14 11:26:57 -070080 }
81 } catch (Throwable e) {
lryan0cf7d4c2014-06-11 11:50:49 -070082 // TODO(user): Need to disambiguate between stream corruption as well as client/server
83 // generated errors. For stream corruption we always just send reset stream. For
84 // clients we will also generally reset-stream on error, servers may send a more detailed
85 // status.
lryanf8bbc122014-06-03 14:22:33 -070086 Status status = Status.fromThrowable(e);
lryan0cf7d4c2014-06-11 11:50:49 -070087 closeWithError(request, status);
lryanf8bbc122014-06-03 14:22:33 -070088 }
89 }
90
lryan0cf7d4c2014-06-11 11:50:49 -070091 @Override
92 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
93 int padding, boolean endStream, boolean endSegment) throws Http2Exception {
94 Request operation = requestRegistry.lookup(streamId);
95 if (operation == null) {
96 if (client) {
97 // For clients an operation must already exist in the registry
98 throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
99 } else {
100 operation = serverStart(ctx, streamId, headers);
101 if (operation == null) {
102 closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
103 new Status(Code.NOT_FOUND));
104 }
105 }
106 }
107 if (endStream) {
108 finish(client ? operation.getResponse() : operation);
109 }
110 }
111
112 @Override
113 public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
114 int streamDependency, short weight, boolean exclusive, int padding,
115 boolean endStream, boolean endSegment) throws Http2Exception {
116 onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment);
117 }
118
119 @Override
120 public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
121 short weight, boolean exclusive) throws Http2Exception {
122 // TODO
123 }
124
125 @Override
126 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
127 throws Http2Exception {
128 Request request = requestRegistry.lookup(streamId);
129 if (request != null) {
130 closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
131 requestRegistry.remove(streamId);
132 }
133 }
134
135 @Override
136 public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
137 // TOOD
138 }
139
140 @Override
141 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
142 throws Http2Exception {
143 // TOOD
144 }
145
146 @Override
147 public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
148 // TODO
149 }
150
151 @Override
152 public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
153 // TODO
154 }
155
156 @Override
157 public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
158 Http2Headers headers, int padding) throws Http2Exception {
159 // TODO
160 }
161
162 @Override
163 public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
164 ByteBuf debugData) throws Http2Exception {
165 // TODO
166 }
167
168 @Override
169 public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
170 throws Http2Exception {
171 // TODO
172 }
173
174 @Override
175 public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port,
176 ByteBuf protocolId, String host, String origin) throws Http2Exception {
177 // TODO
178 }
179
180 @Override
181 public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
182 // TODO
183 }
lryanf8bbc122014-06-03 14:22:33 -0700184
185 /**
186 * Closes the request and its associated response with an internal error.
187 */
188 private void closeWithError(Request request, Status status) {
189 try {
190 request.close(status);
191 request.getResponse().close(status);
192 } finally {
193 requestRegistry.remove(request.getId());
lryan0cf7d4c2014-06-11 11:50:49 -0700194 disposeDeframer(request);
ejona07d3f6a2014-05-14 11:26:57 -0700195 }
196 }
197
198 /**
lryanf8bbc122014-06-03 14:22:33 -0700199 * Create an HTTP2 response handler
ejona07d3f6a2014-05-14 11:26:57 -0700200 */
lryan0cf7d4c2014-06-11 11:50:49 -0700201 private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
202 return Http2Response.builder(streamId, writer, new MessageFramer(4096));
ejona07d3f6a2014-05-14 11:26:57 -0700203 }
204
205 /**
206 * Start the Request operation on the server
207 */
lryan0cf7d4c2014-06-11 11:50:49 -0700208 private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
209 if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
ejona07d3f6a2014-05-14 11:26:57 -0700210 return null;
211 }
212 // Use Path to specify the operation
213 String operationName =
lryan0cf7d4c2014-06-11 11:50:49 -0700214 normalizeOperationName(headers.get(Http2Headers.HttpName.PATH.value()));
ejona07d3f6a2014-05-14 11:26:57 -0700215 if (operationName == null) {
216 return null;
217 }
218 // Create the operation and bind a HTTP2 response operation
lryan0cf7d4c2014-06-11 11:50:49 -0700219 Request op = session.startRequest(operationName, createResponse(new Http2Writer(ctx),
220 streamId));
ejona07d3f6a2014-05-14 11:26:57 -0700221 if (op == null) {
222 return null;
223 }
224 requestRegistry.register(op);
ejona07d3f6a2014-05-14 11:26:57 -0700225 return op;
226 }
227
228 // TODO(user): This needs proper namespacing support, this is currently just a hack
229 private static String normalizeOperationName(String path) {
230 return path.substring(1);
231 }
232
ejona07d3f6a2014-05-14 11:26:57 -0700233 /**
234 * Called when a HTTP2 stream is closed.
235 */
236 private void finish(Operation operation) {
237 disposeDeframer(operation);
238 requestRegistry.remove(operation.getId());
239 if (operation.getPhase() != Phase.CLOSED) {
240 operation.close(Status.OK);
241 }
242 }
243
lryan0cf7d4c2014-06-11 11:50:49 -0700244 public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ejona07d3f6a2014-05-14 11:26:57 -0700245 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
246 if (deframer == null) {
lryan0cf7d4c2014-06-11 11:50:49 -0700247 deframer = new ByteBufDeframer(ctx.alloc());
ejona07d3f6a2014-05-14 11:26:57 -0700248 operation.put(ByteBufDeframer.class, deframer);
249 }
250 return deframer;
251 }
252
253 public void disposeDeframer(Operation operation) {
254 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
255 if (deframer != null) {
256 deframer.dispose();
257 }
258 }
lryan0cf7d4c2014-06-11 11:50:49 -0700259
260 public class Http2Writer {
261 private final ChannelHandlerContext ctx;
262
263 public Http2Writer(ChannelHandlerContext ctx) {
264 this.ctx = ctx;
265 }
266
267 public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream,
268 boolean endSegment, boolean compressed) {
269 return Http2Codec.this.writeData(ctx, ctx.newPromise(),
270 streamId, data, PADDING, endStream, endSegment, compressed);
271 }
272
273 public ChannelFuture writeHeaders(int streamId,
274 Http2Headers headers,
275 boolean endStream, boolean endSegment) {
276
277 return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
278 headers, PADDING, endStream, endSegment);
279 }
280
281 public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency,
282 short weight, boolean exclusive,
283 boolean endStream, boolean endSegment) {
284 return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
285 headers, streamDependency, weight, exclusive, PADDING, endStream, endSegment);
286 }
287
288 public ChannelFuture writeRstStream(int streamId, long errorCode) {
289 return Http2Codec.this.writeRstStream(ctx, ctx.newPromise(),
290 streamId,
291 errorCode);
292 }
293 }
ejona07d3f6a2014-05-14 11:26:57 -0700294}