| package com.google.net.stubby.http2.netty; |
| |
| import com.google.net.stubby.Metadata; |
| import com.google.net.stubby.NoOpRequest; |
| import com.google.net.stubby.Operation; |
| import com.google.net.stubby.Operation.Phase; |
| import com.google.net.stubby.Request; |
| import com.google.net.stubby.RequestRegistry; |
| import com.google.net.stubby.Response; |
| import com.google.net.stubby.Session; |
| import com.google.net.stubby.Status; |
| import com.google.net.stubby.transport.MessageFramer; |
| import com.google.net.stubby.transport.Transport.Code; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.handler.codec.AsciiString; |
| import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; |
| import io.netty.handler.codec.http2.DefaultHttp2Connection; |
| import io.netty.handler.codec.http2.Http2Connection; |
| import io.netty.handler.codec.http2.Http2Error; |
| import io.netty.handler.codec.http2.Http2Exception; |
| import io.netty.handler.codec.http2.Http2Headers; |
| import io.netty.handler.codec.http2.Http2Settings; |
| |
| import java.util.Map; |
| |
| /** |
| * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing |
| * request-response dialog |
| */ |
| public class Http2Codec extends AbstractHttp2ConnectionHandler { |
| public static final int PADDING = 0; |
| private final RequestRegistry requestRegistry; |
| private final Session session; |
| private Http2Codec.Http2Writer http2Writer; |
| |
| /** |
| * Constructor used by servers, takes a session which will receive operation events. |
| */ |
| public Http2Codec(Session session, RequestRegistry requestRegistry) { |
| this(new DefaultHttp2Connection(true), session, requestRegistry); |
| } |
| |
| /** |
| * Constructor used by clients to send operations to a remote server |
| */ |
| public Http2Codec(RequestRegistry requestRegistry) { |
| this(new DefaultHttp2Connection(false), null, requestRegistry); |
| } |
| |
| /** |
| * Constructor used by servers, takes a session which will receive operation events. |
| */ |
| private Http2Codec(Http2Connection connection, Session session, RequestRegistry requestRegistry) { |
| super(connection); |
| this.session = session; |
| this.requestRegistry = requestRegistry; |
| } |
| |
| @Override |
| public void handlerAdded(ChannelHandlerContext ctx) throws Exception { |
| http2Writer = new Http2Writer(ctx); |
| } |
| |
| public Http2Writer getWriter() { |
| return http2Writer; |
| } |
| |
| @Override |
| public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, |
| boolean endOfStream) throws Http2Exception { |
| Request request = requestRegistry.lookup(streamId); |
| if (request == null) { |
| // Stream may have been terminated already or this is just plain spurious |
| throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist"); |
| } |
| Operation operation = isClient() ? request.getResponse() : request; |
| try { |
| ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx); |
| deframer.deframe(data, operation); |
| if (endOfStream) { |
| finish(operation); |
| } |
| } catch (Throwable e) { |
| // TODO(user): Need to disambiguate between stream corruption as well as client/server |
| // generated errors. For stream corruption we always just send reset stream. For |
| // clients we will also generally reset-stream on error, servers may send a more detailed |
| // status. |
| Status status = Status.fromThrowable(e); |
| closeWithError(request, status); |
| } |
| } |
| |
| @Override |
| public void onHeadersRead(ChannelHandlerContext ctx, |
| int streamId, |
| Http2Headers headers, |
| int streamDependency, |
| short weight, |
| boolean exclusive, |
| int padding, |
| boolean endStream) throws Http2Exception { |
| Request operation = requestRegistry.lookup(streamId); |
| if (operation == null) { |
| if (isClient()) { |
| // For clients an operation must already exist in the registry |
| throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist"); |
| } else { |
| operation = serverStart(ctx, streamId, headers); |
| if (operation == null) { |
| closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()), |
| new Status(Code.NOT_FOUND)); |
| } |
| } |
| } |
| if (endStream) { |
| finish(isClient() ? operation.getResponse() : operation); |
| } |
| } |
| |
| @Override |
| public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, |
| short weight, boolean exclusive) throws Http2Exception { |
| // TODO |
| } |
| |
| @Override |
| public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) |
| throws Http2Exception { |
| Request request = requestRegistry.lookup(streamId); |
| if (request != null) { |
| closeWithError(request, new Status(Code.CANCELLED, "Stream reset")); |
| requestRegistry.remove(streamId); |
| } |
| } |
| |
| @Override |
| public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { |
| // TOOD |
| } |
| |
| @Override |
| public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) |
| throws Http2Exception { |
| // TOOD |
| } |
| |
| @Override |
| public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { |
| // TODO |
| } |
| |
| @Override |
| public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { |
| // TODO |
| } |
| |
| @Override |
| public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, |
| Http2Headers headers, int padding) throws Http2Exception { |
| // TODO |
| } |
| |
| @Override |
| public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, |
| ByteBuf debugData) throws Http2Exception { |
| // TODO |
| } |
| |
| @Override |
| public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) |
| throws Http2Exception { |
| // TODO |
| } |
| |
| private boolean isClient() { |
| return !connection().isServer(); |
| } |
| |
| /** |
| * Closes the request and its associated response with an internal error. |
| */ |
| private void closeWithError(Request request, Status status) { |
| try { |
| request.close(status); |
| request.getResponse().close(status); |
| } finally { |
| requestRegistry.remove(request.getId()); |
| disposeDeframer(request); |
| } |
| } |
| |
| /** |
| * Create an HTTP2 response handler |
| */ |
| private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) { |
| return Http2Response.builder(streamId, writer, new MessageFramer(4096)); |
| } |
| |
| /** |
| * Start the Request operation on the server |
| */ |
| private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) { |
| if (!Http2Session.PROTORPC.equals(headers.get(Http2Session.CONTENT_TYPE))) { |
| return null; |
| } |
| // Use Path to specify the operation |
| String operationName = |
| normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()).toString()); |
| if (operationName == null) { |
| return null; |
| } |
| |
| // The Netty AsciiString class is really just a wrapper around a byte[] and supports |
| // arbitrary binary data, not just ASCII. |
| byte[][] headerValues = new byte[headers.size() * 2][]; |
| int i = 0; |
| for (Map.Entry<AsciiString, AsciiString> entry : headers) { |
| headerValues[i++] = entry.getKey().array(); |
| headerValues[i++] = entry.getValue().array(); |
| } |
| Metadata.Headers grpcHeaders = new Metadata.Headers(headerValues); |
| |
| // Create the operation and bind a HTTP2 response operation |
| Request op = session.startRequest(operationName, grpcHeaders, |
| createResponse(new Http2Writer(ctx), streamId)); |
| if (op == null) { |
| return null; |
| } |
| requestRegistry.register(op); |
| return op; |
| } |
| |
| // TODO(user): This needs proper namespacing support, this is currently just a hack |
| private static String normalizeOperationName(String path) { |
| return path.substring(1); |
| } |
| |
| /** |
| * Called when a HTTP2 stream is closed. |
| */ |
| private void finish(Operation operation) { |
| disposeDeframer(operation); |
| requestRegistry.remove(operation.getId()); |
| if (operation.getPhase() != Phase.CLOSED) { |
| operation.close(Status.OK); |
| } |
| } |
| |
| public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) { |
| ByteBufDeframer deframer = operation.get(ByteBufDeframer.class); |
| if (deframer == null) { |
| deframer = new ByteBufDeframer(ctx.alloc()); |
| operation.put(ByteBufDeframer.class, deframer); |
| } |
| return deframer; |
| } |
| |
| public void disposeDeframer(Operation operation) { |
| ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class); |
| if (deframer != null) { |
| deframer.dispose(); |
| } |
| } |
| |
| public class Http2Writer { |
| private final ChannelHandlerContext ctx; |
| |
| public Http2Writer(ChannelHandlerContext ctx) { |
| this.ctx = ctx; |
| } |
| |
| public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) { |
| return Http2Codec.this.writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise()); |
| } |
| |
| public ChannelFuture writeHeaders(int streamId, Http2Headers headers, boolean endStream) { |
| |
| return Http2Codec.this.writeHeaders(ctx, |
| streamId, |
| headers, |
| PADDING, |
| endStream, |
| ctx.newPromise()); |
| } |
| |
| public ChannelFuture writeHeaders(int streamId, |
| Http2Headers headers, |
| int streamDependency, |
| short weight, |
| boolean exclusive, |
| boolean endStream) { |
| return Http2Codec.this.writeHeaders(ctx, |
| streamId, |
| headers, |
| streamDependency, |
| weight, |
| exclusive, |
| PADDING, |
| endStream, |
| ctx.newPromise()); |
| } |
| |
| public ChannelFuture writeRstStream(int streamId, long errorCode) { |
| return Http2Codec.this.writeRstStream(ctx, streamId, errorCode, ctx.newPromise()); |
| } |
| } |
| } |