Port client tests to newtransport.

Where possible, I tried to cleanly shutdown the Channel.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69404518
diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
index 013d3c4..acf4861 100644
--- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
@@ -43,8 +43,6 @@
   public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
     this.transportFactory = transportFactory;
     this.executor = executor;
-    // FIXME(ejona): Remove once we have our top-level lifecycle.
-    startAsync();
   }
 
   @Override
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
index 94f8f2e..ca07cf1 100644
--- a/core/src/main/java/com/google/net/stubby/SessionClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java
@@ -19,7 +19,7 @@
  * new transport layer is created.
  */
 // TODO(user): Delete this class when new transport interfaces are introduced
-class SessionClientStream implements ClientStream {
+public class SessionClientStream implements ClientStream {
   private final StreamListener listener;
   /**
    * The {@link Request} used by the stub to dispatch the call
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
new file mode 100644
index 0000000..a6bfa4f
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java
@@ -0,0 +1,35 @@
+package com.google.net.stubby;
+
+import com.google.common.util.concurrent.AbstractService;
+import com.google.net.stubby.newtransport.ClientStream;
+import com.google.net.stubby.newtransport.ClientTransport;
+import com.google.net.stubby.newtransport.StreamListener;
+
+/**
+ * Shim between Session and Channel. Will be removed when Session is removed.
+ */
+public class SessionClientTransport extends AbstractService implements ClientTransport {
+  private final Session session;
+
+  public SessionClientTransport(Session session) {
+    this.session = session;
+  }
+
+  @Override
+  protected void doStart() {
+    notifyStarted();
+  }
+
+  @Override
+  public void doStop() {
+    notifyStopped();
+  }
+
+  @Override
+  public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+    final SessionClientStream stream = new SessionClientStream(listener);
+    Request request = session.startRequest(method.getName(), stream.responseBuilder());
+    stream.start(request);
+    return stream;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java
index 9c32b77..9fe65c4 100644
--- a/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java
+++ b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java
@@ -1,11 +1,7 @@
 package com.google.net.stubby;
 
-import com.google.common.util.concurrent.AbstractService;
-import com.google.net.stubby.MethodDescriptor;
-import com.google.net.stubby.newtransport.ClientStream;
 import com.google.net.stubby.newtransport.ClientTransport;
 import com.google.net.stubby.newtransport.ClientTransportFactory;
-import com.google.net.stubby.newtransport.StreamListener;
 
 /**
  * Shim between Session and Channel. Will be removed when Session is removed.
@@ -23,26 +19,4 @@
   public ClientTransport newClientTransport() {
     return transport;
   }
-
-  private static class SessionClientTransport extends AbstractService implements ClientTransport {
-    private final Session session;
-
-    public SessionClientTransport(Session session) {
-      this.session = session;
-    }
-
-    @Override
-    protected void doStart() {}
-
-    @Override
-    public void doStop() {}
-
-    @Override
-    public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
-      final SessionClientStream stream = new SessionClientStream(listener);
-      Request request = session.startRequest(method.getName(), stream.responseBuilder());
-      stream.start(request);
-      return stream;
-    }
-  }
 }
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
index 1fc7f59..694c07d 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
@@ -6,6 +6,7 @@
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -28,6 +29,7 @@
   private final int port;
   private final RequestRegistry requestRegistry;
   private final SSLEngine sslEngine;
+  private Channel channel;
 
   public Http2Client(String host, int port, RequestRegistry requestRegistry) {
     this(host, port, requestRegistry, null);
@@ -69,7 +71,8 @@
       ChannelFuture channelFuture = b.connect(host, port);
       // Wait for the connection
       channelFuture.sync(); // (5)
-      ChannelFuture closeFuture = channelFuture.channel().closeFuture();
+      channel = channelFuture.channel();
+      ChannelFuture closeFuture = channel.closeFuture();
       closeFuture.addListener(new WorkerCleanupListener(workerGroup));
       return new Http2Session(http2Codec.getWriter(), requestRegistry);
     } catch (Throwable t) {
@@ -78,6 +81,17 @@
     }
   }
 
+  public void stop() {
+    if (channel != null && channel.isOpen()) {
+      try {
+        channel.close().get();
+      } catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+    channel = null;
+  }
+
   private static class WorkerCleanupListener
       implements GenericFutureListener<io.netty.util.concurrent.Future<Void>> {
     private final EventLoopGroup workerGroup;