Adding first-cut at generic Netty client and servers implementing the Service interface.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69076530
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClient.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClient.java
new file mode 100644
index 0000000..017a6c4
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClient.java
@@ -0,0 +1,81 @@
+package com.google.net.stubby.newtransport.netty;
+
+import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+
+import com.google.common.util.concurrent.AbstractService;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Implementation of the {@link com.google.common.util.concurrent.Service} interface for a
+ * Netty-based client.
+ */
+public class NettyClient extends AbstractService {
+ private final String host;
+ private final int port;
+ private final ChannelInitializer<SocketChannel> channelInitializer;
+ private Channel channel;
+ private EventLoopGroup eventGroup;
+
+ public NettyClient(String host, int port, ChannelInitializer<SocketChannel> channelInitializer) {
+ this.host = host;
+ this.port = port;
+ this.channelInitializer = channelInitializer;
+ }
+
+ public Channel channel() {
+ return channel;
+ }
+
+ @Override
+ protected void doStart() {
+ eventGroup = new NioEventLoopGroup();
+
+ Bootstrap b = new Bootstrap();
+ b.group(eventGroup);
+ b.channel(NioSocketChannel.class);
+ b.option(SO_KEEPALIVE, true);
+ b.handler(channelInitializer);
+
+ // Start the connection operation to the server.
+ b.connect(host, port).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ channel = future.channel();
+ notifyStarted();
+ } else {
+ notifyFailed(future.cause());
+ }
+ }
+ });
+ }
+
+ @Override
+ protected void doStop() {
+ if (channel != null && channel.isOpen()) {
+ channel.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ notifyStopped();
+ } else {
+ notifyFailed(future.cause());
+ }
+ }
+ });
+ }
+
+ if (eventGroup != null) {
+ eventGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java
new file mode 100644
index 0000000..048235a
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java
@@ -0,0 +1,84 @@
+package com.google.net.stubby.newtransport.netty;
+
+import static io.netty.channel.ChannelOption.SO_BACKLOG;
+import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+
+import com.google.common.util.concurrent.AbstractService;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+/**
+ * Implementation of the {@link com.google.common.util.concurrent.Service} interface for a
+ * Netty-based server.
+ */
+public class NettyServer extends AbstractService {
+ private final int port;
+ private final ChannelInitializer<SocketChannel> channelInitializer;
+ private Channel channel;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+
+ public NettyServer(int port, ChannelInitializer<SocketChannel> channelInitializer) {
+ this.port = port;
+ this.channelInitializer = channelInitializer;
+ }
+
+ @Override
+ protected void doStart() {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup);
+ b.channel(NioServerSocketChannel.class);
+ b.option(SO_BACKLOG, 128);
+ b.childOption(SO_KEEPALIVE, true);
+ b.childHandler(channelInitializer);
+
+ // Bind and start to accept incoming connections.
+ b.bind(port).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ channel = future.channel();
+ notifyStarted();
+ } else {
+ notifyFailed(future.cause());
+ }
+ }
+ });
+ }
+
+ @Override
+ protected void doStop() {
+ // Wait for the channel to close.
+ if (channel != null && channel.isOpen()) {
+ channel.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ notifyStopped();
+ } else {
+ notifyFailed(future.cause());
+ }
+ }
+ });
+ }
+
+ // Wait for the event loop group to shutdown.
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ }
+}