Adding simple Transport for HTTP. Also creating abstract base classes for common stream/transport code.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=70983246
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
new file mode 100644
index 0000000..71d48f3
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java
@@ -0,0 +1,42 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.net.stubby.MethodDescriptor;
+
+/**
+ * Abstract base class for all {@link ClientTransport} implementations. Implements the
+ * {@link #newStream} method to perform a state check on the service before allowing stream
+ * creation.
+ */
+public abstract class AbstractClientTransport extends AbstractService implements ClientTransport {
+
+  @Override
+  public final ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
+    Preconditions.checkNotNull(method, "method");
+    Preconditions.checkNotNull(listener, "listener");
+    if (state() == State.STARTING) {
+      // Wait until the transport is running before creating the new stream.
+      awaitRunning();
+    }
+
+    if (state() != State.RUNNING) {
+      throw new IllegalStateException("Invalid state for creating new stream: " + state());
+    }
+
+    // Create the stream.
+    return newStreamInternal(method, listener);
+  }
+
+  /**
+   * Called by {@link #newStream} to perform the actual creation of the new {@link ClientStream}.
+   * This is only called after the transport has successfully transitioned to the {@code RUNNING}
+   * state.
+   *
+   * @param method the RPC method to be invoked on the server by the new stream.
+   * @param listener the listener for events on the new stream.
+   * @return the new stream.
+   */
+  protected abstract ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
+      StreamListener listener);
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
new file mode 100644
index 0000000..121e063
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
@@ -0,0 +1,263 @@
+package com.google.net.stubby.newtransport;
+
+import static com.google.net.stubby.newtransport.StreamState.CLOSED;
+import static com.google.net.stubby.newtransport.StreamState.OPEN;
+import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.net.stubby.Status;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import javax.annotation.Nullable;
+
+/**
+ * Abstract base class for {@link Stream} implementations.
+ */
+public abstract class AbstractStream implements Stream {
+
+  /**
+   * Indicates the phase of the GRPC stream in one direction.
+   */
+  protected enum Phase {
+    CONTEXT, MESSAGE, STATUS
+  }
+
+  private volatile StreamState state = StreamState.OPEN;
+  private Status status;
+  private final Object stateLock = new Object();
+  private final Object writeLock = new Object();
+  private final MessageFramer framer;
+  private final StreamListener listener;
+  protected Phase inboundPhase = Phase.CONTEXT;
+  protected Phase outboundPhase = Phase.CONTEXT;
+
+  /**
+   * Handler for Framer output.
+   */
+  private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
+    @Override
+    public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
+      sendFrame(frame, endOfStream);
+    }
+  };
+
+  /**
+   * Handler for Deframer output.
+   */
+  private final Framer inboundMessageHandler = new Framer() {
+    @Override
+    public void writeContext(String name, InputStream value, int length) {
+      ListenableFuture<Void> future = null;
+      try {
+        inboundPhase(Phase.CONTEXT);
+        future = listener.contextRead(name, value, length);
+      } finally {
+        closeWhenDone(future, value);
+      }
+    }
+
+    @Override
+    public void writePayload(InputStream input, int length) {
+      ListenableFuture<Void> future = null;
+      try {
+        inboundPhase(Phase.MESSAGE);
+        future = listener.messageRead(input, length);
+      } finally {
+        closeWhenDone(future, input);
+      }
+    }
+
+    @Override
+    public void writeStatus(Status status) {
+      inboundPhase(Phase.STATUS);
+      setStatus(status);
+    }
+
+    @Override
+    public void flush() {}
+
+    @Override
+    public boolean isClosed() {
+      return false;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void dispose() {}
+  };
+
+  protected AbstractStream(StreamListener listener) {
+    this.listener = Preconditions.checkNotNull(listener, "listener");
+
+    framer = new MessageFramer(outboundFrameHandler, 4096);
+    // No compression at the moment.
+    framer.setAllowCompression(false);
+  }
+
+  @Override
+  public StreamState state() {
+    return state;
+  }
+
+  @Override
+  public final void close() {
+    outboundPhase(Phase.STATUS);
+    synchronized (stateLock) {
+      state = state == OPEN ? READ_ONLY : CLOSED;
+    }
+    synchronized (writeLock) {
+      framer.close();
+    }
+  }
+
+  /**
+   * Free any resources associated with this stream. Subclass implementations must call this
+   * version.
+   */
+  public void dispose() {
+    synchronized (writeLock) {
+      framer.dispose();
+    }
+  }
+
+  @Override
+  public final void writeContext(String name, InputStream value, int length,
+      @Nullable Runnable accepted) {
+    Preconditions.checkNotNull(name, "name");
+    Preconditions.checkNotNull(value, "value");
+    Preconditions.checkArgument(length >= 0, "length must be >= 0");
+    outboundPhase(Phase.CONTEXT);
+    synchronized (writeLock) {
+      if (!framer.isClosed()) {
+        framer.writeContext(name, value, length);
+      }
+    }
+
+    // TODO(user): add flow control.
+    if (accepted != null) {
+      accepted.run();
+    }
+  }
+
+  @Override
+  public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
+    Preconditions.checkNotNull(message, "message");
+    Preconditions.checkArgument(length >= 0, "length must be >= 0");
+    outboundPhase(Phase.MESSAGE);
+    synchronized (writeLock) {
+      if (!framer.isClosed()) {
+        framer.writePayload(message, length);
+      }
+    }
+
+    // TODO(user): add flow control.
+    if (accepted != null) {
+      accepted.run();
+    }
+  }
+
+  @Override
+  public final void flush() {
+    synchronized (writeLock) {
+      if (!framer.isClosed()) {
+        framer.flush();
+      }
+    }
+  }
+
+  /**
+   * Sets the status if not already set and notifies the stream listener that the stream was closed.
+   * This method must be called from the transport thread.
+   *
+   * @param newStatus the new status to set
+   * @return {@code} true if the status was not already set.
+   */
+  public boolean setStatus(final Status newStatus) {
+    Preconditions.checkNotNull(newStatus, "newStatus");
+    synchronized (stateLock) {
+      if (status != null) {
+        // Disallow override of current status.
+        return false;
+      }
+
+      status = newStatus;
+      state = CLOSED;
+    }
+
+    // Invoke the observer callback.
+    listener.closed(newStatus);
+
+    // Free any resources.
+    dispose();
+
+    return true;
+  }
+
+  /**
+   * Sends an outbound frame to the server.
+   *
+   * @param frame a buffer containing the chunk of data to be sent.
+   * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
+   *        this endpoint.
+   */
+  protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
+
+  /**
+   * Gets the handler for inbound messages. Subclasses must use this as the target for a
+   * {@link com.google.net.stubby.newtransport.Deframer}.
+   */
+  protected final Framer inboundMessageHandler() {
+    return inboundMessageHandler;
+  }
+
+  /**
+   * Transitions the inbound phase. If the transition is disallowed, throws a
+   * {@link IllegalStateException}.
+   */
+  protected final void inboundPhase(Phase nextPhase) {
+    inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
+  }
+
+  /**
+   * Transitions the outbound phase. If the transition is disallowed, throws a
+   * {@link IllegalStateException}.
+   */
+  protected final void outboundPhase(Phase nextPhase) {
+    outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
+  }
+
+  private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
+    if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
+      throw new IllegalStateException(
+          String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
+    }
+    return nextPhase;
+  }
+
+  /**
+   * If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
+   * the {@link InputStream} is closed immediately.
+   */
+  private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
+      final InputStream input) {
+    if (future == null) {
+      Closeables.closeQuietly(input);
+      return;
+    }
+
+    // Close the buffer when the future completes.
+    future.addListener(new Runnable() {
+      @Override
+      public void run() {
+        Closeables.closeQuietly(input);
+      }
+    }, MoreExecutors.sameThreadExecutor());
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
new file mode 100644
index 0000000..86fad43
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java
@@ -0,0 +1,164 @@
+package com.google.net.stubby.newtransport.http;
+
+import static com.google.net.stubby.Status.CANCELLED;
+import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER;
+import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC;
+import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteBuffers;
+import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.newtransport.AbstractClientTransport;
+import com.google.net.stubby.newtransport.AbstractStream;
+import com.google.net.stubby.newtransport.ClientStream;
+import com.google.net.stubby.newtransport.InputStreamDeframer;
+import com.google.net.stubby.newtransport.StreamListener;
+import com.google.net.stubby.newtransport.StreamState;
+import com.google.net.stubby.transport.Transport;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A simple client-side transport for RPC-over-HTTP/1.1. All execution (including listener
+ * callbacks) are executed in the application thread context.
+ */
+public class HttpClientTransport extends AbstractClientTransport {
+
+  private final URI baseUri;
+  private final Set<HttpClientStream> streams =
+      Collections.synchronizedSet(new HashSet<HttpClientStream>());
+
+  public HttpClientTransport(URI baseUri) {
+    this.baseUri = Preconditions.checkNotNull(baseUri, "baseUri");
+  }
+
+  @Override
+  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
+    URI uri = baseUri.resolve(method.getName());
+    HttpClientStream stream = new HttpClientStream(uri, listener);
+    synchronized (streams) {
+      // Check for RUNNING to deal with race condition of this being executed right after doStop
+      // cancels all the streams.
+      if (state() != State.RUNNING) {
+        throw new IllegalStateException("Invalid state for creating new stream: " + state());
+      }
+      streams.add(stream);
+      return stream;
+    }
+  }
+
+  @Override
+  protected void doStart() {
+    notifyStarted();
+  }
+
+  @Override
+  protected void doStop() {
+    // Cancel all of the streams for this transport.
+    synchronized (streams) {
+      // Guaranteed to be in the STOPPING state here.
+      for (HttpClientStream stream : streams.toArray(new HttpClientStream[0])) {
+        stream.cancel();
+      }
+    }
+    notifyStopped();
+  }
+
+  /**
+   * Client stream implementation for an HTTP transport.
+   */
+  private class HttpClientStream extends AbstractStream implements ClientStream {
+    final HttpURLConnection connection;
+    final DataOutputStream outputStream;
+    boolean connected;
+
+    HttpClientStream(URI uri, StreamListener listener) {
+      super(listener);
+
+      try {
+        connection = (HttpURLConnection) uri.toURL().openConnection();
+        connection.setDoOutput(true);
+        connection.setDoInput(true);
+        connection.setRequestMethod(HTTP_METHOD);
+        connection.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
+        outputStream = new DataOutputStream(connection.getOutputStream());
+        connected = true;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void cancel() {
+      outboundPhase = Phase.STATUS;
+      if (setStatus(CANCELLED)) {
+        disconnect();
+      }
+    }
+
+    @Override
+    protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
+      if (state() == StreamState.CLOSED) {
+        // Ignore outbound frames after the stream has closed.
+        return;
+      }
+
+      try {
+        // Synchronizing here to protect against cancellation due to the transport shutting down.
+        synchronized (connection) {
+          // Write the data to the connection output stream.
+          ByteBuffers.asByteSource(frame).copyTo(outputStream);
+
+          if (endOfStream) {
+            // Close the output stream on this connection.
+            connection.getOutputStream().close();
+
+            // The request has completed so now process the response. This results in the listener's
+            // closed() callback being invoked since we're indicating that this is the end of the
+            // response stream.
+            //
+            // NOTE: Must read the response in the sending thread, since URLConnection has threading
+            // issues.
+            new InputStreamDeframer(inboundMessageHandler()).deliverFrame(
+                connection.getInputStream(), true);
+
+            // Close the input stream and disconnect.
+            connection.getInputStream().close();
+            disconnect();
+          }
+        }
+      } catch (IOException ioe) {
+        setStatus(new Status(Transport.Code.INTERNAL, ioe));
+      }
+    }
+
+    @Override
+    public void dispose() {
+      super.dispose();
+      disconnect();
+    }
+
+    /**
+     * Disconnects the HTTP connection if currently connected.
+     */
+    private void disconnect() {
+      // Synchronizing since this may be called for the stream (i.e. cancel or read complete) or
+      // due to shutting down the transport (i.e. cancel).
+      synchronized (connection) {
+        if (connected) {
+          connected = false;
+          streams.remove(this);
+          connection.disconnect();
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransportFactory.java
new file mode 100644
index 0000000..794d02d
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransportFactory.java
@@ -0,0 +1,27 @@
+package com.google.net.stubby.newtransport.http;
+
+import com.google.net.stubby.newtransport.ClientTransportFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Factory that manufactures instances of {@link HttpClientTransport}.
+ */
+public class HttpClientTransportFactory implements ClientTransportFactory {
+  private final URI baseUri;
+
+  public HttpClientTransportFactory(String host, int port, boolean ssl) {
+    try {
+      String scheme = ssl ? "https" : "http";
+      baseUri = new URI(scheme, null, host, port, "/", null, null);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public HttpClientTransport newClientTransport() {
+    return new HttpClientTransport(baseUri);
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
index 8881ad0..2143a26 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
@@ -1,120 +1,33 @@
 package com.google.net.stubby.newtransport.netty;
 
 import static com.google.net.stubby.newtransport.StreamState.CLOSED;
-import static com.google.net.stubby.newtransport.StreamState.OPEN;
-import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
 
 import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.net.stubby.Status;
+import com.google.net.stubby.newtransport.AbstractStream;
 import com.google.net.stubby.newtransport.ClientStream;
 import com.google.net.stubby.newtransport.Deframer;
-import com.google.net.stubby.newtransport.Framer;
-import com.google.net.stubby.newtransport.MessageFramer;
 import com.google.net.stubby.newtransport.StreamListener;
-import com.google.net.stubby.newtransport.StreamState;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
 
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 
-import javax.annotation.Nullable;
-
 /**
  * Client stream for a Netty transport.
  */
-class NettyClientStream implements ClientStream {
+class NettyClientStream extends AbstractStream implements ClientStream {
   public static final int PENDING_STREAM_ID = -1;
 
-  /**
-   * Indicates the phase of the GRPC stream in one direction.
-   */
-  private enum Phase {
-    CONTEXT, MESSAGE, STATUS
-  }
-
-  /**
-   * Guards transition of stream state.
-   */
-  private final Object stateLock = new Object();
-
-  /**
-   * Guards access to the frame writer.
-   */
-  private final Object writeLock = new Object();
-
-  private volatile StreamState state = OPEN;
   private volatile int id = PENDING_STREAM_ID;
-  private Status status;
-  private Phase inboundPhase = Phase.CONTEXT;
-  private Phase outboundPhase = Phase.CONTEXT;
-  private final StreamListener listener;
   private final Channel channel;
-  private final Framer framer;
   private final Deframer<ByteBuf> deframer;
 
-  private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
-    @Override
-    public void deliverFrame(ByteBuffer buffer, boolean endStream) {
-      ByteBuf buf = toByteBuf(buffer);
-      send(buf, endStream, endStream);
-    }
-  };
-
-  private final Framer inboundMessageHandler = new Framer() {
-    @Override
-    public void writeContext(String name, InputStream value, int length) {
-      ListenableFuture<Void> future = null;
-      try {
-        inboundPhase(Phase.CONTEXT);
-        future = listener.contextRead(name, value, length);
-      } finally {
-        closeWhenDone(future, value);
-      }
-    }
-
-    @Override
-    public void writePayload(InputStream input, int length) {
-      ListenableFuture<Void> future = null;
-      try {
-        inboundPhase(Phase.MESSAGE);
-        future = listener.messageRead(input, length);
-      } finally {
-        closeWhenDone(future, input);
-      }
-    }
-
-    @Override
-    public void writeStatus(Status status) {
-      inboundPhase(Phase.STATUS);
-      setStatus(status);
-    }
-
-    @Override
-    public void flush() {}
-
-    @Override
-    public boolean isClosed() {
-      return false;
-    }
-
-    @Override
-    public void close() {}
-
-    @Override
-    public void dispose() {}
-  };
-
   NettyClientStream(StreamListener listener, Channel channel) {
-    this.listener = Preconditions.checkNotNull(listener, "listener");
+    super(listener);
     this.channel = Preconditions.checkNotNull(channel, "channel");
-    this.deframer = new ByteBufDeframer(channel.alloc(), inboundMessageHandler);
-    this.framer = new MessageFramer(outboundFrameHandler, 4096);
+    this.deframer = new ByteBufDeframer(channel.alloc(), inboundMessageHandler());
   }
 
   /**
@@ -129,25 +42,6 @@
   }
 
   @Override
-  public StreamState state() {
-    return state;
-  }
-
-  @Override
-  public void close() {
-    outboundPhase(Phase.STATUS);
-    // Transition the state to mark the close the local side of the stream.
-    synchronized (stateLock) {
-      state = state == OPEN ? READ_ONLY : CLOSED;
-    }
-
-    // Close the frame writer and send any buffered frames.
-    synchronized (writeLock) {
-      framer.close();
-    }
-  }
-
-  @Override
   public void cancel() {
     outboundPhase = Phase.STATUS;
 
@@ -156,60 +50,6 @@
   }
 
   /**
-   * Free any resources associated with this stream.
-   */
-  public void dispose() {
-    synchronized (writeLock) {
-      framer.dispose();
-    }
-  }
-
-  @Override
-  public void writeContext(String name, InputStream value, int length,
-      @Nullable final Runnable accepted) {
-    Preconditions.checkNotNull(name, "name");
-    Preconditions.checkNotNull(value, "value");
-    Preconditions.checkArgument(length >= 0, "length must be >= 0");
-    outboundPhase(Phase.CONTEXT);
-    synchronized (writeLock) {
-      if (!framer.isClosed()) {
-        framer.writeContext(name, value, length);
-      }
-    }
-
-    // TODO(user): add flow control.
-    if (accepted != null) {
-      accepted.run();
-    }
-  }
-
-  @Override
-  public void writeMessage(InputStream message, int length, @Nullable final Runnable accepted) {
-    Preconditions.checkNotNull(message, "message");
-    Preconditions.checkArgument(length >= 0, "length must be >= 0");
-    outboundPhase(Phase.MESSAGE);
-    synchronized (writeLock) {
-      if (!framer.isClosed()) {
-        framer.writePayload(message, length);
-      }
-    }
-
-    // TODO(user): add flow control.
-    if (accepted != null) {
-      accepted.run();
-    }
-  }
-
-  @Override
-  public void flush() {
-    synchronized (writeLock) {
-      if (!framer.isClosed()) {
-        framer.flush();
-      }
-    }
-  }
-
-  /**
    * Called in the channel thread to process the content of an inbound DATA frame.
    *
    * @param frame the inbound HTTP/2 DATA frame. If this buffer is not used immediately, it must be
@@ -219,7 +59,7 @@
   public void inboundDataReceived(ByteBuf frame, boolean endOfStream, ChannelPromise promise) {
     Preconditions.checkNotNull(frame, "frame");
     Preconditions.checkNotNull(promise, "promise");
-    if (state == CLOSED) {
+    if (state() == CLOSED) {
       promise.setSuccess();
       return;
     }
@@ -231,44 +71,11 @@
     promise.setSuccess();
   }
 
-  /**
-   * Sets the status if not already set and notifies the stream listener that the stream was closed.
-   * This method must be called from the Netty channel thread.
-   *
-   * @param newStatus the new status to set
-   * @return {@code} true if the status was not already set.
-   */
-  public boolean setStatus(final Status newStatus) {
-    Preconditions.checkNotNull(newStatus, "newStatus");
-    synchronized (stateLock) {
-      if (status != null) {
-        // Disallow override of current status.
-        return false;
-      }
-
-      status = newStatus;
-      state = CLOSED;
-    }
-
-    // Invoke the observer callback.
-    listener.closed(newStatus);
-
-    // Free any resources.
-    dispose();
-
-    return true;
-  }
-
-  /**
-   * Writes the given frame to the channel.
-   *
-   * @param data the grpc frame to be written.
-   * @param endStream indicates whether this is the last frame to be sent for this stream.
-   * @param endMessage indicates whether the data ends at a message boundary.
-   */
-  private void send(ByteBuf data, boolean endStream, boolean endMessage) {
-    SendGrpcFrameCommand frame = new SendGrpcFrameCommand(this, data, endStream, endMessage);
-    channel.writeAndFlush(frame);
+  @Override
+  protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
+    SendGrpcFrameCommand cmd =
+        new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream, endOfStream);
+    channel.writeAndFlush(cmd);
   }
 
   /**
@@ -279,49 +86,4 @@
     buf.writeBytes(source);
     return buf;
   }
-
-  /**
-   * Transitions the inbound phase. If the transition is disallowed, throws a
-   * {@link IllegalStateException}.
-   */
-  private void inboundPhase(Phase nextPhase) {
-    inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
-  }
-
-  /**
-   * Transitions the outbound phase. If the transition is disallowed, throws a
-   * {@link IllegalStateException}.
-   */
-  private void outboundPhase(Phase nextPhase) {
-    outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
-  }
-
-  private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
-    // Only allow forward progression.
-    if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
-      throw new IllegalStateException(
-          String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
-    }
-    return nextPhase;
-  }
-
-  /**
-   * If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
-   * the {@link InputStream} is closed immediately.
-   */
-  private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
-      final InputStream input) {
-    if (future == null) {
-      Closeables.closeQuietly(input);
-      return;
-    }
-
-    // Close the buffer when the future completes.
-    future.addListener(new Runnable() {
-      @Override
-      public void run() {
-        Closeables.closeQuietly(input);
-      }
-    }, MoreExecutors.sameThreadExecutor());
-  }
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
index 28a652d..1ef9416 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java
@@ -3,8 +3,8 @@
 import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractService;
 import com.google.net.stubby.MethodDescriptor;
+import com.google.net.stubby.newtransport.AbstractClientTransport;
 import com.google.net.stubby.newtransport.ClientStream;
 import com.google.net.stubby.newtransport.ClientTransport;
 import com.google.net.stubby.newtransport.StreamListener;
@@ -25,7 +25,7 @@
 /**
  * A Netty-based {@link ClientTransport} implementation.
  */
-class NettyClientTransport extends AbstractService implements ClientTransport {
+class NettyClientTransport extends AbstractClientTransport {
 
   private final String host;
   private final int port;
@@ -58,22 +58,7 @@
   }
 
   @Override
-  public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
-    Preconditions.checkNotNull(method, "method");
-    Preconditions.checkNotNull(listener, "listener");
-    switch (state()) {
-      case STARTING:
-        // Wait until the transport is running before creating the new stream.
-        awaitRunning();
-        break;
-      case NEW:
-      case TERMINATED:
-      case FAILED:
-        throw new IllegalStateException("Unable to create new stream in state: " + state());
-      default:
-        break;
-    }
-
+  protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
     // Create the stream.
     NettyClientStream stream = new NettyClientStream(listener, channel);