New Buffer type for transport API.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=70056282
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java
new file mode 100644
index 0000000..639d9db
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java
@@ -0,0 +1,59 @@
+package com.google.net.stubby.newtransport;
+
+/**
+ * Abstract base class for {@link Buffer} implementations.
+ */
+public abstract class AbstractBuffer implements Buffer {
+
+  @Override
+  public final int readUnsignedMedium() {
+    checkReadable(3);
+    int b1 = readUnsignedByte();
+    int b2 = readUnsignedByte();
+    int b3 = readUnsignedByte();
+    return b1 << 16 | b2 << 8 | b3;
+  }
+
+
+  @Override
+  public final int readUnsignedShort() {
+    checkReadable(2);
+    int b1 = readUnsignedByte();
+    int b2 = readUnsignedByte();
+    return b1 << 8 | b2;
+  }
+
+  @Override
+  public final int readInt() {
+    checkReadable(4);
+    int b1 = readUnsignedByte();
+    int b2 = readUnsignedByte();
+    int b3 = readUnsignedByte();
+    int b4 = readUnsignedByte();
+    return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
+  }
+
+  @Override
+  public boolean hasArray() {
+    return false;
+  }
+
+  @Override
+  public byte[] array() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int arrayOffset() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() {}
+
+  protected final void checkReadable(int length) {
+    if (readableBytes() < length) {
+      throw new IndexOutOfBoundsException();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Buffer.java b/core/src/main/java/com/google/net/stubby/newtransport/Buffer.java
new file mode 100644
index 0000000..a52a363
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Buffer.java
@@ -0,0 +1,111 @@
+package com.google.net.stubby.newtransport;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Interface for an abstract byte buffer. Buffers are intended to be a read-only, except for the
+ * read position which is incremented after each read call.
+ *
+ * <p>Buffers may optionally expose a backing array for optimization purposes, similar to what is
+ * done in {@link ByteBuffer}. It is not expected that callers will attempt to modify the backing
+ * array.
+ */
+public interface Buffer extends Closeable {
+
+  /**
+   * Gets the current number of readable bytes remaining in this buffer.
+   */
+  int readableBytes();
+
+  /**
+   * Reads the next unsigned byte from this buffer and increments the read position by 1. If the
+   * required bytes are not readable, throws {@link IndexOutOfBoundsException}.
+   */
+  int readUnsignedByte();
+
+  /**
+   * Reads a 3-byte unsigned integer from this buffer using big-endian byte ordering. Increments the
+   * read position by 3. If the required bytes are not readable, throws
+   * {@link IndexOutOfBoundsException}.
+   */
+  int readUnsignedMedium();
+
+  /**
+   * Reads a 2-byte unsigned integer from this buffer using big-endian byte ordering. Increments the
+   * read position by 2. If the required bytes are not readable, throws
+   * {@link IndexOutOfBoundsException}.
+   */
+  int readUnsignedShort();
+
+  /**
+   * Reads a 4-byte signed integer from this buffer using big-endian byte ordering. Increments the
+   * read position by 4. If the required bytes are not readable, throws
+   * {@link IndexOutOfBoundsException}.
+   */
+  int readInt();
+
+  /**
+   * Increments the read position by the given length. If the skipped bytes are not readable, throws
+   * {@link IndexOutOfBoundsException}.
+   */
+  void skipBytes(int length);
+
+  /**
+   * Reads {@code length} bytes from this buffer and writes them to the destination array.
+   * Increments the read position by {@code length}. If the required bytes are not readable or
+   * {@code dest} array is to small, throws {@link IndexOutOfBoundsException}.
+   *
+   * @param dest the destination array to receive the bytes.
+   * @param destOffset the starting offset in the destination array.
+   * @param length the number of bytes to be copied.
+   */
+  void readBytes(byte[] dest, int destOffset, int length);
+
+  /**
+   * Reads from this buffer until the destination's position reaches its limit, and increases the
+   * read position by the number of the transferred bytes. If the required bytes are not readable,
+   * throws {@link IndexOutOfBoundsException}.
+   *
+   * @param dest the destination buffer to receive the bytes.
+   */
+  void readBytes(ByteBuffer dest);
+
+  /**
+   * Reads {@code length} bytes from this buffer and writes them to the destination stream.
+   * Increments the read position by {@code length}. If the required bytes are not readable, throws
+   * {@link IndexOutOfBoundsException}.
+   *
+   * @param dest the destination stream to receive the bytes.
+   * @param length the number of bytes to be copied.
+   * @throws IOException thrown if any error was encountered while writing to the stream.
+   */
+  void readBytes(OutputStream dest, int length) throws IOException;
+
+  /**
+   * Indicates whether or not this buffer exposes a backing array.
+   */
+  boolean hasArray();
+
+  /**
+   * Gets the backing array for this buffer. This is an optional method, so callers should first
+   * check {@link #hasArray}. Buffers not supporting this method will throw
+   * {@link UnsupportedOperationException}.
+   */
+  byte[] array();
+
+  /**
+   * Gets the offset in the backing array of the current read position. This is an optional method,
+   * so callers should first check {@link #hasArray}. Buffers not supporting this method will throw
+   * {@link UnsupportedOperationException}.
+   */
+  int arrayOffset();
+
+  /**
+   * Closes this buffer and releases any resources.
+   */
+  @Override
+  void close();
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
new file mode 100644
index 0000000..8dec0e3
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
@@ -0,0 +1,365 @@
+package com.google.net.stubby.newtransport;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.DeferredProtoInputStream;
+import com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+/**
+ * Utility methods for creating {@link Buffer} instances.
+ */
+public final class Buffers {
+  private static final Buffer EMPTY_BUFFER = new ByteArrayWrapper(new byte[0]);
+
+  /**
+   * Returns an empty {@link Buffer} instance.
+   */
+  public static Buffer empty() {
+    return EMPTY_BUFFER;
+  }
+
+  /**
+   * Creates a new {@link Buffer} that is backed by the given {@link ByteString}.
+   */
+  public static Buffer wrap(ByteString bytes) {
+    return new ByteStringWrapper(bytes);
+  }
+
+  /**
+   * Creates a new {@link Buffer} that is backed by the given byte array.
+   *
+   * @param bytes the byte array being wrapped.
+   * @param offset the starting offset for the buffer within the byte array.
+   * @param length the length of the buffer from the {@code offset} index.
+   */
+  public static Buffer wrap(byte[] bytes, int offset, int length) {
+    return new ByteArrayWrapper(bytes, offset, length);
+  }
+
+  /**
+   * Creates a new {@link Buffer} that is backed by the given {@link ByteBuffer}. Calls to read from
+   * the buffer will increment the position of the {@link ByteBuffer}.
+   */
+  public static Buffer wrap(ByteBuffer bytes) {
+    return new ByteBufferWrapper(bytes);
+  }
+
+  /**
+   * Reads an entire {@link Buffer} to a new array. After calling this method, the buffer will
+   * contain no readable bytes.
+   */
+  public static byte[] readArray(Buffer buffer) {
+    Preconditions.checkNotNull(buffer, "buffer");
+    int length = buffer.readableBytes();
+    byte[] bytes = new byte[length];
+    buffer.readBytes(bytes, 0, length);
+    return bytes;
+  }
+
+  /**
+   * Creates a new {@link Buffer} that contains the content from the given {@link InputStream}.
+   *
+   * @param in the source of the bytes.
+   * @param length the number of bytes to be read from the stream.
+   */
+  public static Buffer copyFrom(InputStream in, int length) {
+    Preconditions.checkNotNull(in, "in");
+    Preconditions.checkArgument(length >= 0, "length must be positive");
+
+    if (in instanceof DeferredProtoInputStream) {
+      ByteString bstr = ((DeferredProtoInputStream) in).getMessage().toByteString();
+      return new ByteStringWrapper(bstr.substring(0, length));
+    }
+
+    byte[] bytes = new byte[length];
+    try {
+      ByteStreams.readFully(in, bytes);
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+    return new ByteArrayWrapper(bytes, 0, length);
+  }
+
+  /**
+   * Reads the entire {@link Buffer} to a new {@link String} with the given charset.
+   */
+  public static String readAsString(Buffer buffer, Charset charset) {
+    Preconditions.checkNotNull(charset, "charset");
+    byte[] bytes = readArray(buffer);
+    return new String(bytes, charset);
+  }
+
+  /**
+   * Reads the entire {@link Buffer} to a new {@link String} using UTF-8 decoding.
+   */
+  public static String readAsStringUtf8(Buffer buffer) {
+    return readAsString(buffer, UTF_8);
+  }
+
+  /**
+   * Creates a new {@link InputStream} backed by the given buffer. Any read taken on the stream will
+   * automatically increment the read position of this buffer. Closing the stream, however, does not
+   * affect the original buffer.
+   */
+  public static InputStream openStream(Buffer buffer) {
+    return new BufferInputStream(buffer);
+  }
+
+  /**
+   * A {@link Buffer} that is backed by a byte array.
+   */
+  private static class ByteArrayWrapper extends AbstractBuffer {
+    int offset;
+    final int end;
+    final byte[] bytes;
+
+    ByteArrayWrapper(byte[] bytes) {
+      this(bytes, 0, bytes.length);
+    }
+
+    ByteArrayWrapper(byte[] bytes, int offset, int length) {
+      Preconditions.checkArgument(offset >= 0, "offset must be >= 0");
+      Preconditions.checkArgument(length >= 0, "length must be >= 0");
+      Preconditions.checkArgument(offset + length <= bytes.length,
+          "offset + length exceeds array boundary");
+      this.bytes = Preconditions.checkNotNull(bytes, "bytes");
+      this.offset = offset;
+      this.end = offset + length;
+    }
+
+    @Override
+    public int readableBytes() {
+      return end - offset;
+    }
+
+    @Override
+    public void skipBytes(int length) {
+      checkReadable(length);
+      offset += length;
+    }
+
+    @Override
+    public int readUnsignedByte() {
+      checkReadable(1);
+      return bytes[offset++] & 0xFF;
+    }
+
+    @Override
+    public void readBytes(byte[] dest, int destIndex, int length) {
+      System.arraycopy(bytes, offset, dest, destIndex, length);
+      offset += length;
+    }
+
+    @Override
+    public void readBytes(ByteBuffer dest) {
+      Preconditions.checkNotNull(dest, "dest");
+      int length = dest.remaining();
+      checkReadable(length);
+      dest.put(bytes, offset, length);
+      offset += length;
+    }
+
+    @Override
+    public void readBytes(OutputStream dest, int length) throws IOException {
+      checkReadable(length);
+      dest.write(bytes, offset, length);
+      offset += length;
+    }
+
+    @Override
+    public boolean hasArray() {
+      return true;
+    }
+
+    @Override
+    public byte[] array() {
+      return bytes;
+    }
+
+    @Override
+    public int arrayOffset() {
+      return offset;
+    }
+  }
+
+  /**
+   * A {@link Buffer} that is backed by a {@link ByteString}.
+   */
+  private static class ByteStringWrapper extends AbstractBuffer {
+    int offset;
+    final ByteString bytes;
+
+    ByteStringWrapper(ByteString bytes) {
+      this.bytes = Preconditions.checkNotNull(bytes, "bytes");
+    }
+
+    @Override
+    public int readableBytes() {
+      return bytes.size() - offset;
+    }
+
+    @Override
+    public int readUnsignedByte() {
+      checkReadable(1);
+      return bytes.byteAt(offset++) & 0xFF;
+    }
+
+    @Override
+    public void skipBytes(int length) {
+      checkReadable(length);
+      offset += length;
+    }
+
+    @Override
+    public void readBytes(byte[] dest, int destOffset, int length) {
+      bytes.copyTo(dest, offset, destOffset, length);
+      offset += length;
+    }
+
+    @Override
+    public void readBytes(ByteBuffer dest) {
+      Preconditions.checkNotNull(dest, "dest");
+      int length = dest.remaining();
+      checkReadable(length);
+      internalSlice(length).copyTo(dest);
+      offset += length;
+    }
+
+    @Override
+    public void readBytes(OutputStream dest, int length) throws IOException {
+      checkReadable(length);
+      internalSlice(length).writeTo(dest);
+      offset += length;
+    }
+
+    private ByteString internalSlice(int length) {
+      return bytes.substring(offset, offset + length);
+    }
+  }
+
+  /**
+   * A {@link Buffer} that is backed by a {@link ByteBuffer}.
+   */
+  private static class ByteBufferWrapper extends AbstractBuffer {
+    final ByteBuffer bytes;
+
+    ByteBufferWrapper(ByteBuffer bytes) {
+      this.bytes = Preconditions.checkNotNull(bytes, "bytes");
+    }
+
+    @Override
+    public int readableBytes() {
+      return bytes.remaining();
+    }
+
+    @Override
+    public int readUnsignedByte() {
+      checkReadable(1);
+      return bytes.get() & 0xFF;
+    }
+
+    @Override
+    public void skipBytes(int length) {
+      checkReadable(length);
+      bytes.position(bytes.position() + length);
+    }
+
+    @Override
+    public void readBytes(byte[] dest, int destOffset, int length) {
+      checkReadable(length);
+      bytes.get(dest, destOffset, length);
+    }
+
+    @Override
+    public void readBytes(ByteBuffer dest) {
+      Preconditions.checkNotNull(dest, "dest");
+      int length = dest.remaining();
+      checkReadable(length);
+
+      // Change the limit so that only length bytes are available.
+      int prevLimit = bytes.limit();
+      bytes.limit(bytes.position() + length);
+
+      // Write the bytes and restore the original limit.
+      dest.put(bytes);
+      bytes.limit(prevLimit);
+    }
+
+    @Override
+    public void readBytes(OutputStream dest, int length) throws IOException {
+      checkReadable(length);
+      if (hasArray()) {
+        dest.write(array(), arrayOffset(), length);
+        bytes.position(bytes.position() + length);
+      } else {
+        // The buffer doesn't support array(). Copy the data to an intermediate buffer.
+        byte[] array = new byte[length];
+        bytes.get(array);
+        dest.write(array);
+      }
+    }
+
+    @Override
+    public boolean hasArray() {
+      return bytes.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+      return bytes.array();
+    }
+
+    @Override
+    public int arrayOffset() {
+      return bytes.arrayOffset() + bytes.position();
+    }
+  }
+
+  /**
+   * An {@link InputStream} that is backed by a {@link Buffer}.
+   */
+  private static class BufferInputStream extends InputStream {
+    final Buffer buffer;
+
+    public BufferInputStream(Buffer buffer) {
+      this.buffer = Preconditions.checkNotNull(buffer, "buffer");
+    }
+
+    @Override
+    public int available() throws IOException {
+      return buffer.readableBytes();
+    }
+
+    @Override
+    public int read() {
+      if (buffer.readableBytes() == 0) {
+        // EOF.
+        return -1;
+      }
+      return buffer.readUnsignedByte();
+    }
+
+    @Override
+    public int read(byte[] dest, int destOffset, int length) throws IOException {
+      if (buffer.readableBytes() == 0) {
+        // EOF.
+        return -1;
+      }
+
+      length = Math.min(buffer.readableBytes(), length);
+      buffer.readBytes(dest, destOffset, length);
+      return length;
+    }
+  }
+
+  private Buffers() {}
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyBuffer.java
new file mode 100644
index 0000000..ace49e3
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyBuffer.java
@@ -0,0 +1,91 @@
+package com.google.net.stubby.newtransport.netty;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.net.stubby.newtransport.AbstractBuffer;
+import com.google.net.stubby.newtransport.Buffer;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Buffer} implementation that is backed by a Netty {@link ByteBuf}. This class does not
+ * call {@link ByteBuf#retain}, so if that is needed it should be called prior to creating this
+ * buffer.
+ */
+class NettyBuffer extends AbstractBuffer {
+  private final ByteBuf buffer;
+  private boolean closed;
+
+  NettyBuffer(ByteBuf buffer) {
+    this.buffer = Preconditions.checkNotNull(buffer, "buffer");
+  }
+
+  ByteBuf buffer() {
+    return buffer;
+  }
+
+  @Override
+  public int readableBytes() {
+    return buffer.readableBytes();
+  }
+
+  @Override
+  public void skipBytes(int length) {
+    buffer.skipBytes(length);
+  }
+
+  @Override
+  public int readUnsignedByte() {
+    return buffer.readUnsignedByte();
+  }
+
+  @Override
+  public void readBytes(byte[] dest, int index, int length) {
+    buffer.readBytes(dest, index, length);
+  }
+
+  @Override
+  public void readBytes(ByteBuffer dest) {
+    buffer.readBytes(dest);
+  }
+
+  @Override
+  public void readBytes(OutputStream dest, int length) {
+    try {
+      buffer.readBytes(dest, length);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public boolean hasArray() {
+    return buffer.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return buffer.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return buffer.arrayOffset() + buffer.readerIndex();
+  }
+
+  /**
+   * If the first call to close, calls {@link ByteBuf#release} to release the internal Netty buffer.
+   */
+  @Override
+  public void close() {
+    // Don't allow slices to close. Also, only allow close to be called once.
+    if (!closed) {
+      closed = true;
+      buffer.release();
+    }
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/AbstractBufferTest.java b/core/src/test/java/com/google/net/stubby/newtransport/AbstractBufferTest.java
new file mode 100644
index 0000000..7628695
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/AbstractBufferTest.java
@@ -0,0 +1,60 @@
+package com.google.net.stubby.newtransport;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.OngoingStubbing;
+
+
+/**
+ * Tests for {@link AbstractBuffer}.
+ */
+@RunWith(JUnit4.class)
+public class AbstractBufferTest {
+
+  @Mock
+  private AbstractBuffer buffer;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void readUnsignedShortShouldSucceed() {
+    mockBytes(0xFF, 0xEE);
+    assertEquals(0xFFEE, buffer.readUnsignedShort());
+  }
+
+  @Test
+  public void readUnsignedMediumShouldSucceed() {
+    mockBytes(0xFF, 0xEE, 0xDD);
+    assertEquals(0xFFEEDD, buffer.readUnsignedMedium());
+  }
+
+  @Test
+  public void readPositiveIntShouldSucceed() {
+    mockBytes(0x7F, 0xEE, 0xDD, 0xCC);
+    assertEquals(0x7FEEDDCC, buffer.readInt());
+  }
+
+  @Test
+  public void readNegativeIntShouldSucceed() {
+    mockBytes(0xFF, 0xEE, 0xDD, 0xCC);
+    assertEquals(0xFFEEDDCC, buffer.readInt());
+  }
+
+  private void mockBytes(int... bytes) {
+    when(buffer.readableBytes()).thenReturn(bytes.length);
+    OngoingStubbing<Integer> stub = when(buffer.readUnsignedByte());
+    for (int b : bytes) {
+      stub = stub.thenReturn(b);
+    }
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/BufferTestBase.java b/core/src/test/java/com/google/net/stubby/newtransport/BufferTestBase.java
new file mode 100644
index 0000000..40ed998
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/BufferTestBase.java
@@ -0,0 +1,92 @@
+package com.google.net.stubby.newtransport;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Abstract base class for tests of {@link Buffer} subclasses.
+ */
+@RunWith(JUnit4.class)
+public abstract class BufferTestBase {
+  protected final String msg = "hello";
+
+  @Test
+  public void bufferShouldReadAllBytes() {
+    Buffer buffer = buffer();
+    for (int ix = 0; ix < msg.length(); ++ix) {
+      assertEquals(msg.length() - ix, buffer.readableBytes());
+      assertEquals(msg.charAt(ix), buffer.readUnsignedByte());
+    }
+    assertEquals(0, buffer.readableBytes());
+  }
+
+  @Test
+  public void readToArrayShouldSucceed() {
+    Buffer buffer = buffer();
+    byte[] array = new byte[msg.length()];
+    buffer.readBytes(array, 0, array.length);
+    Arrays.equals(msg.getBytes(UTF_8), array);
+    assertEquals(0, buffer.readableBytes());
+  }
+
+  @Test
+  public void partialReadToArrayShouldSucceed() {
+    Buffer buffer = buffer();
+    byte[] array = new byte[msg.length()];
+    buffer.readBytes(array, 1, 2);
+    Arrays.equals(new byte[] {'h', 'e'}, Arrays.copyOfRange(array, 1, 3));
+    assertEquals(3, buffer.readableBytes());
+  }
+
+  @Test
+  public void readToStreamShouldSucceed() throws Exception {
+    Buffer buffer = buffer();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    buffer.readBytes(stream, msg.length());
+    Arrays.equals(msg.getBytes(UTF_8), stream.toByteArray());
+    assertEquals(0, buffer.readableBytes());
+  }
+
+  @Test
+  public void partialReadToStreamShouldSucceed() throws Exception {
+    Buffer buffer = buffer();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    buffer.readBytes(stream, 2);
+    Arrays.equals(new byte[]{'h', 'e'}, Arrays.copyOfRange(stream.toByteArray(), 0, 2));
+    assertEquals(3, buffer.readableBytes());
+  }
+
+  @Test
+  public void readToByteBufferShouldSucceed() {
+    Buffer buffer = buffer();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length());
+    buffer.readBytes(byteBuffer);
+    byteBuffer.flip();
+    byte[] array = new byte[msg.length()];
+    byteBuffer.get(array);
+    Arrays.equals(msg.getBytes(UTF_8), array);
+    assertEquals(0, buffer.readableBytes());
+  }
+
+  @Test
+  public void partialReadToByteBufferShouldSucceed() {
+    Buffer buffer = buffer();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(2);
+    buffer.readBytes(byteBuffer);
+    byteBuffer.flip();
+    byte[] array = new byte[2];
+    byteBuffer.get(array);
+    Arrays.equals(new byte[]{'h', 'e'}, array);
+    assertEquals(3, buffer.readableBytes());
+  }
+
+  protected abstract Buffer buffer();
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/BuffersArrayTest.java b/core/src/test/java/com/google/net/stubby/newtransport/BuffersArrayTest.java
new file mode 100644
index 0000000..2dbb0bf
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/BuffersArrayTest.java
@@ -0,0 +1,33 @@
+package com.google.net.stubby.newtransport;
+
+import static com.google.net.stubby.newtransport.Buffers.wrap;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(byte[], int, int)};
+ */
+public class BuffersArrayTest extends BufferTestBase {
+
+  @Test
+  public void bufferShouldExposeArray() {
+    byte[] array = msg.getBytes(UTF_8);
+    Buffer buffer = wrap(array, 1, msg.length() - 1);
+    assertTrue(buffer.hasArray());
+    assertSame(array, buffer.array());
+    assertEquals(1, buffer.arrayOffset());
+
+    // Now read a byte and verify that the offset changes.
+    buffer.readUnsignedByte();
+    assertEquals(2, buffer.arrayOffset());
+  }
+
+  @Override
+  protected Buffer buffer() {
+    return Buffers.wrap(msg.getBytes(UTF_8), 0, msg.length());
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/BuffersByteBufferTest.java b/core/src/test/java/com/google/net/stubby/newtransport/BuffersByteBufferTest.java
new file mode 100644
index 0000000..6765d9a
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/BuffersByteBufferTest.java
@@ -0,0 +1,16 @@
+package com.google.net.stubby.newtransport;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(ByteBuffer)}.
+ */
+public class BuffersByteBufferTest extends BufferTestBase {
+
+  @Override
+  protected Buffer buffer() {
+    return Buffers.wrap(ByteBuffer.wrap(msg.getBytes(UTF_8)));
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/BuffersByteStringTest.java b/core/src/test/java/com/google/net/stubby/newtransport/BuffersByteStringTest.java
new file mode 100644
index 0000000..e3f7ed5
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/BuffersByteStringTest.java
@@ -0,0 +1,14 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(ByteString)}.
+ */
+public class BuffersByteStringTest extends BufferTestBase {
+
+  @Override
+  protected Buffer buffer() {
+    return Buffers.wrap(ByteString.copyFromUtf8(msg));
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyBufferTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyBufferTest.java
new file mode 100644
index 0000000..98a277e
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyBufferTest.java
@@ -0,0 +1,45 @@
+package com.google.net.stubby.newtransport.netty;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import com.google.net.stubby.newtransport.BufferTestBase;
+import com.google.net.stubby.newtransport.Buffer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import io.netty.buffer.Unpooled;
+
+/**
+ * Tests for {@link NettyBuffer}.
+ */
+@RunWith(JUnit4.class)
+public class NettyBufferTest extends BufferTestBase {
+  private NettyBuffer buffer;
+
+  @Before
+  public void setup() {
+    buffer = new NettyBuffer(Unpooled.copiedBuffer(msg, UTF_8));
+  }
+
+  @Test
+  public void closeShouldReleaseBuffer() {
+    buffer.close();
+    assertEquals(0, buffer.buffer().refCnt());
+  }
+
+  @Test
+  public void closeMultipleTimesShouldReleaseBufferOnce() {
+    buffer.close();
+    buffer.close();
+    assertEquals(0, buffer.buffer().refCnt());
+  }
+
+  @Override
+  protected Buffer buffer() {
+    return buffer;
+  }
+}