Copy framing classes to newtransport, tweaking some of their APIs

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69489753
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
new file mode 100644
index 0000000..06b26e5
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
@@ -0,0 +1,162 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Transport;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
+ * reconstructs their messages and hands them off to a receiving {@link Operation}
+ */
+public abstract class Deframer<F> implements Framer.Sink<F> {
+
+  /**
+   *  Unset frame length
+   */
+  private static final int LENGTH_NOT_SET = -1;
+
+  private final Framer target;
+  private boolean inFrame;
+  private byte currentFlags;
+  private int currentLength = LENGTH_NOT_SET;
+
+  public Deframer(Framer target) {
+    this.target = target;
+  }
+
+  @Override
+  public void deliverFrame(F frame, boolean endOfStream) {
+    int remaining = internalDeliverFrame(frame);
+    if (endOfStream && remaining > 0) {
+      target.writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
+    }
+  }
+
+  /**
+   * Consume a frame of bytes provided by the transport. Note that transport framing is not
+   * aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
+   * across transport frame boundaries.
+   *
+   * @return the number of unconsumed bytes remaining in the buffer
+   */
+  private int internalDeliverFrame(F frame) {
+    try {
+      frame = decompress(frame);
+      DataInputStream grpcStream = prefix(frame);
+      // Loop until no more GRPC frames can be fully decoded
+      while (true) {
+        if (!inFrame) {
+          // Not in frame so attempt to read flags
+          if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
+            return consolidate();
+          }
+          currentFlags = grpcStream.readByte();
+          inFrame = true;
+        }
+        if (currentLength == LENGTH_NOT_SET) {
+          // Read the frame length
+          if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
+            return consolidate();
+          }
+          currentLength = grpcStream.readInt();
+        }
+        // Ensure that the entire frame length is available to read
+        InputStream framedChunk = ensureMessage(grpcStream, currentLength);
+        if (framedChunk == null) {
+          // Insufficient bytes available
+          return consolidate();
+        }
+        if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
+          // Advance stream now, because target.addPayload() may not or may process the frame on
+          // another thread.
+          framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
+          try {
+            // Report payload to the receiving operation
+            target.writePayload(framedChunk, currentLength);
+          } finally {
+            currentLength = LENGTH_NOT_SET;
+            inFrame = false;
+          }
+        } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
+          // Not clear if using proto encoding here is of any benefit.
+          // Using ContextValue.parseFrom requires copying out of the framed chunk
+          // Writing a custom parser would have to do varint handling and potentially
+          // deal with out-of-order tags etc.
+          Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
+          try {
+            target.writeContext(contextValue.getKey(),
+                contextValue.getValue().newInput(), currentLength);
+          } finally {
+            currentLength = LENGTH_NOT_SET;
+            inFrame = false;
+          }
+        } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
+          int status = framedChunk.read() << 8 | framedChunk.read();
+          Transport.Code code = Transport.Code.valueOf(status);
+          // TODO(user): Resolve what to do with remainder of framedChunk
+          try {
+            if (code == null) {
+              // Log for unknown code
+              target.writeStatus(
+                  new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
+            } else {
+              target.writeStatus(new Status(code));
+            }
+          } finally {
+            currentLength = LENGTH_NOT_SET;
+            inFrame = false;
+          }
+        }
+        if (grpcStream.available() == 0) {
+          // We've processed all the data so consolidate the underlying buffers
+          return consolidate();
+        }
+      }
+    } catch (IOException ioe) {
+      Status status = new Status(Transport.Code.UNKNOWN, ioe);
+      target.writeStatus(status);
+      throw status.asRuntimeException();
+    }
+  }
+
+  /**
+   * Return a stream view over the current buffer prefixed to the input frame
+   */
+  protected abstract DataInputStream prefix(F frame) throws IOException;
+
+  /**
+   * Consolidate the underlying buffers and return the number of buffered bytes remaining
+   */
+  protected abstract int consolidate() throws IOException;
+
+  /**
+   * Decompress the raw frame buffer prior to prefixing it.
+   */
+  protected abstract F decompress(F frame) throws IOException;
+
+  /**
+   * Ensure that {@code len} bytes are available in the buffer and frame
+   */
+  private boolean ensure(InputStream input, int len) throws IOException {
+    return (input.available() >= len);
+  }
+
+  /**
+   * Return a message of {@code len} bytes than can be read from the buffer. If sufficient
+   * bytes are unavailable then buffer the available bytes and return null.
+   */
+  private InputStream ensureMessage(InputStream input, int len)
+      throws IOException {
+    if (input.available() < len) {
+      return null;
+    }
+    return ByteStreams.limit(input, len);
+  }
+}