Adding CompositeBuffer and a few other utilties related to Buffers.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=72268003
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
index 8dec0e3..3d744f0 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
@@ -35,6 +35,13 @@
   }
 
   /**
+   * Shortcut for {@code wrap(bytes, 0, bytes.length}.
+   */
+  public static Buffer wrap(byte[] bytes) {
+    return new ByteArrayWrapper(bytes, 0, bytes.length);
+  }
+
+  /**
    * Creates a new {@link Buffer} that is backed by the given byte array.
    *
    * @param bytes the byte array being wrapped.
@@ -109,9 +116,27 @@
    * Creates a new {@link InputStream} backed by the given buffer. Any read taken on the stream will
    * automatically increment the read position of this buffer. Closing the stream, however, does not
    * affect the original buffer.
+   *
+   * @param buffer the buffer backing the new {@link InputStream}.
+   * @param owner if {@code true}, the returned stream will close the buffer when closed.
    */
-  public static InputStream openStream(Buffer buffer) {
-    return new BufferInputStream(buffer);
+  public static InputStream openStream(Buffer buffer, boolean owner) {
+    return new BufferInputStream(owner ? buffer : ignoreClose(buffer));
+  }
+
+  /**
+   * Decorates the given {@link Buffer} to ignore calls to {@link Buffer#close}.
+   *
+   * @param buffer the buffer to be decorated.
+   * @return a wrapper around {@code buffer} that ignores calls to {@link Buffer#close}.
+   */
+  public static Buffer ignoreClose(Buffer buffer) {
+    return new ForwardingBuffer(buffer) {
+      @Override
+      public void close() {
+        // Ignore.
+      }
+    };
   }
 
   /**
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java
new file mode 100644
index 0000000..91398ac
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java
@@ -0,0 +1,181 @@
+package com.google.net.stubby.newtransport;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that
+ * allows multiple buffers to be treated as one.
+ *
+ * <p>When a buffer is added to a composite, it's life cycle is controlled by the composite. Once
+ * the composite has read past the end of a given buffer, that buffer is automatically closed and
+ * removed from the composite.
+ */
+public class CompositeBuffer extends AbstractBuffer {
+
+  private int readableBytes;
+  private final Queue<Buffer> buffers = new ArrayDeque<Buffer>();
+
+  /**
+   * Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is
+   * expected that this {@link CompositeBuffer} has complete ownership. Any attempt to modify the
+   * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
+   * this {@link CompositeBuffer}.
+   */
+  public void addBuffer(Buffer buffer) {
+    buffers.add(buffer);
+    readableBytes += buffer.readableBytes();
+  }
+
+  @Override
+  public int readableBytes() {
+    return readableBytes;
+  }
+
+  @Override
+  public int readUnsignedByte() {
+    ReadOperation op = new ReadOperation() {
+      @Override
+      int readInternal(Buffer buffer, int length) {
+        return buffer.readUnsignedByte();
+      }
+    };
+    execute(op, 1);
+    return op.value;
+  }
+
+  @Override
+  public void skipBytes(int length) {
+    execute(new ReadOperation() {
+      @Override
+      public int readInternal(Buffer buffer, int length) {
+        buffer.skipBytes(length);
+        return 0;
+      }
+    }, length);
+  }
+
+  @Override
+  public void readBytes(final byte[] dest, final int destOffset, int length) {
+    execute(new ReadOperation() {
+      int currentOffset = destOffset;
+      @Override
+      public int readInternal(Buffer buffer, int length) {
+        buffer.readBytes(dest, currentOffset, length);
+        currentOffset += length;
+        return 0;
+      }
+    }, length);
+  }
+
+  @Override
+  public void readBytes(final ByteBuffer dest) {
+    execute(new ReadOperation() {
+      @Override
+      public int readInternal(Buffer buffer, int length) {
+        // Change the limit so that only lengthToCopy bytes are available.
+        int prevLimit = dest.limit();
+        dest.limit(dest.position() + length);
+
+        // Write the bytes and restore the original limit.
+        buffer.readBytes(dest);
+        dest.limit(prevLimit);
+        return 0;
+      }
+    }, dest.remaining());
+  }
+
+  @Override
+  public void readBytes(final OutputStream dest, int length) throws IOException {
+    ReadOperation op = new ReadOperation() {
+      @Override
+      public int readInternal(Buffer buffer, int length) throws IOException {
+        buffer.readBytes(dest, length);
+        return 0;
+      }
+    };
+    execute(op, length);
+
+    // If an exception occurred, throw it.
+    if (op.isError()) {
+      throw op.ex;
+    }
+  }
+
+  @Override
+  public void close() {
+    while (!buffers.isEmpty()) {
+      buffers.remove().close();
+    }
+  }
+
+  /**
+   * Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the
+   * requested {@code length}.
+   */
+  private void execute(ReadOperation op, int length) {
+    checkReadable(length);
+
+    for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
+      Buffer buffer = buffers.peek();
+      int lengthToCopy = Math.min(length, buffer.readableBytes());
+
+      // Perform the read operation for this buffer.
+      op.read(buffer, lengthToCopy);
+      if (op.isError()) {
+        return;
+      }
+
+      length -= lengthToCopy;
+      readableBytes -= lengthToCopy;
+    }
+
+    if (length > 0) {
+      // Should never get here.
+      throw new AssertionError("Failed executing read operation");
+    }
+  }
+
+  /**
+   * If the current buffer is exhausted, removes and closes it.
+   */
+  private void advanceBufferIfNecessary() {
+    Buffer buffer = buffers.peek();
+    if (buffer.readableBytes() == 0) {
+      buffers.remove().close();
+    }
+  }
+
+  /**
+   * A simple read operation to perform on a single {@link Buffer}. All state management for the
+   * buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}.
+   */
+  private abstract class ReadOperation {
+    /**
+     * Only used by {@link CompositeBuffer#readUnsignedByte()}.
+     */
+    int value;
+
+    /**
+     * Only used by {@link CompositeBuffer#readBytes(OutputStream, int)};
+     */
+    IOException ex;
+
+    final void read(Buffer buffer, int length) {
+      try {
+        value = readInternal(buffer, length);
+      } catch (IOException e) {
+        ex = e;
+      }
+    }
+
+    final boolean isError() {
+      return ex != null;
+    }
+
+    abstract int readInternal(Buffer buffer, int length) throws IOException;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java
new file mode 100644
index 0000000..9dbcf0d
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java
@@ -0,0 +1,84 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Base class for a wrapper around another {@link Buffer}.
+ */
+public abstract class ForwardingBuffer implements Buffer {
+
+  private final Buffer buf;
+
+  public ForwardingBuffer(Buffer buf) {
+    this.buf = Preconditions.checkNotNull(buf, "buf");
+  }
+
+  @Override
+  public int readableBytes() {
+    return buf.readableBytes();
+  }
+
+  @Override
+  public int readUnsignedByte() {
+    return buf.readUnsignedByte();
+  }
+
+  @Override
+  public int readUnsignedMedium() {
+    return buf.readUnsignedMedium();
+  }
+
+  @Override
+  public int readUnsignedShort() {
+    return buf.readUnsignedShort();
+  }
+
+  @Override
+  public int readInt() {
+    return buf.readInt();
+  }
+
+  @Override
+  public void skipBytes(int length) {
+    buf.skipBytes(length);
+  }
+
+  @Override
+  public void readBytes(byte[] dest, int destOffset, int length) {
+    buf.readBytes(dest, destOffset, length);
+  }
+
+  @Override
+  public void readBytes(ByteBuffer dest) {
+    buf.readBytes(dest);
+  }
+
+  @Override
+  public void readBytes(OutputStream dest, int length) throws IOException {
+    buf.readBytes(dest, length);
+  }
+
+  @Override
+  public boolean hasArray() {
+    return buf.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return buf.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return buf.arrayOffset();
+  }
+
+  @Override
+  public void close() {
+    buf.close();
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java b/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java
new file mode 100644
index 0000000..14bda50
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java
@@ -0,0 +1,154 @@
+package com.google.net.stubby.newtransport;
+
+import static io.netty.util.CharsetUtil.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Tests for {@link CompositeBuffer}.
+ */
+@RunWith(JUnit4.class)
+public class CompositeBufferTest {
+  private static final String EXPECTED_VALUE = "hello world";
+
+  private CompositeBuffer composite;
+
+  @Before
+  public void setup() {
+    composite = new CompositeBuffer();
+    splitAndAdd(EXPECTED_VALUE);
+  }
+
+  @After
+  public void teardown() {
+    composite.close();
+  }
+
+  @Test
+  public void singleBufferShouldSucceed() {
+    composite = new CompositeBuffer();
+    composite.addBuffer(Buffers.wrap(EXPECTED_VALUE.getBytes(UTF_8)));
+    assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+    assertEquals(EXPECTED_VALUE, Buffers.readAsStringUtf8(composite));
+    assertEquals(0, composite.readableBytes());
+  }
+
+  @Test
+  public void readUnsignedByteShouldSucceed() {
+    for (int ix = 0; ix < EXPECTED_VALUE.length(); ++ix) {
+      int c = composite.readUnsignedByte();
+      assertEquals(EXPECTED_VALUE.charAt(ix), (char) c);
+    }
+    assertEquals(0, composite.readableBytes());
+  }
+
+  @Test
+  public void skipBytesShouldSucceed() {
+    int remaining = EXPECTED_VALUE.length();
+    composite.skipBytes(1);
+    remaining--;
+    assertEquals(remaining, composite.readableBytes());
+
+    composite.skipBytes(5);
+    remaining -= 5;
+    assertEquals(remaining, composite.readableBytes());
+
+    composite.skipBytes(remaining);
+    assertEquals(0, composite.readableBytes());
+  }
+
+  @Test
+  public void readByteArrayShouldSucceed() {
+    byte[] bytes = new byte[composite.readableBytes()];
+    int writeIndex = 0;
+
+    composite.readBytes(bytes, writeIndex, 1);
+    writeIndex++;
+    assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes());
+
+    composite.readBytes(bytes, writeIndex, 5);
+    writeIndex += 5;
+    assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes());
+
+    int remaining = composite.readableBytes();
+    composite.readBytes(bytes, writeIndex, remaining);
+    writeIndex += remaining;
+    assertEquals(0, composite.readableBytes());
+    assertEquals(bytes.length, writeIndex);
+    assertEquals(EXPECTED_VALUE, new String(bytes, UTF_8));
+  }
+
+  @Test
+  public void readByteBufferShouldSucceed() {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(EXPECTED_VALUE.length());
+    int remaining = EXPECTED_VALUE.length();
+
+    byteBuffer.limit(1);
+    composite.readBytes(byteBuffer);
+    remaining--;
+    assertEquals(remaining, composite.readableBytes());
+
+    byteBuffer.limit(byteBuffer.limit() + 5);
+    composite.readBytes(byteBuffer);
+    remaining -= 5;
+    assertEquals(remaining, composite.readableBytes());
+
+    byteBuffer.limit(byteBuffer.limit() + remaining);
+    composite.readBytes(byteBuffer);
+    assertEquals(0, composite.readableBytes());
+    assertEquals(EXPECTED_VALUE, new String(byteBuffer.array(), UTF_8));
+  }
+
+  @Test
+  public void readStreamShouldSucceed() throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    int remaining = EXPECTED_VALUE.length();
+
+    composite.readBytes(bos, 1);
+    remaining--;
+    assertEquals(remaining, composite.readableBytes());
+
+    composite.readBytes(bos, 5);
+    remaining -= 5;
+    assertEquals(remaining, composite.readableBytes());
+
+    composite.readBytes(bos, remaining);
+    assertEquals(0, composite.readableBytes());
+    assertEquals(EXPECTED_VALUE, new String(bos.toByteArray(), UTF_8));
+  }
+
+  @Test
+  public void closeShouldCloseBuffers() {
+    composite = new CompositeBuffer();
+    Buffer mock1 = mock(Buffer.class);
+    Buffer mock2 = mock(Buffer.class);
+    composite.addBuffer(mock1);
+    composite.addBuffer(mock2);
+
+    composite.close();
+    verify(mock1).close();
+    verify(mock2).close();
+  }
+
+  private void splitAndAdd(String value) {
+    int partLength = Math.max(1, value.length() / 4);
+    for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) {
+      endIndex = Math.min(value.length(), startIndex + partLength);
+      String part = value.substring(startIndex, endIndex);
+      composite.addBuffer(Buffers.wrap(part.getBytes(UTF_8)));
+    }
+
+    assertEquals(value.length(), composite.readableBytes());
+  }
+}