blob: d0ad9440e91f5dc864415f1a26185cb8b4562995 [file] [log] [blame]
ejona07d3f6a2014-05-14 11:26:57 -07001package com.google.net.stubby.http2.netty;
2
3import com.google.net.stubby.Operation;
4import com.google.net.stubby.Operation.Phase;
5import com.google.net.stubby.Request;
6import com.google.net.stubby.RequestRegistry;
7import com.google.net.stubby.Session;
8import com.google.net.stubby.Status;
9import com.google.net.stubby.transport.MessageFramer;
10import com.google.net.stubby.transport.Transport;
11import com.google.net.stubby.transport.Transport.Code;
12
13import io.netty.buffer.ByteBuf;
14import io.netty.buffer.ByteBufAllocator;
15import io.netty.channel.ChannelHandlerAdapter;
16import io.netty.channel.ChannelHandlerContext;
17import io.netty.handler.codec.http2.draft10.Http2Error;
18import io.netty.handler.codec.http2.draft10.Http2Headers;
19import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2RstStreamFrame;
20import io.netty.handler.codec.http2.draft10.frame.Http2DataFrame;
21import io.netty.handler.codec.http2.draft10.frame.Http2HeadersFrame;
22import io.netty.handler.codec.http2.draft10.frame.Http2RstStreamFrame;
23import io.netty.handler.codec.http2.draft10.frame.Http2StreamFrame;
24
25/**
26 * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
27 * request-response dialog
28 */
29public class Http2Codec extends ChannelHandlerAdapter {
30
31 private final boolean client;
32 private final RequestRegistry requestRegistry;
33 private final Session session;
34 private ByteBufAllocator alloc;
35
36 /**
37 * Constructor used by servers, takes a session which will receive operation events.
38 */
39 public Http2Codec(Session session, RequestRegistry requestRegistry) {
40 this.client = false;
41 this.session = session;
42 this.requestRegistry = requestRegistry;
43 }
44
45 /**
46 * Constructor used by clients to send operations to a remote server
47 */
48 public Http2Codec(RequestRegistry requestRegistry) {
49 this.client = true;
50 this.session = null;
51 this.requestRegistry = requestRegistry;
52 }
53
54 @Override
55 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
56 // Abort any active requests.
57 requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
58
59 super.channelInactive(ctx);
60 }
61
62 @Override
63 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
64 if (!(msg instanceof Http2StreamFrame)) {
65 return;
66 }
67 this.alloc = ctx.alloc();
68 Http2StreamFrame frame = (Http2StreamFrame) msg;
69 Request operation = requestRegistry.lookup(frame.getStreamId());
70 try {
71 if (operation == null) {
72 if (client) {
73 // For clients an operation must already exist in the registry
74 throw new IllegalStateException("Response operation must already be bound");
75 } else {
76 operation = serverStart(ctx, frame);
77 if (operation == null) {
78 // Unknown operation, refuse the stream
79 sendRstStream(ctx, frame.getStreamId(), Http2Error.REFUSED_STREAM);
80 }
81 }
82 } else {
83 // Consume the frame
84 progress(client ? operation.getResponse() : operation, frame);
85 }
86 } catch (Throwable e) {
87 closeWithInternalError(operation, e);
88 sendRstStream(ctx, frame.getStreamId(), Http2Error.INTERNAL_ERROR);
89 throw e;
90 }
91 }
92
93 /**
94 * Closes the request and its associate response with an internal error.
95 */
96 private void closeWithInternalError(Request request, Throwable e) {
97 if (request != null) {
98 Status status = new Status(Code.INTERNAL, e);
99 request.close(status);
100 request.getResponse().close(status);
101 requestRegistry.remove(request.getId());
102 }
103 }
104
105 /**
106 * Writes the HTTP/2 RST Stream frame to the remote endpoint, indicating a stream failure.
107 */
108 private void sendRstStream(ChannelHandlerContext ctx, int streamId, Http2Error error) {
109 DefaultHttp2RstStreamFrame frame = new DefaultHttp2RstStreamFrame.Builder()
110 .setStreamId(streamId).setErrorCode(error.getCode()).build();
111 ctx.writeAndFlush(frame);
112 }
113
114 /**
115 * Start the Request operation on the server
116 */
117 private Request serverStart(ChannelHandlerContext ctx, Http2StreamFrame frame) {
118 if (!(frame instanceof Http2HeadersFrame)) {
119 // TODO(user): Better error detail to client here
120 return null;
121 }
122 Http2HeadersFrame headers = (Http2HeadersFrame) frame;
123 if (!Http2Session.PROTORPC.equals(headers.getHeaders().get("content-type"))) {
124 return null;
125 }
126 // Use Path to specify the operation
127 String operationName =
128 normalizeOperationName(headers.getHeaders().get(Http2Headers.HttpName.PATH.value()));
129 if (operationName == null) {
130 return null;
131 }
132 // Create the operation and bind a HTTP2 response operation
133 Request op = session.startRequest(operationName,
134 Http2Response.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096)));
135 if (op == null) {
136 return null;
137 }
138 requestRegistry.register(op);
139 // Immediately deframe the remaining headers in the frame
140 progressHeaders(op, (Http2HeadersFrame) frame);
141 return op;
142 }
143
144 // TODO(user): This needs proper namespacing support, this is currently just a hack
145 private static String normalizeOperationName(String path) {
146 return path.substring(1);
147 }
148
149
150 /**
151 * Consume a received frame
152 */
153 private void progress(Operation operation, Http2StreamFrame frame) {
154 if (frame instanceof Http2HeadersFrame) {
155 progressHeaders(operation, (Http2HeadersFrame) frame);
156 } else if (frame instanceof Http2DataFrame) {
157 progressPayload(operation, (Http2DataFrame) frame);
158 } else if (frame instanceof Http2RstStreamFrame) {
159 // Cancel
160 operation.close(null);
161 finish(operation);
162 } else {
163 // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
164 operation.close(null);
165 finish(operation);
166 }
167 }
168
169 /**
170 * Consume headers in the frame. Any header starting with ':' is considered reserved
171 */
172 private void progressHeaders(Operation operation, Http2HeadersFrame frame) {
173 // TODO(user): Currently we do not do anything with HTTP2 headers
174 if (frame.isEndOfStream()) {
175 finish(operation);
176 }
177 }
178
179 private void progressPayload(Operation operation, Http2DataFrame frame) {
180 try {
181
182 // Copy the data buffer.
183 // TODO(user): Need to decide whether to use pooling or not.
184 ByteBuf dataCopy = frame.content().copy();
185
186 if (operation == null) {
187 return;
188 }
189 ByteBufDeframer deframer = getOrCreateDeframer(operation);
190 deframer.deframe(dataCopy, operation);
191 if (frame.isEndOfStream()) {
192 finish(operation);
193 }
194
195 } finally {
196 frame.release();
197 }
198 }
199
200 /**
201 * Called when a HTTP2 stream is closed.
202 */
203 private void finish(Operation operation) {
204 disposeDeframer(operation);
205 requestRegistry.remove(operation.getId());
206 if (operation.getPhase() != Phase.CLOSED) {
207 operation.close(Status.OK);
208 }
209 }
210
211 public ByteBufDeframer getOrCreateDeframer(Operation operation) {
212 ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
213 if (deframer == null) {
214 deframer = new ByteBufDeframer(alloc);
215 operation.put(ByteBufDeframer.class, deframer);
216 }
217 return deframer;
218 }
219
220 public void disposeDeframer(Operation operation) {
221 ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
222 if (deframer != null) {
223 deframer.dispose();
224 }
225 }
226}