Adding first-cut at generic Netty client and servers implementing the Service interface.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69076530
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClient.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClient.java
new file mode 100644
index 0000000..017a6c4
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClient.java
@@ -0,0 +1,81 @@
+package com.google.net.stubby.newtransport.netty;
+
+import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+
+import com.google.common.util.concurrent.AbstractService;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Implementation of the {@link com.google.common.util.concurrent.Service} interface for a
+ * Netty-based client.
+ */
+public class NettyClient extends AbstractService {
+  private final String host;
+  private final int port;
+  private final ChannelInitializer<SocketChannel> channelInitializer;
+  private Channel channel;
+  private EventLoopGroup eventGroup;
+
+  public NettyClient(String host, int port, ChannelInitializer<SocketChannel> channelInitializer) {
+    this.host = host;
+    this.port = port;
+    this.channelInitializer = channelInitializer;
+  }
+
+  public Channel channel() {
+    return channel;
+  }
+
+  @Override
+  protected void doStart() {
+    eventGroup = new NioEventLoopGroup();
+
+    Bootstrap b = new Bootstrap();
+    b.group(eventGroup);
+    b.channel(NioSocketChannel.class);
+    b.option(SO_KEEPALIVE, true);
+    b.handler(channelInitializer);
+
+    // Start the connection operation to the server.
+    b.connect(host, port).addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (future.isSuccess()) {
+          channel = future.channel();
+          notifyStarted();
+        } else {
+          notifyFailed(future.cause());
+        }
+      }
+    });
+  }
+
+  @Override
+  protected void doStop() {
+    if (channel != null && channel.isOpen()) {
+      channel.close().addListener(new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            notifyStopped();
+          } else {
+            notifyFailed(future.cause());
+          }
+        }
+      });
+    }
+
+    if (eventGroup != null) {
+      eventGroup.shutdownGracefully();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java
new file mode 100644
index 0000000..048235a
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java
@@ -0,0 +1,84 @@
+package com.google.net.stubby.newtransport.netty;
+
+import static io.netty.channel.ChannelOption.SO_BACKLOG;
+import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+
+import com.google.common.util.concurrent.AbstractService;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+/**
+ * Implementation of the {@link com.google.common.util.concurrent.Service} interface for a
+ * Netty-based server.
+ */
+public class NettyServer extends AbstractService {
+  private final int port;
+  private final ChannelInitializer<SocketChannel> channelInitializer;
+  private Channel channel;
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+
+  public NettyServer(int port, ChannelInitializer<SocketChannel> channelInitializer) {
+    this.port = port;
+    this.channelInitializer = channelInitializer;
+  }
+
+  @Override
+  protected void doStart() {
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup();
+
+    ServerBootstrap b = new ServerBootstrap();
+    b.group(bossGroup, workerGroup);
+    b.channel(NioServerSocketChannel.class);
+    b.option(SO_BACKLOG, 128);
+    b.childOption(SO_KEEPALIVE, true);
+    b.childHandler(channelInitializer);
+
+    // Bind and start to accept incoming connections.
+    b.bind(port).addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (future.isSuccess()) {
+          channel = future.channel();
+          notifyStarted();
+        } else {
+          notifyFailed(future.cause());
+        }
+      }
+    });
+  }
+
+  @Override
+  protected void doStop() {
+    // Wait for the channel to close.
+    if (channel != null && channel.isOpen()) {
+      channel.close().addListener(new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            notifyStopped();
+          } else {
+            notifyFailed(future.cause());
+          }
+        }
+      });
+    }
+
+    // Wait for the event loop group to shutdown.
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+    }
+  }
+}