blob: f2cfd7774a539aa9c7321d058ea39189aaf49ed4 [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
lryanf8bbc122014-06-03 14:22:33 -07003import com.google.net.stubby.NoOpRequest;
ejona07d3f6a2014-05-14 11:26:57 -07004import com.google.net.stubby.Operation;
5import com.google.net.stubby.Operation.Phase;
6import com.google.net.stubby.Request;
7import com.google.net.stubby.RequestRegistry;
lryanf8bbc122014-06-03 14:22:33 -07008import com.google.net.stubby.Response;
ejona07d3f6a2014-05-14 11:26:57 -07009import com.google.net.stubby.Session;
10import com.google.net.stubby.Status;
11import com.google.net.stubby.transport.MessageFramer;
12import com.google.net.stubby.transport.Transport;
13import com.google.net.stubby.transport.Transport.Code;
14
15import io.netty.buffer.ByteBuf;
16import io.netty.buffer.ByteBufAllocator;
17import io.netty.channel.ChannelHandlerAdapter;
18import io.netty.channel.ChannelHandlerContext;
19import io.netty.handler.codec.http2.draft10.Http2Error;
20import io.netty.handler.codec.http2.draft10.Http2Headers;
21import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2RstStreamFrame;
22import io.netty.handler.codec.http2.draft10.frame.Http2DataFrame;
23import io.netty.handler.codec.http2.draft10.frame.Http2HeadersFrame;
24import io.netty.handler.codec.http2.draft10.frame.Http2RstStreamFrame;
25import io.netty.handler.codec.http2.draft10.frame.Http2StreamFrame;
26
27/**
28 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
29 * request-response dialog
30 */
31public class Http2Codec extends ChannelHandlerAdapter {
32
33 private final boolean client;
34 private final RequestRegistry requestRegistry;
35 private final Session session;
36 private ByteBufAllocator alloc;
37
38 /**
39 * Constructor used by servers, takes a session which will receive operation events.
40 */
41 public Http2Codec(Session session, RequestRegistry requestRegistry) {
42 this.client = false;
43 this.session = session;
44 this.requestRegistry = requestRegistry;
45 }
46
47 /**
48 * Constructor used by clients to send operations to a remote server
49 */
50 public Http2Codec(RequestRegistry requestRegistry) {
51 this.client = true;
52 this.session = null;
53 this.requestRegistry = requestRegistry;
54 }
55
56 @Override
57 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
58 // Abort any active requests.
59 requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
60
61 super.channelInactive(ctx);
62 }
63
64 @Override
65 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
66 if (!(msg instanceof Http2StreamFrame)) {
67 return;
68 }
69 this.alloc = ctx.alloc();
70 Http2StreamFrame frame = (Http2StreamFrame) msg;
lryanf8bbc122014-06-03 14:22:33 -070071 int streamId = frame.getStreamId();
72 Request operation = requestRegistry.lookup(streamId);
ejona07d3f6a2014-05-14 11:26:57 -070073 try {
74 if (operation == null) {
75 if (client) {
76 // For clients an operation must already exist in the registry
77 throw new IllegalStateException("Response operation must already be bound");
78 } else {
79 operation = serverStart(ctx, frame);
80 if (operation == null) {
lryanf8bbc122014-06-03 14:22:33 -070081 closeWithError(new NoOpRequest(createResponse(ctx, streamId).build()),
82 new Status(Code.NOT_FOUND));
ejona07d3f6a2014-05-14 11:26:57 -070083 }
84 }
85 } else {
86 // Consume the frame
87 progress(client ? operation.getResponse() : operation, frame);
88 }
89 } catch (Throwable e) {
lryanf8bbc122014-06-03 14:22:33 -070090 Status status = Status.fromThrowable(e);
91 if (operation == null) {
92 // Create a no-op request so we can use common error handling
93 operation = new NoOpRequest(createResponse(ctx, streamId).build());
94 }
95 closeWithError(operation, status);
96 }
97 }
98
99
100 /**
101 * Closes the request and its associated response with an internal error.
102 */
103 private void closeWithError(Request request, Status status) {
104 try {
105 request.close(status);
106 request.getResponse().close(status);
107 } finally {
108 requestRegistry.remove(request.getId());
ejona07d3f6a2014-05-14 11:26:57 -0700109 }
110 }
111
112 /**
lryanf8bbc122014-06-03 14:22:33 -0700113 * Create an HTTP2 response handler
ejona07d3f6a2014-05-14 11:26:57 -0700114 */
lryanf8bbc122014-06-03 14:22:33 -0700115 private Response.ResponseBuilder createResponse(ChannelHandlerContext ctx, int streamId) {
116 return Http2Response.builder(streamId, ctx.channel(), new MessageFramer(4096));
ejona07d3f6a2014-05-14 11:26:57 -0700117 }
118
119 /**
120 * Writes the HTTP/2 RST Stream frame to the remote endpoint, indicating a stream failure.
121 */
122 private void sendRstStream(ChannelHandlerContext ctx, int streamId, Http2Error error) {
123 DefaultHttp2RstStreamFrame frame = new DefaultHttp2RstStreamFrame.Builder()
124 .setStreamId(streamId).setErrorCode(error.getCode()).build();
125 ctx.writeAndFlush(frame);
126 }
127
128 /**
129 * Start the Request operation on the server
130 */
131 private Request serverStart(ChannelHandlerContext ctx, Http2StreamFrame frame) {
132 if (!(frame instanceof Http2HeadersFrame)) {
133 // TODO(user): Better error detail to client here
134 return null;
135 }
136 Http2HeadersFrame headers = (Http2HeadersFrame) frame;
137 if (!Http2Session.PROTORPC.equals(headers.getHeaders().get("content-type"))) {
138 return null;
139 }
140 // Use Path to specify the operation
141 String operationName =
142 normalizeOperationName(headers.getHeaders().get(Http2Headers.HttpName.PATH.value()));
143 if (operationName == null) {
144 return null;
145 }
146 // Create the operation and bind a HTTP2 response operation
lryanf8bbc122014-06-03 14:22:33 -0700147 Request op = session.startRequest(operationName, createResponse(ctx, frame.getStreamId()));
ejona07d3f6a2014-05-14 11:26:57 -0700148 if (op == null) {
149 return null;
150 }
151 requestRegistry.register(op);
152 // Immediately deframe the remaining headers in the frame
153 progressHeaders(op, (Http2HeadersFrame) frame);
154 return op;
155 }
156
157 // TODO(user): This needs proper namespacing support, this is currently just a hack
158 private static String normalizeOperationName(String path) {
159 return path.substring(1);
160 }
161
162
163 /**
164 * Consume a received frame
165 */
166 private void progress(Operation operation, Http2StreamFrame frame) {
167 if (frame instanceof Http2HeadersFrame) {
168 progressHeaders(operation, (Http2HeadersFrame) frame);
169 } else if (frame instanceof Http2DataFrame) {
170 progressPayload(operation, (Http2DataFrame) frame);
171 } else if (frame instanceof Http2RstStreamFrame) {
172 // Cancel
lryanf8bbc122014-06-03 14:22:33 -0700173 operation.close(new Status(Code.ABORTED, "HTTP2 stream reset"));
ejona07d3f6a2014-05-14 11:26:57 -0700174 finish(operation);
175 } else {
176 // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
lryanf8bbc122014-06-03 14:22:33 -0700177 operation.close(Status.OK);
ejona07d3f6a2014-05-14 11:26:57 -0700178 finish(operation);
179 }
180 }
181
182 /**
183 * Consume headers in the frame. Any header starting with ':' is considered reserved
184 */
185 private void progressHeaders(Operation operation, Http2HeadersFrame frame) {
186 // TODO(user): Currently we do not do anything with HTTP2 headers
187 if (frame.isEndOfStream()) {
188 finish(operation);
189 }
190 }
191
192 private void progressPayload(Operation operation, Http2DataFrame frame) {
193 try {
194
195 // Copy the data buffer.
196 // TODO(user): Need to decide whether to use pooling or not.
197 ByteBuf dataCopy = frame.content().copy();
198
199 if (operation == null) {
200 return;
201 }
202 ByteBufDeframer deframer = getOrCreateDeframer(operation);
203 deframer.deframe(dataCopy, operation);
204 if (frame.isEndOfStream()) {
205 finish(operation);
206 }
207
208 } finally {
209 frame.release();
210 }
211 }
212
213 /**
214 * Called when a HTTP2 stream is closed.
215 */
216 private void finish(Operation operation) {
217 disposeDeframer(operation);
218 requestRegistry.remove(operation.getId());
219 if (operation.getPhase() != Phase.CLOSED) {
220 operation.close(Status.OK);
221 }
222 }
223
224 public ByteBufDeframer getOrCreateDeframer(Operation operation) {
225 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
226 if (deframer == null) {
227 deframer = new ByteBufDeframer(alloc);
228 operation.put(ByteBufDeframer.class, deframer);
229 }
230 return deframer;
231 }
232
233 public void disposeDeframer(Operation operation) {
234 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
235 if (deframer != null) {
236 deframer.dispose();
237 }
238 }
239}