Copy framing classes to newtransport, tweaking some of their APIs

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69489753
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/CompressionFramer.java b/core/src/main/java/com/google/net/stubby/newtransport/CompressionFramer.java
new file mode 100644
index 0000000..82b4943
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/CompressionFramer.java
@@ -0,0 +1,336 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.DeferredInputStream;
+import com.google.net.stubby.newtransport.Framer.Sink;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.zip.Deflater;
+
+/**
+ * Compression framer for HTTP/2 transport frames, for use in both compression and
+ * non-compression scenarios. Receives message-stream as input. It is able to change compression
+ * configuration on-the-fly, but will not actually begin using the new configuration until the next
+ * full frame.
+ */
+class CompressionFramer {
+  /**
+   * Compression level to indicate using this class's default level. Note that this value is
+   * allowed to conflict with Deflate.DEFAULT_COMPRESSION, in which case this class's default
+   * prevails.
+   */
+  public static final int DEFAULT_COMPRESSION_LEVEL = -1;
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  /**
+   * Size of the GRPC compression frame header which consists of:
+   * 1 byte for the compression type,
+   * 3 bytes for the length of the compression frame.
+   */
+  @VisibleForTesting
+  static final int HEADER_LENGTH = 4;
+  /**
+   * Number of frame bytes to reserve to allow for zlib overhead. This does not include data-length
+   * dependent overheads and compression latency (delay between providing data to zlib and output of
+   * the compressed data).
+   *
+   * <p>References:
+   *   deflate framing: http://www.gzip.org/zlib/rfc-deflate.html
+   *     (note that bit-packing is little-endian (section 3.1.1) whereas description of sequences
+   *     is big-endian, so bits appear reversed),
+   *   zlib framing: http://tools.ietf.org/html/rfc1950,
+   *   details on flush behavior: http://www.zlib.net/manual.html
+   */
+  @VisibleForTesting
+  static final int MARGIN
+      = 5 /* deflate current block overhead, assuming no compression:
+             block type (1) + len (2) + nlen (2) */
+      + 5 /* deflate flush; adds an empty block after current:
+             00 (not end; no compression) 00 00 (len) FF FF (nlen) */
+      + 5 /* deflate flush; some versions of zlib output two empty blocks on some flushes */
+      + 5 /* deflate finish; adds empty block to mark end, since we commonly flush before finish:
+             03 (end; fixed Huffman + 5 bits of end of block) 00 (last 3 bits + padding),
+             or if compression level is 0: 01 (end; no compression) 00 00 (len) FF FF (nlen) */
+      + 2 /* zlib header; CMF (1) + FLG (1) */ + 4 /* zlib ADLER32 (4) */
+      + 5 /* additional safety for good measure */;
+
+  private static final Logger log = Logger.getLogger(CompressionFramer.class.getName());
+
+  private final Sink<ByteBuffer> sink;
+  /**
+   * Bytes of frame being constructed. {@code position() == 0} when no frame in progress.
+   */
+  private final ByteBuffer bytebuf;
+  /** Number of frame bytes it is acceptable to leave unused when compressing. */
+  private final int sufficient;
+  private Deflater deflater;
+  /** Number of bytes written to deflater since last deflate sync. */
+  private int writtenSinceSync;
+  /** Number of bytes read from deflater since last deflate sync. */
+  private int readSinceSync;
+  /**
+   * Whether the current frame is actually being compressed. If {@code bytebuf.position() == 0},
+   * then this value has no meaning.
+   */
+  private boolean usingCompression;
+  /**
+   * Whether compression is requested. This does not imply we are compressing the current frame
+   * (see {@link #usingCompression}), or that we will even compress the next frame (see {@link
+   * #compressionUnsupported}).
+   */
+  private boolean allowCompression;
+  /** Whether compression is possible with current configuration and platform. */
+  private final boolean compressionUnsupported;
+  /**
+   * Compression level to set on the Deflater, where {@code DEFAULT_COMPRESSION_LEVEL} implies this
+   * class's default.
+   */
+  private int compressionLevel = DEFAULT_COMPRESSION_LEVEL;
+  private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
+
+  /**
+   * Since compression tries to form full frames, if compression is working well then it will
+   * consecutively compress smaller amounts of input data in order to not exceed the frame size. For
+   * example, if the data is getting 50% compression and a maximum frame size of 128, then it will
+   * encode roughly 128 bytes which leaves 64, so we encode 64, 32, 16, 8, 4, 2, 1, 1.
+   * {@code sufficient} cuts off the long tail and says that at some point the frame is "good
+   * enough" to stop. Choosing a value of {@code 0} is not outrageous.
+   *
+   * @param maxFrameSize maximum number of bytes allowed for output frames
+   * @param allowCompression whether frames should be compressed
+   * @param sufficient number of frame bytes it is acceptable to leave unused when compressing
+   */
+  public CompressionFramer(Sink<ByteBuffer> sink, int maxFrameSize, boolean allowCompression,
+      int sufficient) {
+    this.sink = sink;
+    this.allowCompression = allowCompression;
+    int maxSufficient = maxFrameSize - HEADER_LENGTH - MARGIN
+        - 1 /* to force at least one byte of data */;
+    boolean compressionUnsupported = false;
+    if (maxSufficient < 0) {
+      compressionUnsupported = true;
+      log.log(Level.INFO, "Frame not large enough for compression");
+    } else if (maxSufficient < sufficient) {
+      log.log(Level.INFO, "Compression sufficient reduced to {0} from {1} to fit in frame size {2}",
+          new Object[] {maxSufficient, sufficient, maxFrameSize});
+      sufficient = maxSufficient;
+    }
+    this.sufficient = sufficient;
+    // TODO(user): Benchmark before switching to direct buffers
+    bytebuf = ByteBuffer.allocate(maxFrameSize);
+    if (!bytebuf.hasArray()) {
+      compressionUnsupported = true;
+      log.log(Level.INFO, "Byte buffer doesn't support array(), which is required for compression");
+    }
+    this.compressionUnsupported = compressionUnsupported;
+  }
+
+  /**
+   * Sets whether compression is encouraged.
+   */
+  public void setAllowCompression(boolean allow) {
+    this.allowCompression = allow;
+  }
+
+  /**
+   * Set the preferred compression level for when compression is enabled.
+   *
+   * @param level the preferred compression level (0-9), or {@code DEFAULT_COMPRESSION_LEVEL} to use
+   *     this class's default
+   * @see java.util.zip.Deflater#setLevel
+   */
+  public void setCompressionLevel(int level) {
+    Preconditions.checkArgument(level == DEFAULT_COMPRESSION_LEVEL
+        || (level >= Deflater.NO_COMPRESSION && level <= Deflater.BEST_COMPRESSION),
+        "invalid compression level");
+    this.compressionLevel = level;
+  }
+
+  /**
+   * Ensures state and buffers are initialized for writing data to a frame. Callers should be very
+   * aware this method may modify {@code usingCompression}.
+   */
+  private void checkInitFrame() {
+    if (bytebuf.position() != 0) {
+      return;
+    }
+    bytebuf.position(HEADER_LENGTH);
+    usingCompression = compressionUnsupported ? false : allowCompression;
+    if (usingCompression) {
+      if (deflater == null) {
+        deflater = new Deflater();
+      } else {
+        deflater.reset();
+      }
+      deflater.setLevel(compressionLevel == DEFAULT_COMPRESSION_LEVEL
+          ? Deflater.DEFAULT_COMPRESSION : compressionLevel);
+      writtenSinceSync = 0;
+      readSinceSync = 0;
+    }
+  }
+
+  /** Frame contents of {@code message}, flushing to {@code sink} as necessary. */
+  public int write(InputStream message) throws IOException {
+    checkInitFrame();
+    if (!usingCompression && bytebuf.hasArray()) {
+      if (bytebuf.remaining() == 0) {
+        commitToSink(false, false);
+      }
+      int available = message.available();
+      if (available <= bytebuf.remaining()) {
+        // When InputStream is DeferredProtoInputStream, this is zero-copy because bytebuf is large
+        // enough for the proto to be serialized directly into it.
+        int read = ByteStreams.read(message,
+            bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(), bytebuf.remaining());
+        bytebuf.position(bytebuf.position() + read);
+        if (read != available) {
+          throw new RuntimeException("message.available() did not follow our semantics of always "
+              + "returning the number of remaining bytes");
+        }
+        return read;
+      }
+    }
+    if (message instanceof DeferredInputStream) {
+      return ((DeferredInputStream) message).flushTo(outputStreamAdapter);
+    } else {
+      // This could be optimized when compression is off, but we expect performance-critical code
+      // to provide a DeferredInputStream.
+      return (int) ByteStreams.copy(message, outputStreamAdapter);
+    }
+  }
+
+  /**
+   * Frame contents of {@code b} between {@code off} (inclusive) and {@code off + len} (exclusive),
+   * flushing to {@code sink} as necessary.
+   */
+  public void write(byte[] b, int off, int len) {
+    while (len > 0) {
+      checkInitFrame();
+      if (!usingCompression) {
+        if (bytebuf.remaining() == 0) {
+          commitToSink(false, false);
+          continue;
+        }
+        int toWrite = Math.min(len, bytebuf.remaining());
+        bytebuf.put(b, off, toWrite);
+        off += toWrite;
+        len -= toWrite;
+      } else {
+        if (bytebuf.remaining() <= MARGIN + sufficient) {
+          commitToSink(false, false);
+          continue;
+        }
+        // Amount of memory that is guaranteed not to be consumed, including in-flight data in zlib.
+        int safeCapacity = bytebuf.remaining() - MARGIN
+            - (writtenSinceSync - readSinceSync) - dataLengthDependentOverhead(writtenSinceSync);
+        if (safeCapacity <= 0) {
+          while (deflatePut(deflater, bytebuf, Deflater.SYNC_FLUSH) != 0) {}
+          writtenSinceSync = 0;
+          readSinceSync = 0;
+          continue;
+        }
+        int toWrite = Math.min(len, safeCapacity - dataLengthDependentOverhead(safeCapacity));
+        deflater.setInput(b, off, toWrite);
+        writtenSinceSync += toWrite;
+        while (!deflater.needsInput()) {
+          readSinceSync += deflatePut(deflater, bytebuf, Deflater.NO_FLUSH);
+        }
+        // Clear internal references of byte[] b.
+        deflater.setInput(EMPTY_BYTE_ARRAY);
+        off += toWrite;
+        len -= toWrite;
+      }
+    }
+  }
+
+  /**
+   * When data is uncompressable, there are 5B of overhead per deflate block, which is generally
+   * 16 KiB for zlib, but the format supports up to 32 KiB. One block's overhead is already
+   * accounted for in MARGIN. We use 1B/2KiB to circumvent dealing with rounding errors. Note that
+   * 1B/2KiB is not enough to support 8 KiB blocks due to rounding errors.
+   */
+  private static int dataLengthDependentOverhead(int length) {
+    return length / 2048;
+  }
+
+  private static int deflatePut(Deflater deflater, ByteBuffer bytebuf, int flush) {
+    if (bytebuf.remaining() == 0) {
+      throw new AssertionError("Compressed data exceeded frame size");
+    }
+    int deflateBytes = deflater.deflate(bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(),
+        bytebuf.remaining(), flush);
+    bytebuf.position(bytebuf.position() + deflateBytes);
+    return deflateBytes;
+  }
+
+  public void endOfMessage() {
+    if ((!usingCompression && bytebuf.remaining() == 0)
+        || (usingCompression && bytebuf.remaining() <= MARGIN + sufficient)) {
+      commitToSink(true, false);
+    }
+  }
+
+  public void flush() {
+    if (bytebuf.position() == 0) {
+      return;
+    }
+    commitToSink(true, false);
+  }
+
+  public void close() {
+    if (bytebuf.position() == 0) {
+      // No pending frame, so send an empty one.
+      bytebuf.flip();
+      sink.deliverFrame(bytebuf, true);
+      bytebuf.clear();
+    } else {
+      commitToSink(true, true);
+    }
+  }
+
+  /**
+   * Writes compression frame to sink. It does not initialize the next frame, so {@link
+   * #checkInitFrame()} is necessary if other frames are to follow.
+   */
+  private void commitToSink(boolean endOfMessage, boolean endOfStream) {
+    if (usingCompression) {
+      deflater.finish();
+      while (!deflater.finished()) {
+        deflatePut(deflater, bytebuf, Deflater.NO_FLUSH);
+      }
+      if (endOfMessage) {
+        deflater.end();
+        deflater = null;
+      }
+    }
+    int frameFlag = usingCompression
+        ? TransportFrameUtil.FLATE_FLAG : TransportFrameUtil.NO_COMPRESS_FLAG;
+    // Header = 1b flag | 3b length of GRPC frame
+    int header = (frameFlag << 24) | (bytebuf.position() - 4);
+    bytebuf.putInt(0, header);
+    bytebuf.flip();
+    sink.deliverFrame(bytebuf, endOfStream);
+    bytebuf.clear();
+  }
+
+  private class OutputStreamAdapter extends OutputStream {
+    private final byte[] singleByte = new byte[1];
+
+    @Override
+    public void write(int b) {
+      singleByte[0] = (byte) b;
+      write(singleByte, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      CompressionFramer.this.write(b, off, len);
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
new file mode 100644
index 0000000..06b26e5
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
@@ -0,0 +1,162 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
+ * reconstructs their messages and hands them off to a receiving {@link Operation}
+ */
+public abstract class Deframer<F> implements Framer.Sink<F> {
+
+  /**
+   *  Unset frame length
+   */
+  private static final int LENGTH_NOT_SET = -1;
+
+  private final Framer target;
+  private boolean inFrame;
+  private byte currentFlags;
+  private int currentLength = LENGTH_NOT_SET;
+
+  public Deframer(Framer target) {
+    this.target = target;
+  }
+
+  @Override
+  public void deliverFrame(F frame, boolean endOfStream) {
+    int remaining = internalDeliverFrame(frame);
+    if (endOfStream && remaining > 0) {
+      target.writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
+    }
+  }
+
+  /**
+   * Consume a frame of bytes provided by the transport. Note that transport framing is not
+   * aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
+   * across transport frame boundaries.
+   *
+   * @return the number of unconsumed bytes remaining in the buffer
+   */
+  private int internalDeliverFrame(F frame) {
+    try {
+      frame = decompress(frame);
+      DataInputStream grpcStream = prefix(frame);
+      // Loop until no more GRPC frames can be fully decoded
+      while (true) {
+        if (!inFrame) {
+          // Not in frame so attempt to read flags
+          if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
+            return consolidate();
+          }
+          currentFlags = grpcStream.readByte();
+          inFrame = true;
+        }
+        if (currentLength == LENGTH_NOT_SET) {
+          // Read the frame length
+          if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
+            return consolidate();
+          }
+          currentLength = grpcStream.readInt();
+        }
+        // Ensure that the entire frame length is available to read
+        InputStream framedChunk = ensureMessage(grpcStream, currentLength);
+        if (framedChunk == null) {
+          // Insufficient bytes available
+          return consolidate();
+        }
+        if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
+          // Advance stream now, because target.addPayload() may not or may process the frame on
+          // another thread.
+          framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
+          try {
+            // Report payload to the receiving operation
+            target.writePayload(framedChunk, currentLength);
+          } finally {
+            currentLength = LENGTH_NOT_SET;
+            inFrame = false;
+          }
+        } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
+          // Not clear if using proto encoding here is of any benefit.
+          // Using ContextValue.parseFrom requires copying out of the framed chunk
+          // Writing a custom parser would have to do varint handling and potentially
+          // deal with out-of-order tags etc.
+          Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
+          try {
+            target.writeContext(contextValue.getKey(),
+                contextValue.getValue().newInput(), currentLength);
+          } finally {
+            currentLength = LENGTH_NOT_SET;
+            inFrame = false;
+          }
+        } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
+          int status = framedChunk.read() << 8 | framedChunk.read();
+          Transport.Code code = Transport.Code.valueOf(status);
+          // TODO(user): Resolve what to do with remainder of framedChunk
+          try {
+            if (code == null) {
+              // Log for unknown code
+              target.writeStatus(
+                  new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
+            } else {
+              target.writeStatus(new Status(code));
+            }
+          } finally {
+            currentLength = LENGTH_NOT_SET;
+            inFrame = false;
+          }
+        }
+        if (grpcStream.available() == 0) {
+          // We've processed all the data so consolidate the underlying buffers
+          return consolidate();
+        }
+      }
+    } catch (IOException ioe) {
+      Status status = new Status(Transport.Code.UNKNOWN, ioe);
+      target.writeStatus(status);
+      throw status.asRuntimeException();
+    }
+  }
+
+  /**
+   * Return a stream view over the current buffer prefixed to the input frame
+   */
+  protected abstract DataInputStream prefix(F frame) throws IOException;
+
+  /**
+   * Consolidate the underlying buffers and return the number of buffered bytes remaining
+   */
+  protected abstract int consolidate() throws IOException;
+
+  /**
+   * Decompress the raw frame buffer prior to prefixing it.
+   */
+  protected abstract F decompress(F frame) throws IOException;
+
+  /**
+   * Ensure that {@code len} bytes are available in the buffer and frame
+   */
+  private boolean ensure(InputStream input, int len) throws IOException {
+    return (input.available() >= len);
+  }
+
+  /**
+   * Return a message of {@code len} bytes than can be read from the buffer. If sufficient
+   * bytes are unavailable then buffer the available bytes and return null.
+   */
+  private InputStream ensureMessage(InputStream input, int len)
+      throws IOException {
+    if (input.available() < len) {
+      return null;
+    }
+    return ByteStreams.limit(input, len);
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Framer.java b/core/src/main/java/com/google/net/stubby/newtransport/Framer.java
new file mode 100644
index 0000000..125ae59
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Framer.java
@@ -0,0 +1,58 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.net.stubby.Status;
+
+import java.io.InputStream;
+
+/**
+ * Implementations produce the GRPC byte sequence and then split it over multiple frames to be
+ * delivered via the transport layer which implements {@link Framer.Sink}
+ */
+public interface Framer {
+
+  /**
+   * Sink implemented by the transport layer to receive frames and forward them to their
+   * destination
+   */
+  public interface Sink<T> {
+    /**
+     * Deliver a frame via the transport.
+     * @param frame the contents of the frame to deliver
+     * @param endOfStream whether the frame is the last one for the GRPC stream
+     */
+    public void deliverFrame(T frame, boolean endOfStream);
+  }
+
+  /**
+   * Write out a Context-Value message. {@code message} will be completely consumed.
+   * {@code message.available()} must return the number of remaining bytes to be read.
+   */
+  public void writeContext(String type, InputStream message, int length);
+
+  /**
+   * Write out a Payload message. {@code payload} will be completely consumed.
+   * {@code payload.available()} must return the number of remaining bytes to be read.
+   */
+  public void writePayload(InputStream payload, int length);
+
+  /**
+   * Write out a Status message.
+   */
+  // TODO(user): change this signature when we actually start writing out the complete Status.
+  public void writeStatus(Status status);
+
+  /**
+   * Flush any buffered data in the framer to the sink.
+   */
+  public void flush();
+
+  /**
+   * Flushes and closes the framer and releases any buffers.
+   */
+  public void close();
+
+  /**
+   * Closes the framer and releases any buffers, but does not flush.
+   */
+  public void dispose();
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java
new file mode 100644
index 0000000..c0bdbae
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java
@@ -0,0 +1,149 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.io.ByteStreams;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Deframer that expects the input frames to be provided as {@link InputStream} instances
+ * which accurately report their size using {@link java.io.InputStream#available()}.
+ */
+public class InputStreamDeframer extends Deframer<InputStream> {
+
+  private final PrefixingInputStream prefixingInputStream;
+
+  public InputStreamDeframer(Framer target) {
+    super(target);
+    prefixingInputStream = new PrefixingInputStream(4096);
+  }
+
+  /**
+   * Deframing a single input stream that contains multiple GRPC frames
+   */
+  @Override
+  public void deliverFrame(InputStream frame, boolean endOfStream) {
+    super.deliverFrame(frame, endOfStream);
+    try {
+      if (frame.available() > 0) {
+        throw new AssertionError();
+      }
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  protected DataInputStream prefix(InputStream frame) throws IOException {
+    prefixingInputStream.consolidate();
+    prefixingInputStream.prefix(frame);
+    return new DataInputStream(prefixingInputStream);
+  }
+
+  @Override
+  protected int consolidate() throws IOException {
+    prefixingInputStream.consolidate();
+    return prefixingInputStream.available();
+  }
+
+  @Override
+  protected InputStream decompress(InputStream frame) throws IOException {
+    int compressionType = frame.read();
+    int frameLength =  frame.read() << 16 | frame.read() << 8 | frame.read();
+    InputStream raw = ByteStreams.limit(frame, frameLength);
+    if (TransportFrameUtil.isNotCompressed(compressionType)) {
+      return raw;
+    } else if (TransportFrameUtil.isFlateCompressed(compressionType)) {
+      return new InflaterInputStream(raw);
+    }
+    throw new IOException("Unknown compression type " + compressionType);
+  }
+
+  /**
+   * InputStream that prefixes another input stream with a fixed buffer.
+   */
+  private class PrefixingInputStream extends InputStream {
+
+    private InputStream suffix;
+    private byte[] buffer;
+    private int bufferIndex;
+    private int maxRetainedBuffer;
+
+    private PrefixingInputStream(int maxRetainedBuffer) {
+      // TODO(user): Implement support for this.
+      this.maxRetainedBuffer = maxRetainedBuffer;
+    }
+
+    void prefix(InputStream suffix) {
+      this.suffix = suffix;
+    }
+
+    void consolidate() throws IOException {
+      int remainingSuffix = suffix == null ? 0 : suffix.available();
+      if (remainingSuffix == 0) {
+        // No suffix so clear
+        suffix = null;
+        return;
+      }
+      int bufferLength = buffer == null ? 0 : buffer.length;
+      int bytesInBuffer = bufferLength - bufferIndex;
+      // Shift existing bytes
+      if (bufferLength < bytesInBuffer + remainingSuffix) {
+        // Buffer too small, so create a new buffer before copying in the suffix
+        byte[] newBuffer = new byte[bytesInBuffer + remainingSuffix];
+        if (bytesInBuffer > 0) {
+          System.arraycopy(buffer, bufferIndex, newBuffer, 0, bytesInBuffer);
+        }
+        buffer = newBuffer;
+        bufferIndex = 0;
+      } else {
+        // Enough space is in buffer, so shift the existing bytes to open up exactly enough bytes
+        // for the suffix at the end.
+        System.arraycopy(buffer, bufferIndex, buffer, bufferIndex - remainingSuffix, bytesInBuffer);
+        bufferIndex -= remainingSuffix;
+      }
+      // Write suffix to buffer
+      ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix);
+      suffix = null;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      int read = readFromBuffer(b, off, len);
+      if (suffix != null) {
+        read += suffix.read(b, off + read, len - read);
+      }
+      return read;
+    }
+
+    private int readFromBuffer(byte[] b, int off, int len) {
+      if (buffer == null) {
+        return 0;
+      }
+      len = Math.min(buffer.length - bufferIndex, len);
+      System.arraycopy(buffer, bufferIndex, b, off, len);
+      bufferIndex += len;
+      return len;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (buffer == null || bufferIndex == buffer.length) {
+        return suffix == null ? -1 : suffix.read();
+      }
+      return buffer[bufferIndex++];
+    }
+
+    @Override
+    public int available() throws IOException {
+      int available = buffer != null ? buffer.length - bufferIndex : 0;
+      if (suffix != null) {
+        // FIXME(ejona): This is likely broken with compressed streams.
+        available += suffix.available();
+      }
+      return available;
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java b/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java
new file mode 100644
index 0000000..f74b837
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java
@@ -0,0 +1,177 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Transport;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * Default {@link Framer} implementation.
+ */
+public class MessageFramer implements Framer {
+
+  /**
+   * Size of the GRPC message frame header which consists of
+   * 1 byte for the type (payload, context, status)
+   * 4 bytes for the length of the message
+   */
+  private static final int MESSAGE_HEADER_SIZE = 5;
+
+  /**
+   * UTF-8 charset which is used for key name encoding in context values
+   */
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  /**
+   * Precomputed protobuf tags for ContextValue
+   */
+  private static final byte[] VALUE_TAG;
+  private static final byte[] KEY_TAG;
+
+
+  static {
+    // Initialize constants for serializing context-value in a protobuf compatible manner
+    try {
+      byte[] buf = new byte[8];
+      CodedOutputStream coded = CodedOutputStream.newInstance(buf);
+      coded.writeTag(Transport.ContextValue.KEY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+      coded.flush();
+      KEY_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
+      coded = CodedOutputStream.newInstance(buf);
+      coded.writeTag(Transport.ContextValue.VALUE_FIELD_NUMBER,
+          WireFormat.WIRETYPE_LENGTH_DELIMITED);
+      coded.flush();
+      VALUE_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
+    } catch (IOException ioe) {
+      // Unrecoverable
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private CompressionFramer framer;
+  private final ByteBuffer scratch = ByteBuffer.allocate(16);
+
+  public MessageFramer(Sink<ByteBuffer> sink, int maxFrameSize) {
+    // TODO(user): maxFrameSize should probably come from a 'Platform' class
+    framer = new CompressionFramer(sink, maxFrameSize, false, maxFrameSize / 16);
+  }
+
+  /**
+   * Sets whether compression is encouraged.
+   */
+  public void setAllowCompression(boolean enable) {
+    framer.setAllowCompression(enable);
+  }
+
+  /**
+   * Set the preferred compression level for when compression is enabled.
+   * @param level the preferred compression level, or {@code -1} to use the framing default
+   * @see java.util.zip.Deflater#setLevel
+   */
+  public void setCompressionLevel(int level) {
+    framer.setCompressionLevel(level);
+  }
+
+  @Override
+  public void writePayload(InputStream message, int messageLength) {
+    try {
+      scratch.clear();
+      scratch.put(GrpcFramingUtil.PAYLOAD_FRAME);
+      scratch.putInt(messageLength);
+      framer.write(scratch.array(), 0, scratch.position());
+      if (messageLength != framer.write(message)) {
+        throw new RuntimeException("Message length was inaccurate");
+      }
+      framer.endOfMessage();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+
+  @Override
+  public void writeContext(String key, InputStream message, int messageLen) {
+    try {
+      scratch.clear();
+      scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME);
+      byte[] keyBytes = key.getBytes(UTF_8);
+      int lenKeyPrefix = KEY_TAG.length +
+          CodedOutputStream.computeRawVarint32Size(keyBytes.length);
+      int lenValPrefix = VALUE_TAG.length + CodedOutputStream.computeRawVarint32Size(messageLen);
+      int totalLen = lenKeyPrefix + keyBytes.length + lenValPrefix + messageLen;
+      scratch.putInt(totalLen);
+      framer.write(scratch.array(), 0, scratch.position());
+
+      // Write key
+      scratch.clear();
+      scratch.put(KEY_TAG);
+      writeRawVarInt32(keyBytes.length, scratch);
+      framer.write(scratch.array(), 0, scratch.position());
+      framer.write(keyBytes, 0, keyBytes.length);
+
+      // Write value
+      scratch.clear();
+      scratch.put(VALUE_TAG);
+      writeRawVarInt32(messageLen, scratch);
+      framer.write(scratch.array(), 0, scratch.position());
+      if (messageLen != framer.write(message)) {
+        throw new RuntimeException("Message length was inaccurate");
+      }
+      framer.endOfMessage();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void writeStatus(Status status) {
+    short code = (short) status.getCode().ordinal();
+    scratch.clear();
+    scratch.put(GrpcFramingUtil.STATUS_FRAME);
+    int length = 2;
+    scratch.putInt(length);
+    scratch.putShort(code);
+    framer.write(scratch.array(), 0, scratch.position());
+    framer.endOfMessage();
+  }
+
+  @Override
+  public void flush() {
+    framer.flush();
+  }
+
+  @Override
+  public void close() {
+    // TODO(user): Returning buffer to a pool would go here
+    framer.close();
+    framer = null;
+  }
+
+  @Override
+  public void dispose() {
+    // TODO(user): Returning buffer to a pool would go here
+    framer = null;
+  }
+
+  /**
+   * Write a raw VarInt32 to the buffer
+   */
+  private static void writeRawVarInt32(int value, ByteBuffer bytebuf) {
+    while (true) {
+      if ((value & ~0x7F) == 0) {
+        bytebuf.put((byte) value);
+        return;
+      } else {
+        bytebuf.put((byte) ((value & 0x7F) | 0x80));
+        value >>>= 7;
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java b/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java
new file mode 100644
index 0000000..e543961
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java
@@ -0,0 +1,23 @@
+package com.google.net.stubby.newtransport;
+
+/**
+ * Utility functions for transport layer framing.
+ *
+ * Within a given transport frame we reserve the first byte to indicate the
+ * type of compression used for the contents of the transport frame.
+ */
+public class TransportFrameUtil {
+
+  // Compression modes (lowest order 3 bits of frame flags)
+  public static final byte NO_COMPRESS_FLAG = 0x0;
+  public static final byte FLATE_FLAG =  0x1;
+  public static final byte COMPRESSION_FLAG_MASK = 0x7;
+
+  public static boolean isNotCompressed(int b) {
+    return ((b & COMPRESSION_FLAG_MASK) == NO_COMPRESS_FLAG);
+  }
+
+  public static boolean isFlateCompressed(int b) {
+    return ((b & COMPRESSION_FLAG_MASK) == FLATE_FLAG);
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/CompressionFramerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/CompressionFramerTest.java
new file mode 100644
index 0000000..b93875a
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/CompressionFramerTest.java
@@ -0,0 +1,99 @@
+package com.google.net.stubby.newtransport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Bytes;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.Deflater;
+import java.util.zip.InflaterInputStream;
+
+/** Unit tests for {@link CompressionFramer}. */
+@RunWith(JUnit4.class)
+public class CompressionFramerTest {
+  private int maxFrameSize = 1024;
+  private int sufficient = 8;
+  private CapturingSink sink = new CapturingSink();
+  private CompressionFramer framer = new CompressionFramer(sink, maxFrameSize, true, sufficient);
+
+  @Test
+  public void testGoodCompression() {
+    byte[] payload = new byte[1000];
+    framer.setCompressionLevel(Deflater.BEST_COMPRESSION);
+    framer.write(payload, 0, payload.length);
+    framer.endOfMessage();
+    framer.flush();
+
+    assertEquals(1, sink.frames.size());
+    byte[] frame = sink.frames.get(0);
+    assertEquals(TransportFrameUtil.FLATE_FLAG, frame[0]);
+    assertTrue(decodeFrameLength(frame) < 30);
+    assertArrayEquals(payload, decompress(frame));
+  }
+
+  @Test
+  public void testPoorCompression() {
+    byte[] payload = new byte[3 * maxFrameSize / 2];
+    new Random(1).nextBytes(payload);
+    framer.setCompressionLevel(Deflater.DEFAULT_COMPRESSION);
+    framer.write(payload, 0, payload.length);
+    framer.endOfMessage();
+    framer.flush();
+
+    assertEquals(2, sink.frames.size());
+    assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(0)[0]);
+    assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(1)[0]);
+    assertTrue(decodeFrameLength(sink.frames.get(0)) <= maxFrameSize);
+    assertTrue(decodeFrameLength(sink.frames.get(0))
+        >= maxFrameSize - CompressionFramer.HEADER_LENGTH - CompressionFramer.MARGIN - sufficient);
+    assertArrayEquals(payload, decompress(sink.frames));
+  }
+
+  private static int decodeFrameLength(byte[] frame) {
+    return ((frame[1] & 0xFF) << 16)
+        | ((frame[2] & 0xFF) << 8)
+        | (frame[3] & 0xFF);
+  }
+
+  private static byte[] decompress(byte[] frame) {
+    try {
+      return ByteStreams.toByteArray(new InflaterInputStream(new ByteArrayInputStream(frame,
+          CompressionFramer.HEADER_LENGTH, frame.length - CompressionFramer.HEADER_LENGTH)));
+    } catch (IOException ex) {
+      throw new AssertionError();
+    }
+  }
+
+  private static byte[] decompress(List<byte[]> frames) {
+    byte[][] bytes = new byte[frames.size()][];
+    for (int i = 0; i < frames.size(); i++) {
+      bytes[i] = decompress(frames.get(i));
+    }
+    return Bytes.concat(bytes);
+  }
+
+  private static class CapturingSink implements Framer.Sink<ByteBuffer> {
+    public final List<byte[]> frames = Lists.newArrayList();
+
+    @Override
+    public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+      byte[] frameBytes = new byte[frame.remaining()];
+      frame.get(frameBytes);
+      assertEquals(frameBytes.length - CompressionFramer.HEADER_LENGTH,
+          decodeFrameLength(frameBytes));
+      frames.add(frameBytes);
+    }
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java
new file mode 100644
index 0000000..11730a7
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java
@@ -0,0 +1,127 @@
+package com.google.net.stubby.newtransport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.io.ByteBuffers;
+import com.google.common.primitives.Bytes;
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Transport;
+import com.google.protobuf.ByteString;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Tests for {@link MessageFramer}
+ */
+@RunWith(JUnit4.class)
+public class MessageFramerTest {
+
+  public static final int TRANSPORT_FRAME_SIZE = 57;
+
+  @Test
+  public void testPayload() throws Exception {
+    CapturingSink sink = new CapturingSink();
+    MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE);
+    byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+    byte[] unframedStream =
+        Bytes.concat(
+            new byte[]{GrpcFramingUtil.PAYLOAD_FRAME},
+            new byte[]{0, 0, 0, (byte) payload.length},
+            payload);
+    for (int i = 0; i < 1000; i++) {
+      framer.writePayload(new ByteArrayInputStream(payload), payload.length);
+      if ((i + 1) % 13 == 0) {
+        // Test flushing periodically
+        framer.flush();
+      }
+    }
+    framer.flush();
+    assertEquals(sink.deframedStream.length, unframedStream.length * 1000);
+    for (int i = 0; i < 1000; i++) {
+      assertArrayEquals(unframedStream,
+          Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
+              (i + 1) * unframedStream.length));
+    }
+  }
+
+  @Test
+  public void testContext() throws Exception {
+    CapturingSink sink = new CapturingSink();
+    MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE);
+    byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+    byte[] contextValue = Transport.ContextValue.newBuilder()
+        .setKey("somekey")
+        .setValue(ByteString.copyFrom(payload))
+        .build().toByteArray();
+    byte[] unframedStream =
+        Bytes.concat(
+            new byte[]{GrpcFramingUtil.CONTEXT_VALUE_FRAME},
+            new byte[]{0, 0,
+                (byte) (contextValue.length >> 8 & 0xff),
+                (byte) (contextValue.length & 0xff)},
+            contextValue);
+    for (int i = 0; i < 1000; i++) {
+      framer.writeContext("somekey", new ByteArrayInputStream(payload), payload.length);
+      if ((i + 1) % 13 == 0) {
+        framer.flush();
+      }
+    }
+    framer.flush();
+    assertEquals(unframedStream.length * 1000, sink.deframedStream.length);
+    for (int i = 0; i < 1000; i++) {
+      assertArrayEquals(unframedStream,
+          Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
+              (i + 1) * unframedStream.length));
+    }
+  }
+
+  @Test
+  public void testStatus() throws Exception {
+    CapturingSink sink = new CapturingSink();
+    MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE);
+    byte[] unframedStream = Bytes.concat(
+            new byte[]{GrpcFramingUtil.STATUS_FRAME},
+            new byte[]{0, 0, 0, 2},  // Len is 2 bytes
+            new byte[]{0, 13});  // Internal==13
+    for (int i = 0; i < 1000; i++) {
+      framer.writeStatus(new Status(Transport.Code.INTERNAL));
+      if ((i + 1) % 13 == 0) {
+        framer.flush();
+      }
+    }
+    framer.flush();
+    assertEquals(sink.deframedStream.length, unframedStream.length * 1000);
+    for (int i = 0; i < 1000; i++) {
+      assertArrayEquals(unframedStream,
+          Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
+              (i + 1) * unframedStream.length));
+    }
+  }
+
+  static class CapturingSink implements Framer.Sink<ByteBuffer> {
+
+    byte[] deframedStream = new byte[0];
+
+    @Override
+    public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+      assertTrue(frame.remaining() <= TRANSPORT_FRAME_SIZE);
+      // Frame must contain compression flag & 24 bit length
+      int header = frame.getInt();
+      byte flag = (byte) (header >>> 24);
+      int length = header & 0xFFFFFF;
+      assertTrue(TransportFrameUtil.isNotCompressed(flag));
+      assertEquals(frame.remaining(), length);
+      // Frame must exceed dictated transport frame size
+      deframedStream = Bytes.concat(deframedStream,  ByteBuffers.extractBytes(frame));
+    }
+  }
+}