Plumb trailer passing through transport streams.

We purposefully avoid going through the (de)framer, since close()
behavior differs depending on whether we are on the client or the server.
AbstractClientStream and AbstractServerStream map the events to the
appropriate semantics, but need to stash the status/trailers for later
use.

It was very interesting getting to a point where we could support both the
old and new protocols; that is probably the most detail-oriented portion of
the CL. There are some interface hacks going on, but those will
naturally be removed when we trash the gRPC v1 framer.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=76092186
diff --git a/core/src/main/java/com/google/net/stubby/Metadata.java b/core/src/main/java/com/google/net/stubby/Metadata.java
index c757294..951a69a 100644
--- a/core/src/main/java/com/google/net/stubby/Metadata.java
+++ b/core/src/main/java/com/google/net/stubby/Metadata.java
@@ -87,6 +87,42 @@
     }
   };
 
+  /**
+   * Simple metadata marshaller that encodes an integer as a signed decimal string or as big endian
+   * binary with four bytes. Binary decoding treats each of the four bytes as unsigned.
+   */
+  public static final Marshaller<Integer> INTEGER_MARSHALLER = new Marshaller<Integer>() {
+    @Override
+    public byte[] toBytes(Integer value) {
+      return new byte[] {
+        (byte) (value >>> 24),
+        (byte) (value >>> 16),
+        (byte) (value >>> 8),
+        (byte) (value >>> 0)};
+    }
+
+    @Override
+    public String toAscii(Integer value) {
+      return value.toString();
+    }
+
+    @Override
+    public Integer parseBytes(byte[] serialized) {
+      if (serialized.length != 4) {
+        throw new IllegalArgumentException("Can only deserialize 4 bytes into an integer");
+      }
+      return ((serialized[0] & 0xFF) << 24)  // Mask each byte: Java bytes are signed, so
+          |  ((serialized[1] & 0xFF) << 16)  // an unmasked byte >= 0x80 is sign-extended
+          |  ((serialized[2] & 0xFF) << 8)   // and would set every higher-order bit of
+          |   (serialized[3] & 0xFF);        // the combined result.
+    }
+
+    @Override
+    public Integer parseAscii(String ascii) {
+      return Integer.valueOf(ascii);
+    }
+  };
+
   private final ListMultimap<String, MetadataEntry> store;
   private final boolean serializable;
 
@@ -395,6 +431,9 @@
    * Key for metadata entries. Allows for parsing and serialization of metadata.
    */
   public static class Key<T> {
+    public static <T> Key<T> of(String name, Marshaller<T> marshaller) {
+      return new Key<T>(name, marshaller);
+    }
 
     private final String name;
     private final byte[] asciiName;
@@ -403,7 +442,7 @@
     /**
      * Keys have a name and a marshaller used for serialization.
      */
-    public Key(String name, Marshaller<T> marshaller) {
+    private Key(String name, Marshaller<T> marshaller) {
       this.name = Preconditions.checkNotNull(name, "name").intern();
       this.asciiName = name.getBytes(StandardCharsets.US_ASCII);
       this.marshaller = Preconditions.checkNotNull(marshaller);
diff --git a/core/src/main/java/com/google/net/stubby/Status.java b/core/src/main/java/com/google/net/stubby/Status.java
index 6b6de60..61f6ed7 100644
--- a/core/src/main/java/com/google/net/stubby/Status.java
+++ b/core/src/main/java/com/google/net/stubby/Status.java
@@ -4,6 +4,8 @@
 import com.google.common.base.Throwables;
 import com.google.net.stubby.transport.Transport;
 
+import java.util.logging.Logger;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.Immutable;
 
@@ -12,9 +14,14 @@
  */
 @Immutable
 public class Status {
-
   public static final Status OK = new Status(Transport.Code.OK);
   public static final Status CANCELLED = new Status(Transport.Code.CANCELLED);
+  public static final Metadata.Key<Transport.Code> CODE_KEY
+      = Metadata.Key.of("grpc-status", new CodeMarshaller());
+  public static final Metadata.Key<String> MESSAGE_KEY
+      = Metadata.Key.of("grpc-message", Metadata.STRING_MARSHALLER);
+
+  private static final Logger log = Logger.getLogger(Status.class.getName());
 
   public static Status fromThrowable(Throwable t) {
     for (Throwable cause : Throwables.getCausalChain(t)) {
@@ -134,4 +141,35 @@
     builder.append("]");
     return builder.toString();
   }
+
+  /** Marshals a code as its number via {@link Metadata#INTEGER_MARSHALLER}. */
+  private static class CodeMarshaller implements Metadata.Marshaller<Transport.Code> {
+    @Override
+    public byte[] toBytes(Transport.Code value) {
+      return Metadata.INTEGER_MARSHALLER.toBytes(value.getNumber());
+    }
+
+    @Override
+    public String toAscii(Transport.Code value) {
+      return Metadata.INTEGER_MARSHALLER.toAscii(value.getNumber());
+    }
+
+    @Override
+    public Transport.Code parseBytes(byte[] serialized) {
+      return intToCode(Metadata.INTEGER_MARSHALLER.parseBytes(serialized));
+    }
+
+    @Override
+    public Transport.Code parseAscii(String ascii) {
+      return intToCode(Metadata.INTEGER_MARSHALLER.parseAscii(ascii));
+    }
+
+    private Transport.Code intToCode(Integer number) {
+      Transport.Code known = Transport.Code.valueOf(number);
+      if (known == null) {
+        log.warning("Unknown Code: " + number);
+      }
+      return known != null ? known : Transport.Code.UNKNOWN;  // unrecognized numbers map to UNKNOWN
+    }
+  }
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java
index 639d9db..f85860d 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java
@@ -30,7 +30,7 @@
     int b2 = readUnsignedByte();
     int b3 = readUnsignedByte();
     int b4 = readUnsignedByte();
-    return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
+    return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
   }
 
   @Override
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
index 99508f8..0b81bab 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java
@@ -5,9 +5,15 @@
 import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import javax.annotation.concurrent.GuardedBy;
+
 /**
  * The abstract base class for {@link ClientStream} implementations.
  */
@@ -15,37 +21,64 @@
 
   private final StreamListener listener;
 
+  @GuardedBy("stateLock")
   private Status status;
 
   private final Object stateLock = new Object();
   private volatile StreamState state = StreamState.OPEN;
 
+  private Status stashedStatus;
+  private Metadata.Trailers stashedTrailers;
+
   protected AbstractClientStream(StreamListener listener) {
     this.listener = Preconditions.checkNotNull(listener);
   }
 
   @Override
-  protected final StreamListener listener() {
-    return listener;
+  protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
+    return listener.messageRead(is, length);
+  }
+
+  /** gRPC protocol v1 support: stashes the deframed status together with new trailers. */
+  @Override
+  protected void receiveStatus(Status status) {
+    Preconditions.checkNotNull(status, "status");
+    stashedStatus = status;
+    stashedTrailers = new Metadata.Trailers();  // newly constructed trailers object
   }
 
   /**
-   * Overrides the behavior of the {@link StreamListener#closed(Status)} method to call
-   * {@link #setStatus(Status)}, rather than notifying the {@link #listener()} directly.
+   * If using gRPC v2 protocol, this method must be called with received trailers before notifying
+   * deframer of end of stream.
    */
-  @Override
-  protected final StreamListener inboundMessageHandler() {
-    // Wraps the base handler to get status update.
-    return new ForwardingStreamListener(super.inboundMessageHandler()) {
-      @Override
-      public void closed(Status status, Metadata.Trailers trailers) {
-        inboundPhase(Phase.STATUS);
-        // TODO(user): Fix once we switch the wire format to express status in trailers
-        setStatus(status, new Metadata.Trailers());
-      }
-    };
+  public void stashTrailers(Metadata.Trailers trailers) {
+    Preconditions.checkNotNull(trailers, "trailers");  // the argument, not the status field
+    stashedStatus = new Status(trailers.get(Status.CODE_KEY), trailers.get(Status.MESSAGE_KEY));
+    trailers.removeAll(Status.CODE_KEY);  // the status entries have been read into
+    trailers.removeAll(Status.MESSAGE_KEY);  // stashedStatus, so strip them before stashing
+    stashedTrailers = trailers;
+  }
 
+  @Override
+  protected void remoteEndClosed() {
+    Preconditions.checkState(stashedStatus != null, "Status and trailers should have been set");
+    setStatus(stashedStatus, stashedTrailers);
+  }
+
+  @Override
+  protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
+    sendFrame(frame, endOfStream);
+  }
+
+  /**
+   * Sends an outbound frame to the remote end point.
+   *
+   * @param frame a buffer containing the chunk of data to be sent.
+   * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
+   *        this endpoint.
+   */
+  protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
+
   /**
    * Sets the status if not already set and notifies the stream listener that the stream was closed.
    * This method must be called from the transport thread.
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
index 9fd5a9d..1204844 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java
@@ -5,10 +5,14 @@
 import static com.google.net.stubby.newtransport.StreamState.WRITE_ONLY;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
 
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
 import javax.annotation.concurrent.GuardedBy;
 
 /**
@@ -26,19 +30,26 @@
   /** Saved application status for notifying when graceful stream termination completes. */
   @GuardedBy("stateLock")
   private Status gracefulStatus;
-  @GuardedBy("stateLock")
-  private Metadata.Trailers gracefulTrailers;
-
-  @Override
-  protected final StreamListener listener() {
-    return listener;
-  }
+  /** Saved trailers from close() that need to be sent once the framer has sent all messages. */
+  private Metadata.Trailers stashedTrailers;
 
   public final void setListener(ServerStreamListener listener) {
     this.listener = Preconditions.checkNotNull(listener, "listener");
   }
 
   @Override
+  protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
+    inboundPhase(Phase.MESSAGE);
+    return listener.messageRead(is, length);
+  }
+
+  /** gRPC protocol v1 support */
+  @Override
+  protected void receiveStatus(Status status) {
+    Preconditions.checkState(status == Status.OK, "Received status can only be OK on server");
+  }
+
+  @Override
   public final void close(Status status, Metadata.Trailers trailers) {
     Preconditions.checkNotNull(status, "status");
     Preconditions.checkNotNull(trailers, "trailers");
@@ -52,13 +63,48 @@
         // is notified via complete()). Since there may be large buffers involved, the actual
         // completion of the RPC could be much later than this call.
         gracefulStatus = status;
-        gracefulTrailers = trailers;
       }
     }
+    trailers.removeAll(Status.CODE_KEY);
+    trailers.removeAll(Status.MESSAGE_KEY);
+    trailers.put(Status.CODE_KEY, status.getCode());
+    if (status.getDescription() != null) {
+      trailers.put(Status.MESSAGE_KEY, status.getDescription());
+    }
+    this.stashedTrailers = trailers;
     closeFramer(status);
     dispose();
   }
 
+  @Override
+  protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
+    if (GRPC_V2_PROTOCOL) {
+      sendFrame(frame, false);  // never end the stream here; sendTrailers() implies end of stream
+      if (endOfStream) {
+        sendTrailers(stashedTrailers);
+        stashedTrailers = null;
+      }
+    } else {
+      sendFrame(frame, endOfStream);
+    }
+  }
+
+  /**
+   * Sends an outbound frame to the remote end point.
+   *
+   * @param frame a buffer containing the chunk of data to be sent.
+   * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
+   *        this endpoint.
+   */
+  protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
+
+  /**
+   * Sends trailers to the remote end point. This call implies end of stream.
+   *
+   * @param trailers metadata to be sent to end point
+   */
+  protected abstract void sendTrailers(Metadata.Trailers trailers);
+
   /**
    * The Stream is considered completely closed and there is no further opportunity for error. It
    * calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note
@@ -80,7 +126,7 @@
           new Metadata.Trailers());
       throw new IllegalStateException("successful complete() without close()");
     }
-    listener.closed(status, gracefulTrailers);
+    listener.closed(status, new Metadata.Trailers());
   }
 
   @Override
@@ -91,7 +137,8 @@
   /**
    * Called when the remote end half-closes the stream.
    */
-  public final void remoteEndClosed() {
+  @Override
+  protected final void remoteEndClosed() {
     synchronized (stateLock) {
       Preconditions.checkState(state == OPEN, "Stream not OPEN");
       state = WRITE_ONLY;
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
index f0cfa2a..d4c3bc4 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java
@@ -4,7 +4,6 @@
 import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Status;
 
 import java.io.InputStream;
@@ -41,29 +40,19 @@
   private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
     @Override
     public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
-      sendFrame(frame, endOfStream);
+      internalSendFrame(frame, endOfStream);
     }
   };
 
   /**
-   * Internal handler for Deframer output. Informs the {@link #listener()} of inbound messages.
+   * Internal handler for deframer output. Informs stream of inbound messages.
    */
-  private final StreamListener inboundMessageHandler = new StreamListener() {
-
-    @Override
-    public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
-      inboundPhase(Phase.HEADERS);
-      ListenableFuture<Void> future = listener().headersRead(headers);
-      disableWindowUpdate(future);
-      return future;
-    }
-
+  private final GrpcDeframer.Sink inboundMessageHandler = new GrpcDeframer.Sink() {
     @Override
     public ListenableFuture<Void> messageRead(InputStream input, int length) {
       ListenableFuture<Void> future = null;
       try {
-        inboundPhase(Phase.MESSAGE);
-        future = listener().messageRead(input, length);
+        future = receiveMessage(input, length);
         disableWindowUpdate(future);
         return future;
       } finally {
@@ -72,9 +61,13 @@
     }
 
     @Override
-    public void closed(Status status, Metadata.Trailers trailers) {
-      inboundPhase(Phase.STATUS);
-      listener().closed(status, trailers);
+    public void statusRead(Status status) {
+      receiveStatus(status);
+    }
+
+    @Override
+    public void endOfStream() {
+      remoteEndClosed();
     }
   };
 
@@ -129,12 +122,16 @@
    * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
    *        this endpoint.
    */
-  protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
+  protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream);
 
-  /**
-   * Returns the listener associated to this stream.
-   */
-  protected abstract StreamListener listener();
+  /** A message was deframed. */
+  protected abstract ListenableFuture<Void> receiveMessage(InputStream is, int length);
+
+  /** A status was deframed. */
+  protected abstract void receiveStatus(Status status);
+
+  /** Deframer reached end of stream. */
+  protected abstract void remoteEndClosed();
 
   /**
    * If the given future is non-{@code null}, temporarily disables window updates for inbound flow
@@ -147,7 +144,7 @@
    * Gets the internal handler for inbound messages. Subclasses must use this as the target for a
    * {@link com.google.net.stubby.newtransport.Deframer}.
    */
-  protected StreamListener inboundMessageHandler() {
+  protected GrpcDeframer.Sink inboundMessageHandler() {
     return inboundMessageHandler;
   }
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
index 817ff94..dd7d80f 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
@@ -2,7 +2,6 @@
 
 import com.google.common.io.ByteStreams;
 import com.google.net.stubby.GrpcFramingUtil;
-import com.google.net.stubby.Metadata;
 import com.google.net.stubby.Operation;
 import com.google.net.stubby.Status;
 import com.google.net.stubby.transport.Transport;
@@ -23,13 +22,13 @@
    */
   private static final int LENGTH_NOT_SET = -1;
 
-  private final StreamListener target;
+  private final GrpcDeframer.Sink target;
   private boolean inFrame;
   private byte currentFlags;
   private int currentLength = LENGTH_NOT_SET;
   private boolean statusDelivered;
 
-  public Deframer(StreamListener target) {
+  public Deframer(GrpcDeframer.Sink target) {
     this.target = target;
   }
 
@@ -141,7 +140,8 @@
   }
 
   private void writeStatus(Status status) {
-    target.closed(status, new Metadata.Trailers());
+    target.statusRead(status);
+    target.endOfStream();
     statusDelivered = true;
   }
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
index cc1a306..9d55009 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java
@@ -20,6 +20,9 @@
  * {@link Decompressor}.
  */
 public class GrpcDeframer implements Closeable {
+  public interface Sink extends MessageDeframer2.Sink {
+    void statusRead(Status status);
+  }
 
   private enum State {
     HEADER, BODY
@@ -35,22 +38,22 @@
   private boolean statusNotified;
   private boolean endOfStream;
   private boolean deliveryOutstanding;
-  private StreamListener listener;
+  private Sink sink;
   private CompositeBuffer nextFrame;
 
   /**
    * Constructs the deframer.
    *
    * @param decompressor the object used for de-framing GRPC compression frames.
-   * @param listener the listener for fully read GRPC messages.
+   * @param sink the sink for fully read GRPC messages.
    * @param executor the executor to be used for delivery. All calls to
    *        {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
    *        executor must not allow concurrent access to this class, so it must be either a single
    *        thread or have sequential processing of events.
    */
-  public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) {
+  public GrpcDeframer(Decompressor decompressor, Sink sink, Executor executor) {
     this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
-    this.listener = Preconditions.checkNotNull(listener, "listener");
+    this.sink = Preconditions.checkNotNull(sink, "sink");
     this.executor = Preconditions.checkNotNull(executor, "executor");
     deliveryTask = new Runnable() {
       @Override
@@ -62,7 +65,7 @@
   }
 
   /**
-   * Adds the given data to this deframer and attempts delivery to the listener.
+   * Adds the given data to this deframer and attempts delivery to the sink.
    */
   public void deframe(Buffer data, boolean endOfStream) {
     Preconditions.checkNotNull(data, "data");
@@ -87,7 +90,7 @@
 
   /**
    * If there is no outstanding delivery, attempts to read and deliver as many messages to the
-   * listener as possible. Only one outstanding delivery is allowed at a time.
+   * sink as possible. Only one outstanding delivery is allowed at a time.
    */
   private void deliver() {
     if (deliveryOutstanding) {
@@ -106,11 +109,11 @@
           processHeader();
           break;
         case BODY:
-          // Read the body and deliver the message to the listener.
+          // Read the body and deliver the message to the sink.
           deliveryOutstanding = true;
           ListenableFuture<Void> processingFuture = processBody();
           if (processingFuture != null) {
-            // A listener was returned for the completion of processing the delivered
+            // A sink was returned for the completion of processing the delivered
             // message. Once it's done, try to deliver the next message.
             processingFuture.addListener(deliveryTask, executor);
             return;
@@ -200,9 +203,9 @@
    */
   private ListenableFuture<Void> processMessage() {
     try {
-      return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
+      return sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
     } finally {
-      // Don't close the frame, since the listener is now responsible for the life-cycle.
+      // Don't close the frame, since the sink is now responsible for the life-cycle.
       nextFrame = null;
     }
   }
@@ -223,10 +226,11 @@
   }
 
   /**
-   * Delivers the status notification to the listener.
+   * Delivers the status notification to the sink.
    */
   private void notifyStatus(Status status) {
     statusNotified = true;
-    listener.closed(status, new Metadata.Trailers());
+    sink.statusRead(status);
+    sink.endOfStream();
   }
 }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java
index 75895d3..fe5aa06 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java
@@ -15,7 +15,7 @@
 
   private final PrefixingInputStream prefixingInputStream;
 
-  public InputStreamDeframer(StreamListener target) {
+  public InputStreamDeframer(GrpcDeframer.Sink target) {
     super(target);
     prefixingInputStream = new PrefixingInputStream(4096);
   }
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java b/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java
index 57a25d3..8c37e02 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java
@@ -3,11 +3,10 @@
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.net.stubby.Metadata;
-import com.google.net.stubby.Status;
 
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
+import java.io.InputStream;
 import java.io.IOException;
 import java.util.concurrent.Executor;
 import java.util.zip.GZIPInputStream;
@@ -30,63 +29,17 @@
     NONE, GZIP;
   }
 
+  public interface Sink {
+    ListenableFuture<Void> messageRead(InputStream is, int length);  // a message was deframed
+    void endOfStream();  // the deframer reached end of stream
+  }
+
   private enum State {
     HEADER, BODY
   }
 
-  /**
-   * Create a deframer for use on the server-side. All calls to this class must be made in the
-   * context of the provided executor, which also must not allow concurrent processing of Runnables.
-   *
-   * @param listener callback for fully read GRPC messages
-   * @param executor used for internal event processing
-   */
-  public static MessageDeframer2 createOnServer(StreamListener listener, Executor executor) {
-    return createOnServer(listener, executor, Compression.NONE);
-  }
-
-  /**
-   * Create a deframer for use on the server-side. All calls to this class must be made in the
-   * context of the provided executor, which also must not allow concurrent processing of Runnables.
-   *
-   * @param listener callback for fully read GRPC messages
-   * @param executor used for internal event processing
-   * @param compression the compression used if a compressed frame is encountered, with NONE meaning
-   *     unsupported
-   */
-  public static MessageDeframer2 createOnServer(StreamListener listener, Executor executor,
-      Compression compression) {
-    return new MessageDeframer2(listener, executor, false, compression);
-  }
-
-  /**
-   * Create a deframer for use on the client-side. All calls to this class must be made in the
-   * context of the provided executor, which also must not allow concurrent processing of Runnables.
-   *
-   * @param listener callback for fully read GRPC messages
-   * @param executor used for internal event processing
-   */
-  public static MessageDeframer2 createOnClient(StreamListener listener, Executor executor) {
-    return createOnClient(listener, executor, Compression.NONE);
-  }
-
-  /**
-   * Create a deframer for use on the client-side. All calls to this class must be made in the
-   * context of the provided executor, which also must not allow concurrent processing of Runnables.
-   *
-   * @param listener callback for fully read GRPC messages
-   * @param executor used for internal event processing
-   * @param compression the compression used if a compressed frame is encountered, with NONE meaning
-   *     unsupported
-   */
-  public static MessageDeframer2 createOnClient(StreamListener listener, Executor executor,
-      Compression compression) {
-    return new MessageDeframer2(listener, executor, true, compression);
-  }
-
-  private final StreamListener listener;
+  private final Sink sink;
   private final Executor executor;
-  private final boolean client;
   private final Compression compression;
   private final Runnable deliveryTask = new Runnable() {
         @Override
@@ -103,16 +56,35 @@
   private CompositeBuffer nextFrame;
   private CompositeBuffer unprocessed = new CompositeBuffer();
 
-  private MessageDeframer2(StreamListener listener, Executor executor, boolean client,
-      Compression compression) {
-    this.listener = Preconditions.checkNotNull(listener, "listener");
+  /**
+   * Create a deframer. All calls to this class must be made in the context of the provided
+   * executor, which also must not allow concurrent processing of Runnables. Compression will not
+   * be supported.
+   *
+   * @param sink callback for fully read GRPC messages
+   * @param executor used for internal event processing
+   */
+  public MessageDeframer2(Sink sink, Executor executor) {
+    this(sink, executor, Compression.NONE);
+  }
+
+  /**
+   * Create a deframer. All calls to this class must be made in the context of the provided
+   * executor, which also must not allow concurrent processing of Runnables.
+   *
+   * @param sink callback for fully read GRPC messages
+   * @param executor used for internal event processing
+   * @param compression the compression used if a compressed frame is encountered, with NONE meaning
+   *     unsupported
+   */
+  public MessageDeframer2(Sink sink, Executor executor, Compression compression) {
+    this.sink = Preconditions.checkNotNull(sink, "sink");
     this.executor = Preconditions.checkNotNull(executor, "executor");
-    this.client = client;
     this.compression = Preconditions.checkNotNull(compression, "compression");
   }
 
   /**
-   * Adds the given data to this deframer and attempts delivery to the listener.
+   * Adds the given data to this deframer and attempts delivery to the sink.
    */
   public void deframe(Buffer data, boolean endOfStream) {
     Preconditions.checkNotNull(data, "data");
@@ -134,9 +106,18 @@
     }
   }
 
+  public void delayProcessing(ListenableFuture<Void> future) {
+    Preconditions.checkState(!deliveryOutstanding, "Only one delay allowed concurrently");
+    if (future == null) {
+      return;  // No future was supplied, so there is nothing to wait for.
+    }
+    deliveryOutstanding = true;  // Once the future completes, try to deliver the next message.
+    future.addListener(deliveryTask, executor);
+  }
+
   /**
    * If there is no outstanding delivery, attempts to read and deliver as many messages to the
-   * listener as possible. Only one outstanding delivery is allowed at a time.
+   * sink as possible. Only one outstanding delivery is allowed at a time.
    */
   private void deliver() {
     if (deliveryOutstanding) {
@@ -151,11 +132,11 @@
           processHeader();
           break;
         case BODY:
-          // Read the body and deliver the message to the listener.
+          // Read the body and deliver the message to the sink.
           deliveryOutstanding = true;
           ListenableFuture<Void> processingFuture = processBody();
           if (processingFuture != null) {
-            // A listener was returned for the completion of processing the delivered
+            // A future was returned for the completion of processing the delivered
             // message. Once it's done, try to deliver the next message.
             processingFuture.addListener(deliveryTask, executor);
             return;
@@ -175,10 +156,7 @@
         // application is properly notified of abortion.
         throw new RuntimeException("Encountered end-of-stream mid-frame");
       }
-      if (!client) {
-        // If on the server-side, we need to notify application of half-close.
-        listener.closed(Status.OK, new Metadata.Trailers());
-      }
+      sink.endOfStream();
     }
   }
 
@@ -241,13 +219,13 @@
         } catch (IOException ex) {
           throw new RuntimeException(ex);
         }
-        future = listener.messageRead(new ByteArrayInputStream(bytes), bytes.length);
+        future = sink.messageRead(new ByteArrayInputStream(bytes), bytes.length);
       } else {
         throw new AssertionError("Unknown compression type");
       }
     } else {
-      // Don't close the frame, since the listener is now responsible for the life-cycle.
-      future = listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
+      // Don't close the frame, since the sink is now responsible for the life-cycle.
+      future = sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
       nextFrame = null;
     }
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
index 00ee76c..1d990c3 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java
@@ -50,8 +50,7 @@
       this.deframer2 = null;
     } else {
       this.deframer = null;
-      this.deframer2 = MessageDeframer2.createOnClient(
-          inboundMessageHandler(), channel.eventLoop());
+      this.deframer2 = new MessageDeframer2(inboundMessageHandler(), channel.eventLoop());
     }
     windowUpdateManager = new WindowUpdateManager(channel, inboundFlow);
   }
@@ -83,8 +82,14 @@
   public void inboundHeadersRecieved(Http2Headers headers, boolean endOfStream) {
     responseCode = responseCode(headers);
     isGrpcResponse = isGrpcResponse(headers, responseCode);
-    if (!isGrpcResponse && endOfStream) {
-      setStatus(new Status(responseCode), new Metadata.Trailers());
+    if (endOfStream) {
+      if (isGrpcResponse) {
+        // TODO(user): call stashTrailers() as appropriate, then provide endOfStream to
+        // deframer.
+        setStatus(new Status(responseCode), new Metadata.Trailers());
+      } else {
+        setStatus(new Status(responseCode), new Metadata.Trailers());
+      }
     }
   }
 
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
index 3193e47..e023596 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java
@@ -57,16 +57,6 @@
     super(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
     this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
     this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
-
-    // Observe the HTTP/2 connection for events.
-    connection.addListener(new Http2ConnectionAdapter() {
-        @Override
-        public void streamHalfClosed(Http2Stream stream) {
-          if (stream.state() == Http2Stream.State.HALF_CLOSED_REMOTE) {
-            serverStream(stream).remoteEndClosed();
-          }
-        }
-    });
   }
 
   @Override
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java
index e33bce9..cc5c1be 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java
@@ -2,6 +2,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.net.stubby.Metadata;
 import com.google.net.stubby.newtransport.AbstractServerStream;
 import com.google.net.stubby.newtransport.GrpcDeframer;
 import com.google.net.stubby.newtransport.MessageDeframer2;
@@ -35,7 +36,7 @@
       deframer2 = null;
     } else {
       deframer = null;
-      deframer2 = MessageDeframer2.createOnServer(inboundMessageHandler(), channel.eventLoop());
+      deframer2 = new MessageDeframer2(inboundMessageHandler(), channel.eventLoop());
     }
     windowUpdateManager =
         new WindowUpdateManager(channel, Preconditions.checkNotNull(inboundFlow, "inboundFlow"));
@@ -69,6 +70,11 @@
   }
 
   @Override
+  protected void sendTrailers(Metadata.Trailers trailers) {
+    // TODO(user): send trailers
+  }
+
+  @Override
   public int id() {
     return id;
   }