blob: 80c34ffb56780b19b0eb6afd017845c612206c3b [file] [log] [blame]
package com.google.net.stubby.newtransport.netty;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.netty.NettyClientTransportFactory.NegotiationType;
import com.google.net.stubby.testing.utils.ssl.SslContextFactory;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import java.util.concurrent.ExecutionException;
import javax.net.ssl.SSLEngine;
/**
* A Netty-based {@link ClientTransport} implementation.
*/
class NettyClientTransport extends AbstractClientTransport {
private final String host;
private final int port;
private final EventLoopGroup eventGroup;
private final Http2Negotiator.Negotiation negotiation;
private final NettyClientHandler handler;
private Channel channel;
NettyClientTransport(String host, int port, NegotiationType negotiationType) {
this(host, port, negotiationType, new NioEventLoopGroup());
}
NettyClientTransport(String host, int port, NegotiationType negotiationType,
EventLoopGroup eventGroup) {
Preconditions.checkNotNull(host, "host");
Preconditions.checkArgument(port >= 0, "port must be positive");
Preconditions.checkNotNull(eventGroup, "eventGroup");
Preconditions.checkNotNull(negotiationType, "negotiationType");
this.host = host;
this.port = port;
this.eventGroup = eventGroup;
handler = newHandler(host, negotiationType == NegotiationType.TLS);
switch (negotiationType) {
case PLAINTEXT:
negotiation = Http2Negotiator.plaintext(handler);
break;
case PLAINTEXT_UPGRADE:
negotiation = Http2Negotiator.plaintextUpgrade(handler);
break;
case TLS:
SSLEngine sslEngine = SslContextFactory.getClientContext().createSSLEngine();
sslEngine.setUseClientMode(true);
negotiation = Http2Negotiator.tls(handler, sslEngine);
break;
default:
throw new IllegalArgumentException("Unsupported negotiationType: " + negotiationType);
}
}
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
// Create the stream.
NettyClientStream stream = new NettyClientStream(listener, channel, handler.inboundFlow());
try {
// Write the request and await creation of the stream.
channel.writeAndFlush(new CreateStreamCommand(method, stream)).get();
} catch (InterruptedException e) {
// Restore the interrupt.
Thread.currentThread().interrupt();
stream.dispose();
throw new RuntimeException(e);
} catch (ExecutionException e) {
stream.dispose();
throw new RuntimeException(e);
}
return stream;
}
@Override
protected void doStart() {
Bootstrap b = new Bootstrap();
b.group(eventGroup);
b.channel(NioSocketChannel.class);
b.option(SO_KEEPALIVE, true);
b.handler(negotiation.initializer());
// Start the connection operation to the server.
b.connect(host, port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
channel = future.channel();
notifyStarted();
// Listen for the channel close event.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
notifyStopped();
} else {
notifyFailed(future.cause());
}
}
});
} else {
notifyFailed(future.cause());
}
}
});
}
@Override
protected void doStop() {
// No explicit call to notifyStopped() here, since this is automatically done when the
// channel closes.
if (channel != null && channel.isOpen()) {
channel.close();
}
if (eventGroup != null) {
eventGroup.shutdownGracefully();
}
}
private static NettyClientHandler newHandler(String host, boolean ssl) {
Http2Connection connection =
new DefaultHttp2Connection(false, new DefaultHttp2StreamRemovalPolicy());
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
return new NettyClientHandler(host,
ssl,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
}
}