Bring netty5 up to head to support HTTP2 draft 12 so we can test with GFE
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69006100
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java b/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java
index e39e8fb..e7f06c8 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java
@@ -60,6 +60,8 @@
+ frameLength + ", readableBytes=" + frame.readableBytes());
}
if (TransportFrameUtil.isNotCompressed(compressionType)) {
+ // Need to retain the frame as we may be holding it over channel events
+ frame.retain();
return frame;
}
throw new IOException("Unknown compression type " + compressionType);
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
index 11698d6..1fc7f59 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
@@ -13,7 +13,6 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http2.draft10.frame.Http2FrameCodec;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GenericFutureListener;
@@ -28,7 +27,6 @@
private final String host;
private final int port;
private final RequestRegistry requestRegistry;
- private ChannelFuture channelFuture;
private final SSLEngine sslEngine;
public Http2Client(String host, int port, RequestRegistry requestRegistry) {
@@ -56,6 +54,7 @@
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
// TODO(user): Evaluate use of pooled allocator
b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+ final Http2Codec http2Codec = new Http2Codec(requestRegistry);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
@@ -63,18 +62,16 @@
// Assume TLS when using SSL
ch.pipeline().addLast(new SslHandler(sslEngine, false));
}
- ch.pipeline().addLast(
- new Http2FrameCodec(),
- new Http2Codec(requestRegistry));
+ ch.pipeline().addLast(http2Codec);
}
});
// Start the client.
- channelFuture = b.connect(host, port);
+ ChannelFuture channelFuture = b.connect(host, port);
// Wait for the connection
channelFuture.sync(); // (5)
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.addListener(new WorkerCleanupListener(workerGroup));
- return new Http2Session(channelFuture.channel(), requestRegistry);
+ return new Http2Session(http2Codec.getWriter(), requestRegistry);
} catch (Throwable t) {
workerGroup.shutdownGracefully();
throw Throwables.propagate(t);
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
index f2cfd77..221c170 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
@@ -9,36 +9,35 @@
import com.google.net.stubby.Session;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.MessageFramer;
-import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http2.draft10.Http2Error;
-import io.netty.handler.codec.http2.draft10.Http2Headers;
-import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2RstStreamFrame;
-import io.netty.handler.codec.http2.draft10.frame.Http2DataFrame;
-import io.netty.handler.codec.http2.draft10.frame.Http2HeadersFrame;
-import io.netty.handler.codec.http2.draft10.frame.Http2RstStreamFrame;
-import io.netty.handler.codec.http2.draft10.frame.Http2StreamFrame;
+import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Settings;
/**
* Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
* request-response dialog
*/
-public class Http2Codec extends ChannelHandlerAdapter {
+public class Http2Codec extends AbstractHttp2ConnectionHandler {
+ public static final int PADDING = 0;
private final boolean client;
private final RequestRegistry requestRegistry;
private final Session session;
- private ByteBufAllocator alloc;
+ private Http2Codec.Http2Writer http2Writer;
/**
* Constructor used by servers, takes a session which will receive operation events.
*/
public Http2Codec(Session session, RequestRegistry requestRegistry) {
+ super(true, true);
+ // TODO(user): Use connection.isServer when not private in base class
this.client = false;
this.session = session;
this.requestRegistry = requestRegistry;
@@ -48,54 +47,140 @@
* Constructor used by clients to send operations to a remote server
*/
public Http2Codec(RequestRegistry requestRegistry) {
+ super(false, true);
this.client = true;
this.session = null;
this.requestRegistry = requestRegistry;
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- // Abort any active requests.
- requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ http2Writer = new Http2Writer(ctx);
+ }
- super.channelInactive(ctx);
+ public Http2Writer getWriter() {
+ return http2Writer;
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (!(msg instanceof Http2StreamFrame)) {
- return;
+ public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
+ boolean endOfStream, boolean endOfSegment, boolean compressed)
+ throws Http2Exception {
+ Request request = requestRegistry.lookup(streamId);
+ if (request == null) {
+ // Stream may have been terminated already or this is just plain spurious
+ throw Http2Exception.format(Http2Error.STREAM_CLOSED, "Stream does not exist");
}
- this.alloc = ctx.alloc();
- Http2StreamFrame frame = (Http2StreamFrame) msg;
- int streamId = frame.getStreamId();
- Request operation = requestRegistry.lookup(streamId);
+ Operation operation = client ? request.getResponse() : request;
try {
- if (operation == null) {
- if (client) {
- // For clients an operation must already exist in the registry
- throw new IllegalStateException("Response operation must already be bound");
- } else {
- operation = serverStart(ctx, frame);
- if (operation == null) {
- closeWithError(new NoOpRequest(createResponse(ctx, streamId).build()),
- new Status(Code.NOT_FOUND));
- }
- }
- } else {
- // Consume the frame
- progress(client ? operation.getResponse() : operation, frame);
+ ByteBufDeframer deframer = getOrCreateDeframer(operation, ctx);
+ deframer.deframe(data, operation);
+ if (endOfStream) {
+ finish(operation);
}
} catch (Throwable e) {
+ // TODO(user): Need to disambiguate between stream corruption as well as client/server
+ // generated errors. For stream corruption we always just send reset stream. For
+ // clients we will also generally reset-stream on error, servers may send a more detailed
+ // status.
Status status = Status.fromThrowable(e);
- if (operation == null) {
- // Create a no-op request so we can use common error handling
- operation = new NoOpRequest(createResponse(ctx, streamId).build());
- }
- closeWithError(operation, status);
+ closeWithError(request, status);
}
}
+ @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
+ int padding, boolean endStream, boolean endSegment) throws Http2Exception {
+ Request operation = requestRegistry.lookup(streamId);
+ if (operation == null) {
+ if (client) {
+ // For clients an operation must already exist in the registry
+ throw Http2Exception.format(Http2Error.REFUSED_STREAM, "Stream does not exist");
+ } else {
+ operation = serverStart(ctx, streamId, headers);
+ if (operation == null) {
+ closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
+ new Status(Code.NOT_FOUND));
+ }
+ }
+ }
+ if (endStream) {
+ finish(client ? operation.getResponse() : operation);
+ }
+ }
+
+ @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
+ int streamDependency, short weight, boolean exclusive, int padding,
+ boolean endStream, boolean endSegment) throws Http2Exception {
+ onHeadersRead(ctx, streamId, headers, padding, endStream, endSegment);
+ }
+
+ @Override
+ public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
+ short weight, boolean exclusive) throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
+ throws Http2Exception {
+ Request request = requestRegistry.lookup(streamId);
+ if (request != null) {
+ closeWithError(request, new Status(Code.CANCELLED, "Stream reset"));
+ requestRegistry.remove(streamId);
+ }
+ }
+
+ @Override
+ public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
+ // TOOD
+ }
+
+ @Override
+ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
+ throws Http2Exception {
+ // TOOD
+ }
+
+ @Override
+ public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
+ Http2Headers headers, int padding) throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
+ ByteBuf debugData) throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
+ throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onAltSvcRead(ChannelHandlerContext ctx, int streamId, long maxAge, int port,
+ ByteBuf protocolId, String host, String origin) throws Http2Exception {
+ // TODO
+ }
+
+ @Override
+ public void onBlockedRead(ChannelHandlerContext ctx, int streamId) throws Http2Exception {
+ // TODO
+ }
/**
* Closes the request and its associated response with an internal error.
@@ -106,51 +191,37 @@
request.getResponse().close(status);
} finally {
requestRegistry.remove(request.getId());
+ disposeDeframer(request);
}
}
/**
* Create an HTTP2 response handler
*/
- private Response.ResponseBuilder createResponse(ChannelHandlerContext ctx, int streamId) {
- return Http2Response.builder(streamId, ctx.channel(), new MessageFramer(4096));
- }
-
- /**
- * Writes the HTTP/2 RST Stream frame to the remote endpoint, indicating a stream failure.
- */
- private void sendRstStream(ChannelHandlerContext ctx, int streamId, Http2Error error) {
- DefaultHttp2RstStreamFrame frame = new DefaultHttp2RstStreamFrame.Builder()
- .setStreamId(streamId).setErrorCode(error.getCode()).build();
- ctx.writeAndFlush(frame);
+ private Response.ResponseBuilder createResponse(Http2Writer writer, int streamId) {
+ return Http2Response.builder(streamId, writer, new MessageFramer(4096));
}
/**
* Start the Request operation on the server
*/
- private Request serverStart(ChannelHandlerContext ctx, Http2StreamFrame frame) {
- if (!(frame instanceof Http2HeadersFrame)) {
- // TODO(user): Better error detail to client here
- return null;
- }
- Http2HeadersFrame headers = (Http2HeadersFrame) frame;
- if (!Http2Session.PROTORPC.equals(headers.getHeaders().get("content-type"))) {
+ private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
+ if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
return null;
}
// Use Path to specify the operation
String operationName =
- normalizeOperationName(headers.getHeaders().get(Http2Headers.HttpName.PATH.value()));
+ normalizeOperationName(headers.get(Http2Headers.HttpName.PATH.value()));
if (operationName == null) {
return null;
}
// Create the operation and bind a HTTP2 response operation
- Request op = session.startRequest(operationName, createResponse(ctx, frame.getStreamId()));
+ Request op = session.startRequest(operationName, createResponse(new Http2Writer(ctx),
+ streamId));
if (op == null) {
return null;
}
requestRegistry.register(op);
- // Immediately deframe the remaining headers in the frame
- progressHeaders(op, (Http2HeadersFrame) frame);
return op;
}
@@ -159,57 +230,6 @@
return path.substring(1);
}
-
- /**
- * Consume a received frame
- */
- private void progress(Operation operation, Http2StreamFrame frame) {
- if (frame instanceof Http2HeadersFrame) {
- progressHeaders(operation, (Http2HeadersFrame) frame);
- } else if (frame instanceof Http2DataFrame) {
- progressPayload(operation, (Http2DataFrame) frame);
- } else if (frame instanceof Http2RstStreamFrame) {
- // Cancel
- operation.close(new Status(Code.ABORTED, "HTTP2 stream reset"));
- finish(operation);
- } else {
- // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
- operation.close(Status.OK);
- finish(operation);
- }
- }
-
- /**
- * Consume headers in the frame. Any header starting with ':' is considered reserved
- */
- private void progressHeaders(Operation operation, Http2HeadersFrame frame) {
- // TODO(user): Currently we do not do anything with HTTP2 headers
- if (frame.isEndOfStream()) {
- finish(operation);
- }
- }
-
- private void progressPayload(Operation operation, Http2DataFrame frame) {
- try {
-
- // Copy the data buffer.
- // TODO(user): Need to decide whether to use pooling or not.
- ByteBuf dataCopy = frame.content().copy();
-
- if (operation == null) {
- return;
- }
- ByteBufDeframer deframer = getOrCreateDeframer(operation);
- deframer.deframe(dataCopy, operation);
- if (frame.isEndOfStream()) {
- finish(operation);
- }
-
- } finally {
- frame.release();
- }
- }
-
/**
* Called when a HTTP2 stream is closed.
*/
@@ -221,10 +241,10 @@
}
}
- public ByteBufDeframer getOrCreateDeframer(Operation operation) {
+ public ByteBufDeframer getOrCreateDeframer(Operation operation, ChannelHandlerContext ctx) {
ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
if (deframer == null) {
- deframer = new ByteBufDeframer(alloc);
+ deframer = new ByteBufDeframer(ctx.alloc());
operation.put(ByteBufDeframer.class, deframer);
}
return deframer;
@@ -236,4 +256,39 @@
deframer.dispose();
}
}
+
+ public class Http2Writer {
+ private final ChannelHandlerContext ctx;
+
+ public Http2Writer(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+
+ public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream,
+ boolean endSegment, boolean compressed) {
+ return Http2Codec.this.writeData(ctx, ctx.newPromise(),
+ streamId, data, PADDING, endStream, endSegment, compressed);
+ }
+
+ public ChannelFuture writeHeaders(int streamId,
+ Http2Headers headers,
+ boolean endStream, boolean endSegment) {
+
+ return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
+ headers, PADDING, endStream, endSegment);
+ }
+
+ public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency,
+ short weight, boolean exclusive,
+ boolean endStream, boolean endSegment) {
+ return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
+ headers, streamDependency, weight, exclusive, PADDING, endStream, endSegment);
+ }
+
+ public ChannelFuture writeRstStream(int streamId, long errorCode) {
+ return Http2Codec.this.writeRstStream(ctx, ctx.newPromise(),
+ streamId,
+ errorCode);
+ }
+ }
}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java
index 0b72e49..6e4c82d 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java
@@ -6,25 +6,23 @@
import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2DataFrame;
-
import java.io.InputStream;
import java.nio.ByteBuffer;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+
/**
* Base implementation of {@link Operation} that writes HTTP2 frames
*/
abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
- protected final Framer framer;
- private final Channel channel;
+ private final Framer framer;
+ private final Http2Codec.Http2Writer writer;
- Http2Operation(int streamId, Channel channel, Framer framer) {
+ Http2Operation(int streamId, Http2Codec.Http2Writer writer, Framer framer) {
super(streamId);
- this.channel = channel;
+ this.writer = writer;
this.framer = framer;
}
@@ -55,10 +53,10 @@
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
boolean closed = getPhase() == Phase.CLOSED;
- DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame.Builder().setStreamId(getId())
- .setContent(Unpooled.wrappedBuffer(frame)).setEndOfStream(closed).build();
+
try {
- ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
+ ChannelFuture channelFuture = writer.writeData(getId(),
+ Unpooled.wrappedBuffer(frame), closed, closed, false);
if (!closed) {
// Sync for all except the last frame to prevent buffer corruption.
channelFuture.get();
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
index af7b13a..643ad0f 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
@@ -4,14 +4,12 @@
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
-import io.netty.channel.Channel;
-import io.netty.handler.codec.http2.draft10.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.draft10.Http2Headers;
-import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2HeadersFrame;
-
import java.net.InetAddress;
import java.net.UnknownHostException;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+
/**
* A HTTP2 based implementation of {@link Request}
*/
@@ -29,22 +27,19 @@
HOST_NAME = hostName;
}
- private static DefaultHttp2HeadersFrame createHeadersFrame(int id, String operationName) {
- Http2Headers headers = DefaultHttp2Headers.newBuilder()
- .setMethod("POST")
- .setPath("/" + operationName)
- .setAuthority(HOST_NAME)
- .setScheme("https")
- .add("content-type", Http2Session.PROTORPC)
- .build();
- return new DefaultHttp2HeadersFrame.Builder().setStreamId(id).setHeaders(headers).build();
- }
-
private final Response response;
- public Http2Request(Response response, Channel channel, String operationName, Framer framer) {
- super(response.getId(), channel, framer);
- channel.write(createHeadersFrame(response.getId(), operationName));
+ public Http2Request(Response response, String operationName,
+ Http2Codec.Http2Writer writer, Framer framer) {
+ super(response.getId(), writer, framer);
+ Http2Headers headers = DefaultHttp2Headers.newBuilder()
+ .method("POST")
+ .path("/" + operationName)
+ .authority(HOST_NAME)
+ .scheme("https")
+ .add("content-type", Http2Session.PROTORPC)
+ .build();
+ writer.writeHeaders(response.getId(), headers, false, true);
this.response = response;
}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java
index f7822ce..14c0f88 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java
@@ -3,14 +3,13 @@
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
-import io.netty.channel.Channel;
-
/**
* A HTTP2 based implementation of a {@link Response}.
*/
class Http2Response extends Http2Operation implements Response {
- public static ResponseBuilder builder(final int id, final Channel channel, final Framer framer) {
+ public static ResponseBuilder builder(final int id, final Http2Codec.Http2Writer writer,
+ final Framer framer) {
return new ResponseBuilder() {
@Override
public Response build(int id) {
@@ -19,12 +18,12 @@
@Override
public Response build() {
- return new Http2Response(id, channel, framer);
+ return new Http2Response(id, writer, framer);
}
};
}
- private Http2Response(int id, Channel channel, Framer framer) {
- super(id, channel, framer);
+ private Http2Response(int id, Http2Codec.Http2Writer writer, Framer framer) {
+ super(id, writer, framer);
}
}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java
index 265a908..7ccc89f 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java
@@ -13,7 +13,6 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.http2.draft10.frame.Http2FrameCodec;
/**
* Simple server connection startup that attaches a {@link Session} implementation to a connection.
@@ -42,7 +41,7 @@
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new Http2FrameCodec(), new Http2Codec(session, operations));
+ ch.pipeline().addLast(new Http2Codec(session, operations));
}
}).option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
index 44fa751..cb07a66 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
@@ -6,10 +6,10 @@
import com.google.net.stubby.Session;
import com.google.net.stubby.transport.MessageFramer;
-import io.netty.channel.Channel;
-
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.channel.ChannelHandlerContext;
+
/**
* An implementation of {@link Session} that can be used by clients to start
* a {@link Request}
@@ -18,12 +18,12 @@
public static final String PROTORPC = "application/protorpc";
- private final Channel channel;
+ private final Http2Codec.Http2Writer writer;
private final RequestRegistry requestRegistry;
- private AtomicInteger streamId;
+ private final AtomicInteger streamId;
- public Http2Session(Channel channel, RequestRegistry requestRegistry) {
- this.channel = channel;
+ public Http2Session(Http2Codec.Http2Writer writer, RequestRegistry requestRegistry) {
+ this.writer = writer;
this.requestRegistry = requestRegistry;
// Clients are odd numbers starting at 3. A value of 1 is reserved for the upgrade protocol.
streamId = new AtomicInteger(3);
@@ -36,8 +36,8 @@
@Override
public Request startRequest(String operationName, Response.ResponseBuilder response) {
int nextSessionId = getNextStreamId();
- Request operation = new Http2Request(response.build(nextSessionId), channel, operationName,
- new MessageFramer(4096));
+ Request operation = new Http2Request(response.build(nextSessionId), operationName,
+ writer, new MessageFramer(4096));
requestRegistry.register(operation);
return operation;
}