Move gRPC core to third_party
Half our tests still need to be moved, but that will be for a later
time.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=67023748
diff --git a/core/src/main/java/com/google/net/stubby/AbstractOperation.java b/core/src/main/java/com/google/net/stubby/AbstractOperation.java
new file mode 100644
index 0000000..01896be
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/AbstractOperation.java
@@ -0,0 +1,117 @@
+package com.google.net.stubby;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.MapMaker;
+import com.google.common.logging.FormattingLogger;
+import com.google.net.stubby.transport.Transport;
+
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Common implementation for {@link Request} and {@link Response} operations
+ */
+public abstract class AbstractOperation implements Operation {
+
+ private static final FormattingLogger logger =
+ FormattingLogger.getLogger(AbstractOperation.class);
+
+ /**
+ * Allow implementations to associate state with an operation; assigned once in the constructor.
+ */
+ private final ConcurrentMap<Object, Object> stash;
+ private final int id;
+ private Phase phase;
+ private Status status;
+
+ public AbstractOperation(int id) {
+ this.id = id;
+ this.phase = Phase.HEADERS;
+ stash = new MapMaker().concurrencyLevel(2).makeMap();
+ }
+
+ @Override
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public Phase getPhase() {
+ return phase;
+ }
+
+ /**
+ * Move into the desired phase. A backwards move closes the operation with an INTERNAL status.
+ */
+ protected Operation progressTo(Phase desiredPhase) {
+ if (desiredPhase.compareTo(phase) < 0) {
+ close(new Status(Transport.Code.INTERNAL,
+ "Cannot move to " + desiredPhase.name() + " from " + phase.name()));
+ } else {
+ phase = desiredPhase;
+ }
+ return this;
+ }
+
+ @Override
+ public Operation addContext(String type, InputStream message, Phase nextPhase) {
+ if (getPhase() == Phase.CLOSED) {
+ throw new IllegalStateException("addContext called after operation closed");
+ }
+ if (phase == Phase.PAYLOAD) {  // context added after payload moves the operation to FOOTERS
+ progressTo(Phase.FOOTERS);
+ }
+ if (phase == Phase.HEADERS || phase == Phase.FOOTERS) {
+ return progressTo(nextPhase);
+ }
+ throw new IllegalStateException("Cannot add context in phase " + phase.name());
+ }
+
+ @Override
+ public Operation addPayload(InputStream payload, Phase nextPhase) {
+ if (getPhase() == Phase.CLOSED) {
+ throw new IllegalStateException("addPayload called after operation closed");
+ }
+ if (phase == Phase.HEADERS) {  // the first payload moves the operation out of HEADERS
+ progressTo(Phase.PAYLOAD);
+ }
+ if (phase == Phase.PAYLOAD) {
+ return progressTo(nextPhase);
+ }
+ throw new IllegalStateException("Cannot add payload in phase " + phase.name());
+ }
+
+ @Override
+ public Operation close(Status status) {
+ // TODO(user): Handle synchronization properly.
+ Preconditions.checkNotNull(status, "status");
+ this.phase = Phase.CLOSED;
+ if (this.status == null || this.status.getCode() == status.getCode()) {
+ this.status = status;
+ } else {  // Operation#close contract: log the newer status but retain the original one
+ logger.severefmt(status.getCause(), "Attempting to override status of already closed "
+ + "operation from %s to %s", this.status.getCode(), status.getCode());
+ }
+ return this;
+ }
+
+ @Override
+ public Status getStatus() {
+ return status;
+ }
+
+ @Override
+ public <E> E put(Object key, E value) {
+ return (E) stash.put(key, value);
+ }
+
+ @Override
+ public <E> E get(Object key) {
+ return (E) stash.get(key);
+ }
+
+ @Override
+ public <E> E remove(Object key) {
+ return (E) stash.remove(key);
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/AbstractRequest.java b/core/src/main/java/com/google/net/stubby/AbstractRequest.java
new file mode 100644
index 0000000..4c75219
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/AbstractRequest.java
@@ -0,0 +1,31 @@
+package com.google.net.stubby;
+
+/**
+ * Common implementation for {@link Request} objects.
+ */
+public abstract class AbstractRequest extends AbstractOperation implements Request {
+
+ private final Response response;
+
+ /**
+ * Constructor that takes a pre-built {@link Response} and uses its id
+ */
+ public AbstractRequest(Response response) {
+ super(response.getId());
+ this.response = response;
+ }
+
+ /**
+ * Constructor that takes a {@link Response.ResponseBuilder} to
+ * be built with the same id as this request
+ */
+ public AbstractRequest(int id, Response.ResponseBuilder responseBuilder) {
+ super(id);
+ this.response = responseBuilder.build(id);
+ }
+
+ @Override
+ public Response getResponse() {
+ return response;
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/AbstractResponse.java b/core/src/main/java/com/google/net/stubby/AbstractResponse.java
new file mode 100644
index 0000000..ddb4d39
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/AbstractResponse.java
@@ -0,0 +1,11 @@
+package com.google.net.stubby;
+
+/**
+ * Common implementation for {@link Response} objects.
+ */
+public class AbstractResponse extends AbstractOperation implements Response {
+
+ public AbstractResponse(int id) {
+ super(id);
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/DeferredInputStream.java b/core/src/main/java/com/google/net/stubby/DeferredInputStream.java
new file mode 100644
index 0000000..46f4d7b
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/DeferredInputStream.java
@@ -0,0 +1,20 @@
+package com.google.net.stubby;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Extension to {@link InputStream} to allow for deferred production of data. Allows for
+ * zero-copy conversions when the goal is to copy the contents of a resource to a
+ * stream or buffer.
+ */
+public abstract class DeferredInputStream extends InputStream {
+
+ /**
+ * Produce the entire contents of this stream to the specified target
+ *
+ * @return number of bytes written
+ */
+ public abstract int flushTo(OutputStream target) throws IOException;
+}
diff --git a/core/src/main/java/com/google/net/stubby/DeferredProtoInputStream.java b/core/src/main/java/com/google/net/stubby/DeferredProtoInputStream.java
new file mode 100644
index 0000000..9ac6846
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/DeferredProtoInputStream.java
@@ -0,0 +1,96 @@
+package com.google.net.stubby;
+
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.MessageLite;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link DeferredInputStream} backed by a protobuf.
+ */
+public class DeferredProtoInputStream extends DeferredInputStream {
+
+ // DeferredProtoInputStream is first initialized with a *message*. *partial* is initially null.
+ // Once there has been a read operation on this stream, *message* is serialized to *partial* and
+ // set to null.
+ @Nullable private MessageLite message;
+ @Nullable private ByteArrayInputStream partial;
+
+ public DeferredProtoInputStream(MessageLite message) {
+ this.message = message;
+ }
+
+ /**
+ * Returns the original protobuf message. Returns null after this stream has been read.
+ */
+ @Nullable
+ public MessageLite getMessage() {
+ return message;
+ }
+
+ @Override
+ public int flushTo(OutputStream target) throws IOException {
+ int written = 0;
+ if (message != null) {
+ written = message.getSerializedSize();
+ message.writeTo(target);
+ message = null;
+ } else if (partial != null) {  // both fields null: nothing left to write, report 0 bytes
+ written = (int) ByteStreams.copy(partial, target);
+ partial = null;
+ }
+ return written;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (message != null) {
+ partial = new ByteArrayInputStream(message.toByteArray());
+ message = null;
+ }
+ if (partial != null) {
+ return partial.read();
+ }
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (message != null) {
+ int size = message.getSerializedSize();
+ if (len >= size) {
+ // This is the only case that is zero-copy.
+ CodedOutputStream stream = CodedOutputStream.newInstance(b, off, size);
+ message.writeTo(stream);
+ stream.flush();
+ stream.checkNoSpaceLeft();
+
+ message = null;
+ partial = null;
+ return size;
+ }
+
+ partial = new ByteArrayInputStream(message.toByteArray());
+ message = null;
+ }
+ if (partial != null) {
+ return partial.read(b, off, len);
+ }
+ return -1;
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (message != null) {
+ return message.getSerializedSize();
+ } else if (partial != null) {
+ return partial.available();
+ }
+ return 0;
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java b/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java
new file mode 100644
index 0000000..2ec545f
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java
@@ -0,0 +1,50 @@
+package com.google.net.stubby;
+
+/**
+ * Common constants and utilities for GRPC protocol framing.
+ * The format within the data stream provided by the transport layer is simply
+ *
+ * stream = frame+
+ * frame = frame-type framed-message
+ * frame-type = payload-type | context-type | status-type
+ * framed-message = payload | context | status
+ * payload = length <bytes>
+ * length = <uint32>
+ * context = context-key context-value
+ * context-key = length str
+ * context-value = length <bytes>
+ * status = TBD
+ *
+ * frame-type is implemented as a bitmask within a single byte
+ *
+ */
+public class GrpcFramingUtil {
+ /**
+ * Length of flags block in bytes
+ */
+ public static final int FRAME_TYPE_LENGTH = 1;
+
+ // Flags
+ public static final byte PAYLOAD_FRAME = 0x0;
+ public static final byte CONTEXT_VALUE_FRAME = 0x1;
+ public static final byte STATUS_FRAME = 0x2;
+ public static final byte RESERVED_FRAME = 0x3;
+ public static final byte FRAME_TYPE_MASK = 0x3;
+
+ /**
+ * No. of bytes for length field within a frame
+ */
+ public static final int FRAME_LENGTH = 4;
+
+ public static boolean isContextValueFrame(int flags) {
+ return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME;
+ }
+
+ public static boolean isPayloadFrame(byte flags) {
+ return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME;
+ }
+
+ public static boolean isStatusFrame(byte flags) {
+ return (flags & FRAME_TYPE_MASK) == STATUS_FRAME;
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/Operation.java b/core/src/main/java/com/google/net/stubby/Operation.java
new file mode 100644
index 0000000..fc95eb8
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Operation.java
@@ -0,0 +1,118 @@
+package com.google.net.stubby;
+
+import java.io.InputStream;
+
+import javax.annotation.Nullable;
+
+/**
+ * Base interface of operation implementations. Operations move through a phased execution
+ * model of
+ * HEADERS->PAYLOAD->FOOTERS->CLOSED
+ *
+ */
+public interface Operation {
+
+ public static enum Phase {
+ /**
+ * Used to communicate key-value pairs that define common context for the operation but
+ * that are not strictly part of the interface. Provided prior to delivering any formal
+ * parameters
+ */
+ HEADERS,
+ /**
+ * A sequence of delimited parameters to the called service
+ */
+ PAYLOAD,
+ /**
+ * Used to communicate key-value pairs that define common context for the call but
+ * that are not strictly part of the interface. Provided after all formal parameters have
+ * been delivered.
+ */
+ FOOTERS,
+ /**
+ * Indicates that the operation is closed and will not accept further input.
+ */
+ CLOSED
+ }
+
+ /**
+ * Unique id for this operation within the scope of the session.
+ * Should not be treated as a UUID
+ */
+ public int getId();
+
+ /**
+ * The current phase of the operation
+ */
+ public Phase getPhase();
+
+ /**
+ * Add a key-value context value.
+ * Allowed when phase = HEADERS | FOOTERS.
+ * Valid next phases
+ * HEADERS -> PAYLOAD | FOOTERS | CLOSED
+ * FOOTERS -> CLOSED
+ * <p>
+ * The {@link InputStream} message must be entirely consumed before this call returns.
+ * Implementations should not pass references to this stream across thread boundaries without
+ * taking a copy.
+ * <p>
+ * {@code message.available()} must return the number of remaining bytes to be read.
+ *
+ * @return this object
+ */
+ // TODO(user): Context is an incredibly general term. Consider having two signatures
+ // addHeader and addTrailer to follow HTTP nomenclature more closely.
+ public Operation addContext(String type, InputStream message, Phase nextPhase);
+
+ /**
+ * Send a payload to the receiver, indicates that more may follow.
+ * Allowed when phase = PAYLOAD
+ * Valid next phases
+ * PAYLOAD -> FOOTERS | CLOSED
+ * <p>
+ * The {@link InputStream} payload must be entirely consumed before this call returns.
+ * Implementations should not pass references to this stream across thread boundaries without
+ * taking a copy.
+ * <p>
+ * {@code payload.available()} must return the number of remaining bytes to be read.
+ *
+ * @return this object
+ */
+ public Operation addPayload(InputStream payload, Phase nextPhase);
+
+ /**
+ * Progress to the CLOSED phase. More than one call to close is allowed as long as their
+ * {@link com.google.net.stubby.Status#getCode()} agree. If they do not agree implementations
+ * should log the details of the newer status but retain the original one.
+ * <p>
+ * If an error occurs while implementing close the original passed {@link Status} should
+ * be retained if its code is not {@link com.google.net.stubby.transport.Transport.Code#OK}
+ * otherwise an appropriate {@link Status} should be formed from the error.
+ *
+ * @return this object
+ */
+ public Operation close(Status status);
+
+ /**
+ * Return the completion {@link Status} of the call or {@code null} if the operation has
+ * not yet completed.
+ */
+ @Nullable
+ public Status getStatus();
+
+ /**
+ * Store some arbitrary context with this operation
+ */
+ public <E> E put(Object key, E value);
+
+ /**
+ * Retrieve some arbitrary context from this operation
+ */
+ public <E> E get(Object key);
+
+ /**
+ * Remove some arbitrary context from this operation
+ */
+ public <E> E remove(Object key);
+}
diff --git a/core/src/main/java/com/google/net/stubby/ProtocolConstants.java b/core/src/main/java/com/google/net/stubby/ProtocolConstants.java
new file mode 100644
index 0000000..154ebe9
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/ProtocolConstants.java
@@ -0,0 +1,42 @@
+package com.google.net.stubby;
+
+/**
+ * Common constants for protocol framing. The format within the data stream is
+ *
+ * | Flags (1 byte) | flag-specific message |
+ *
+ * the flags block has the form
+ *
+ * | Reserved (5) | Compressed (1) | Frame Type (2) |
+ */
+public class ProtocolConstants {
+ /**
+ * Length of flags block
+ */
+ public static final int FLAGS_LENGTH = 1;
+
+ // Flags
+ public static final int PAYLOAD_FRAME = 0x0;
+ public static final int CONTEXT_VALUE_FRAME = 0x1;
+ public static final int RESPONSE_STATUS_FRAME = 0x2;
+ public static final int RESERVED_FRAME = 0x3;
+ public static final int FRAME_TYPE_MASK = 0x3;
+ public static final int COMPRESSED_FLAG = 0x4;
+
+ /**
+ * No. of bytes for the length of each data stream frame
+ */
+ public static final int FRAME_LENGTH = 4;
+
+ public static boolean isContextValueFrame(int flags) {
+ return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME;
+ }
+
+ public static boolean isPayloadFrame(byte flags) {
+ return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME;
+ }
+
+ public static boolean isCompressed(int flags) {
+ return (flags & COMPRESSED_FLAG) != 0;
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/Request.java b/core/src/main/java/com/google/net/stubby/Request.java
new file mode 100644
index 0000000..e777b29
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Request.java
@@ -0,0 +1,12 @@
+package com.google.net.stubby;
+
+/**
+ * A request {@link Operation} created by a client by calling
+ * {@link Session#startRequest(String, Response.ResponseBuilder)}
+ */
+public interface Request extends Operation {
+ /**
+ * Reference to the response operation that consumes replies to this request.
+ */
+ public Response getResponse();
+}
diff --git a/core/src/main/java/com/google/net/stubby/RequestRegistry.java b/core/src/main/java/com/google/net/stubby/RequestRegistry.java
new file mode 100644
index 0000000..23d16e6
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/RequestRegistry.java
@@ -0,0 +1,58 @@
+package com.google.net.stubby;
+
+import com.google.common.collect.MapMaker;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Registry of in-flight requests.
+ */
+public class RequestRegistry {
+
+ private final ConcurrentMap<Integer, Request> inFlight;
+
+ public RequestRegistry() {
+ inFlight = new MapMaker().concurrencyLevel(8).initialCapacity(1001).makeMap();
+ }
+
+ public void register(Request op) {
+ if (inFlight.putIfAbsent(op.getId(), op) != null) {
+ throw new IllegalArgumentException("Operation already bound for " + op.getId());
+ }
+ }
+
+ public Request lookup(int id) {
+ return inFlight.get(id);
+ }
+
+ public Request remove(int id) {
+ return inFlight.remove(id);
+ }
+
+ public Collection<Integer> getAllRequests() {
+ return Collections.unmodifiableSet(inFlight.keySet());
+ }
+
+ /**
+ * Closes any requests (and their associated responses) with the given status and removes them
+ * from the registry.
+ */
+ public void drainAllRequests(Status responseStatus) {
+ Iterator<Request> it = inFlight.values().iterator();
+ while (it.hasNext()) {
+ Request request = it.next();
+ if (request != null) {
+ if (request.getPhase() != Operation.Phase.CLOSED) {
+ request.close(responseStatus);
+ }
+ if (request.getResponse().getPhase() != Operation.Phase.CLOSED) {
+ request.getResponse().close(responseStatus);
+ }
+ }
+ it.remove();
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/Response.java b/core/src/main/java/com/google/net/stubby/Response.java
new file mode 100644
index 0000000..d06b86c
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Response.java
@@ -0,0 +1,21 @@
+package com.google.net.stubby;
+
+/**
+ * A response {@link Operation} passed by a client to
+ * {@link Session#startRequest(String, ResponseBuilder)}
+ * when starting a remote call.
+ */
+public interface Response extends Operation {
+
+ public static interface ResponseBuilder {
+ /**
+ * Build the response with the specified id
+ */
+ public Response build(int id);
+
+ /**
+ * Build the response
+ */
+ public Response build();
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/Session.java b/core/src/main/java/com/google/net/stubby/Session.java
new file mode 100644
index 0000000..84cfb6e
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Session.java
@@ -0,0 +1,19 @@
+package com.google.net.stubby;
+
+/**
+ * Session interface to be bound to the transport layer which is used by the higher-level
+ * layers to dispatch calls.
+ * <p>
+ * A session is used as a factory to start a named remote {@link Request} operation. The caller
+ * provides a {@link Response} operation to receive responses. Clients will make calls on the
+ * {@link Request} to send state to the server, simultaneously the transport layer will make calls
+ * into the {@link Response} as the server provides response state.
+ * <p>
+ */
+public interface Session {
+
+ /**
+ * Start a request in the context of this session.
+ */
+ public Request startRequest(String operationName, Response.ResponseBuilder responseBuilder);
+}
diff --git a/core/src/main/java/com/google/net/stubby/Status.java b/core/src/main/java/com/google/net/stubby/Status.java
new file mode 100644
index 0000000..e7f5fa9
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/Status.java
@@ -0,0 +1,105 @@
+package com.google.net.stubby;
+
+import com.google.common.base.Preconditions;
+import com.google.net.stubby.transport.Transport;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Defines the status of an operation using the canonical error space.
+ */
+@Immutable
+public class Status {
+
+ public static final Status OK = new Status(Transport.Code.OK);
+
+ private final Transport.Code code;
+ private final String description;
+ private final Throwable cause;
+
+ public Status(Transport.Code code) {
+ this(code, null, null);
+ }
+
+ public Status(Transport.Code code, @Nullable String description) {
+ this(code, description, null);
+ }
+
+ public Status(Transport.Code code, @Nullable Throwable cause) {
+ this(code, null, cause);
+ }
+
+ public Status(Transport.Code code, @Nullable String description, @Nullable Throwable cause) {
+ this.code = Preconditions.checkNotNull(code);
+ this.description = description;
+ this.cause = cause;
+ }
+
+ public Transport.Code getCode() {
+ return code;
+ }
+
+ @Nullable
+ public String getDescription() {
+ return description;
+ }
+
+ @Nullable
+ public Throwable getCause() {
+ return cause;
+ }
+
+ /**
+ * Override this status with another if allowed.
+ */
+ public Status overrideWith(Status newStatus) {
+ if (this.getCode() == Transport.Code.OK || newStatus.code == Transport.Code.OK) {
+ return this;
+ } else {
+ return newStatus;
+ }
+ }
+
+ public RuntimeException asRuntimeException() {
+ return new OperationRuntimeException(this);
+ }
+
+ public Exception asException() {
+ return new OperationException(this);
+ }
+
+ /**
+ * Exception thrown by implementations while managing an operation.
+ */
+ public static class OperationException extends Exception {
+
+ private final Status status;
+
+ public OperationException(Status status) {
+ super(status.getDescription(), status.getCause());
+ this.status = status;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+ }
+
+ /**
+ * Runtime exception thrown by implementations while managing an operation.
+ */
+ public static class OperationRuntimeException extends RuntimeException {
+
+ private final Status status;
+
+ public OperationRuntimeException(Status status) {
+ super(status.getDescription(), status.getCause());
+ this.status = status;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http/HttpStreamDeframer.java b/core/src/main/java/com/google/net/stubby/http/HttpStreamDeframer.java
new file mode 100644
index 0000000..243a067
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http/HttpStreamDeframer.java
@@ -0,0 +1,24 @@
+package com.google.net.stubby.http;
+
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.transport.InputStreamDeframer;
+
+import java.io.InputStream;
+
+/**
+ * Simple deframer which does not have to deal with transport level framing.
+ */
+public class HttpStreamDeframer extends InputStreamDeframer {
+
+ public HttpStreamDeframer() {
+ }
+
+ @Override
+ public int deframe(InputStream frame, Operation target) {
+ int remaining = super.deframe(frame, target);
+ if (remaining > 0) {
+ throw new IllegalStateException("GRPC stream not correctly aligned");
+ }
+ return 0;
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http/ServletSession.java b/core/src/main/java/com/google/net/stubby/http/ServletSession.java
new file mode 100644
index 0000000..fb61b28
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http/ServletSession.java
@@ -0,0 +1,299 @@
+package com.google.net.stubby.http;
+
+import com.google.common.io.ByteBuffers;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.net.stubby.AbstractResponse;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Request;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.MessageFramer;
+import com.google.net.stubby.transport.Transport;
+import com.google.net.stubby.transport.TransportFrameUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * A server-only session to be used with a servlet engine. The wrapped session MUST use a
+ * same-thread executor for work-dispatch
+ */
+// TODO(user) Support a more flexible threading model than same-thread
+// TODO(user) Investigate Servlet3 compliance, in particular thread detaching
+public class ServletSession extends HttpServlet {
+
+ public static final String PROTORPC = "application/protorpc";
+ public static final String CONTENT_TYPE = "content-type";
+
+ private final Session session;
+ private final Executor executor;
+
+ public ServletSession(Session session, Executor executor) {
+ this.session = session;
+ this.executor = executor;
+ }
+
+ @Override
+ public String getServletName() {
+ return "gRPCServlet";
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
+ IOException {
+ try {
+ final SettableFuture<Void> requestCompleteFuture = SettableFuture.create();
+ final ResponseStream responseStream = new ResponseStream(resp, requestCompleteFuture);
+ final Request request = startRequest(req, resp, responseStream);
+ if (request == null) {
+ return;
+ }
+
+ // Deframe the request and begin the response processing.
+ new HttpStreamDeframer().deframe(req.getInputStream(), request);
+ request.close(Status.OK);
+
+ // Notify the response processing that the request is complete.
+ requestCompleteFuture.set(null);
+
+ // Block until the response is complete.
+ responseStream.getResponseCompleteFuture().get();
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Start the Request operation on the server
+ */
+ private Request startRequest(HttpServletRequest req, HttpServletResponse resp,
+ ResponseStream responseStream) throws IOException {
+ // TODO(user): Move into shared utility
+ if (!PROTORPC.equals(req.getHeader(CONTENT_TYPE))) {
+ resp.sendError(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE,
+ "The only supported content-type is " + PROTORPC);
+ return null;
+ }
+ // Use Path to specify the operation
+ String operationName = normalizeOperationName(req.getPathInfo());
+ if (operationName == null) {
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return null;
+ }
+ // Create the operation and bind an HTTP response operation
+ Request op = session.startRequest(operationName, HttpResponseOperation.builder(responseStream));
+ if (op == null) {
+ // TODO(user): Unify error handling once spec finalized
+ resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Unknown RPC operation");
+ return null;
+ }
+ return op;
+ }
+
+ private String normalizeOperationName(String path) {
+ // TODO(user): This is where we would add path-namespacing of different implementations
+ // of services so they do not collide. For the moment this is not supported.
+ return (path == null || path.isEmpty()) ? null : path.substring(1);  // null => 404 in caller
+ }
+
+ /**
+ * Implementation of {@link Response}
+ */
+ private static class HttpResponseOperation extends AbstractResponse implements Framer.Sink {
+
+ static ResponseBuilder builder(final ResponseStream responseStream) {
+ return new ResponseBuilder() {
+ @Override
+ public Response build(int id) {
+ return new HttpResponseOperation(id, responseStream);
+ }
+
+ @Override
+ public Response build() {
+ return new HttpResponseOperation(-1, responseStream);
+ }
+ };
+ }
+
+ private final MessageFramer framer;
+ private final ResponseStream responseStream;
+
+ private HttpResponseOperation(int id, ResponseStream responseStream) {
+ super(id);
+ this.responseStream = responseStream;
+ // Always use no compression framing and treat the stream as one large frame
+ framer = new MessageFramer(4096);
+ framer.setAllowCompression(false);
+ try {
+ responseStream.write(TransportFrameUtil.NO_COMPRESS_FLAG);
+ } catch (IOException ioe) {
+ close(new Status(Transport.Code.INTERNAL, ioe));
+ }
+ }
+
+ @Override
+ public Operation addContext(String type, InputStream message, Phase nextPhase) {
+ super.addContext(type, message, nextPhase);
+ framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
+ return this;
+ }
+
+ @Override
+ public Operation addPayload(InputStream payload, Phase nextPhase) {
+ super.addPayload(payload, Phase.PAYLOAD);
+ framer.writePayload(payload, false, this);
+ if (nextPhase == Phase.CLOSED) {
+ close(Status.OK);
+ }
+ return this;
+ }
+
+ @Override
+ public Operation close(Status status) {
+ boolean alreadyClosed = getPhase() == Phase.CLOSED;
+ super.close(status);
+ if (!alreadyClosed) {
+ framer.writeStatus(status, true, this);
+ }
+ return this;
+ }
+
+ @Override
+ public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+ boolean closed = getPhase() == Phase.CLOSED;
+ try {
+ // Skip the frame flag as we don't care about it for streaming output
+ frame.position(1);
+ ByteBuffers.asByteSource(frame).copyTo(responseStream);
+ } catch (Throwable t) {
+ close(new Status(Transport.Code.INTERNAL, t));
+ } finally {
+ if (closed && endOfMessage) {
+ framer.close();
+ responseStream.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Wraps the HTTP response {@link OutputStream}. Will buffer bytes until the request is
+ * complete. It will then flush its buffer to the output stream and all subsequent writes
+ * will go directly to the HTTP response.
+ */
+ private class ResponseStream extends OutputStream {
+ private final SettableFuture<Void> responseCompleteFuture = SettableFuture.create();
+ private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ private final HttpServletResponse httpResponse;
+ private volatile boolean requestComplete;
+ private volatile boolean closed;
+
+ public ResponseStream(HttpServletResponse httpResponse,
+ SettableFuture<Void> requestCompleteFuture) {
+ this.httpResponse = httpResponse;
+ httpResponse.setHeader("Content-Type", PROTORPC);
+
+ requestCompleteFuture.addListener(new Runnable() {
+ @Override
+ public void run() {
+ onRequestComplete();
+ }
+ }, executor);
+ }
+
+ public ListenableFuture<Void> getResponseCompleteFuture() {
+ return responseCompleteFuture;
+ }
+
+ @Override
+ public void close() {
+ synchronized (buffer) {
+ closed = true;
+
+ // If all the data has been written to the output stream, finish the response.
+ if (buffer.size() == 0) {
+ finish();
+ }
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[] {(byte) b}, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ // This loop will execute at most 2 times.
+ while (true) {
+ if (requestComplete) {
+ // The request already completed, just write directly to the response output stream.
+ httpResponse.getOutputStream().write(b, off, len);
+ return;
+ }
+
+ synchronized (buffer) {
+ if (requestComplete) {
+ // Handle the case that we completed the request just after the first check
+ // above. Just go back to the top of the loop and write directly to the response.
+ continue;
+ }
+
+ // Request hasn't completed yet, buffer the data for now.
+ buffer.write(b, off, len);
+ return;
+ }
+ }
+ }
+
+ private void onRequestComplete() {
+ try {
+ // Write the content of the buffer to the HTTP response.
+ synchronized (buffer) {
+ if (buffer.size() > 0) {
+ httpResponse.getOutputStream().write(buffer.toByteArray());
+ buffer.reset();
+ }
+ requestComplete = true;
+
+ if (closed) {
+ // The response is complete, finish the response.
+ finish();
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Closes the HTTP response and completes the response future, exceptionally on failure. */
+ private void finish() {
+ try {
+ httpResponse.getOutputStream().close();
+ responseCompleteFuture.set(null);
+ } catch (IOException e) {
+ // Fail the future as well, so that doPost(), which blocks on it, is released.
+ responseCompleteFuture.setException(e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
new file mode 100644
index 0000000..96c5b7d
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java
@@ -0,0 +1,98 @@
+package com.google.net.stubby.http;
+
+import com.google.common.io.ByteBuffers;
+import com.google.net.stubby.AbstractRequest;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.MessageFramer;
+import com.google.net.stubby.transport.Transport;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of {@link Session} using {@link HttpURLConnection} for clients. Services
+ * are dispatched relative to a base URI.
+ */
+public class UrlConnectionClientSession implements Session {
+
+ private final URI base;
+
+ public UrlConnectionClientSession(URI base) {
+ this.base = base;
+ }
+
+ @Override
+ public Request startRequest(String operationName, Response.ResponseBuilder responseBuilder) {
+ return new Request(base.resolve(operationName), responseBuilder.build());
+ }
+
+ private class Request extends AbstractRequest implements Framer.Sink {
+
+ private final HttpURLConnection connection;
+ private final DataOutputStream outputStream;
+ private final MessageFramer framer;
+
+ private Request(URI uri, Response response) {
+ super(response);
+ try {
+ connection = (HttpURLConnection) uri.toURL().openConnection();
+ connection.setDoOutput(true);
+ connection.setDoInput(true);
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/protorpc");
+ outputStream = new DataOutputStream(connection.getOutputStream());
+ } catch (IOException t) {
+ throw new RuntimeException(t);
+ }
+ // No compression when framing over HTTP for the moment
+ framer = new MessageFramer(4096);
+ framer.setAllowCompression(false);
+ }
+
+ @Override
+ public Operation addContext(String type, InputStream message, Phase nextPhase) {
+ super.addContext(type, message, nextPhase);
+ framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
+ return this;
+ }
+
+ @Override
+ public Operation addPayload(InputStream payload, Phase nextPhase) {
+ super.addPayload(payload, nextPhase);
+ framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
+ return this;
+ }
+
+ @Override
+ public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+ boolean closed = getPhase() == Phase.CLOSED;
+
+ try {
+ ByteBuffers.asByteSource(frame).copyTo(outputStream);
+ if (closed && endOfMessage) {
+ connection.getOutputStream().close();
+ // The request has completed so now process the response. Must do this in the same
+ // thread as URLConnection has threading issues.
+ new HttpStreamDeframer().deframe(connection.getInputStream(), getResponse());
+ connection.getInputStream().close();
+ connection.disconnect();
+ }
+ } catch (IOException ioe) {
+ close(new Status(Transport.Code.INTERNAL, ioe));
+ } finally {
+ if (closed && endOfMessage) {
+ framer.close();
+ }
+ }
+ }
+ }
+}
+
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java b/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java
new file mode 100644
index 0000000..57a703a
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java
@@ -0,0 +1,70 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.transport.Deframer;
+import com.google.net.stubby.transport.TransportFrameUtil;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteOrder;
+
+/**
+ * Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call
+ *
+ * TODO(user): This is essentially a duplicate of the spdy deframer. Should find a way to
+ * share common code.
+ */
+public class ByteBufDeframer extends Deframer<ByteBuf> {
+
+ private final CompositeByteBuf buffer;
+
+ public ByteBufDeframer() {
+ this(UnpooledByteBufAllocator.DEFAULT);
+ }
+
+ public ByteBufDeframer(ByteBufAllocator alloc) {
+ buffer = alloc.compositeBuffer();
+ }
+
+ public void dispose() {
+ // Remove the components from the composite buffer. This should set the reference
+ // count on all buffers to zero.
+ buffer.removeComponents(0, buffer.numComponents());
+
+ // Release the composite buffer
+ buffer.release();
+ }
+
+ @Override
+ protected DataInputStream prefix(ByteBuf frame) throws IOException {
+ buffer.addComponent(frame);
+ buffer.writerIndex(buffer.writerIndex() + frame.writerIndex() - frame.readerIndex());
+ return new DataInputStream(new ByteBufInputStream(buffer));
+ }
+
+ @Override
+ protected int consolidate() {
+ buffer.consolidate();
+ return buffer.readableBytes();
+ }
+
+ @Override
+ protected ByteBuf decompress(ByteBuf frame) throws IOException {
+ frame = frame.order(ByteOrder.BIG_ENDIAN);
+ int compressionType = frame.readUnsignedByte();
+ int frameLength = frame.readUnsignedMedium();
+ if (frameLength != frame.readableBytes()) {
+ throw new IllegalArgumentException("GRPC and buffer lengths misaligned. Frame length="
+ + frameLength + ", readableBytes=" + frame.readableBytes());
+ }
+ if (TransportFrameUtil.isNotCompressed(compressionType)) {
+ return frame;
+ }
+ throw new IOException("Unknown compression type " + compressionType);
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
new file mode 100644
index 0000000..11698d6
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
@@ -0,0 +1,99 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.common.base.Throwables;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Session;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.draft10.frame.Http2FrameCodec;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * Simple client connection startup that creates a {@link Http2Session} for use
+ * with protocol bindings.
+ */
+public class Http2Client {
+  private final String host;
+  private final int port;
+  private final RequestRegistry requestRegistry;
+  private ChannelFuture channelFuture;
+  private final SSLEngine sslEngine;
+
+  /** Creates a client that connects without TLS. */
+  public Http2Client(String host, int port, RequestRegistry requestRegistry) {
+    this(host, port, requestRegistry, null);
+  }
+
+  /**
+   * @param sslEngine engine used to secure the connection, or {@code null} for plaintext;
+   *     a non-null engine is switched to client mode
+   */
+  public Http2Client(String host, int port, RequestRegistry requestRegistry,
+      @Nullable SSLEngine sslEngine) {
+    this.host = host;
+    this.port = port;
+    this.requestRegistry = requestRegistry;
+    this.sslEngine = sslEngine;
+    // TODO(user): NPN support
+    if (sslEngine != null) {
+      sslEngine.setUseClientMode(true);
+    }
+  }
+
+  /**
+   * Connects to the configured host and port, blocks until the connection attempt has
+   * finished and returns a session bound to the new channel. The worker event loop group is
+   * shut down when the channel closes or when startup fails.
+   *
+   * @throws RuntimeException if connecting fails; when the wait is interrupted the thread's
+   *     interrupt flag is restored before the exception is propagated
+   */
+  public Session startAndWait() {
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    try {
+      Bootstrap b = new Bootstrap();
+      b.group(workerGroup);
+      b.channel(NioSocketChannel.class);
+      b.option(ChannelOption.SO_KEEPALIVE, true);
+      // TODO(user): Evaluate use of pooled allocator
+      b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+      b.handler(new ChannelInitializer<SocketChannel>() {
+        @Override
+        public void initChannel(SocketChannel ch) throws Exception {
+          if (sslEngine != null) {
+            // Assume TLS when using SSL
+            ch.pipeline().addLast(new SslHandler(sslEngine, false));
+          }
+          ch.pipeline().addLast(
+              new Http2FrameCodec(),
+              new Http2Codec(requestRegistry));
+        }
+      });
+      // Start the client.
+      channelFuture = b.connect(host, port);
+      // Wait for the connection
+      channelFuture.sync();
+      ChannelFuture closeFuture = channelFuture.channel().closeFuture();
+      closeFuture.addListener(new WorkerCleanupListener(workerGroup));
+      return new Http2Session(channelFuture.channel(), requestRegistry);
+    } catch (Throwable t) {
+      workerGroup.shutdownGracefully();
+      if (t instanceof InterruptedException) {
+        // propagate() wraps the exception, so keep the interruption visible to callers.
+        Thread.currentThread().interrupt();
+      }
+      throw Throwables.propagate(t);
+    }
+  }
+
+  /** Shuts the worker event loop group down once the observed future completes. */
+  private static class WorkerCleanupListener
+      implements GenericFutureListener<io.netty.util.concurrent.Future<Void>> {
+    private final EventLoopGroup workerGroup;
+
+    public WorkerCleanupListener(EventLoopGroup workerGroup) {
+      this.workerGroup = workerGroup;
+    }
+
+    @Override
+    public void operationComplete(io.netty.util.concurrent.Future<Void> future) throws Exception {
+      workerGroup.shutdownGracefully();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
new file mode 100644
index 0000000..d0ad944
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
@@ -0,0 +1,226 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Operation.Phase;
+import com.google.net.stubby.Request;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.MessageFramer;
+import com.google.net.stubby.transport.Transport;
+import com.google.net.stubby.transport.Transport.Code;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.draft10.Http2Error;
+import io.netty.handler.codec.http2.draft10.Http2Headers;
+import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2RstStreamFrame;
+import io.netty.handler.codec.http2.draft10.frame.Http2DataFrame;
+import io.netty.handler.codec.http2.draft10.frame.Http2HeadersFrame;
+import io.netty.handler.codec.http2.draft10.frame.Http2RstStreamFrame;
+import io.netty.handler.codec.http2.draft10.frame.Http2StreamFrame;
+
+/**
+ * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
+ * request-response dialog
+ */
+public class Http2Codec extends ChannelHandlerAdapter {
+
+  private final boolean client;
+  private final RequestRegistry requestRegistry;
+  private final Session session;
+  // Allocator taken from the context of the most recent channelRead call; used when
+  // creating deframers.
+  private ByteBufAllocator alloc;
+
+  /**
+   * Constructor used by servers, takes a session which will receive operation events.
+   */
+  public Http2Codec(Session session, RequestRegistry requestRegistry) {
+    this.client = false;
+    this.session = session;
+    this.requestRegistry = requestRegistry;
+  }
+
+  /**
+   * Constructor used by clients to send operations to a remote server
+   */
+  public Http2Codec(RequestRegistry requestRegistry) {
+    this.client = true;
+    this.session = null;
+    this.requestRegistry = requestRegistry;
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    // Abort any active requests.
+    requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
+
+    super.channelInactive(ctx);
+  }
+
+  /**
+   * Routes an incoming stream frame to the operation registered for its stream id. On a
+   * server, a frame for an unregistered stream is used to start a new request; on a client
+   * it is an error. Any failure closes the operation, resets the stream and is rethrown.
+   * Messages that are not {@link Http2StreamFrame}s are ignored.
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    if (!(msg instanceof Http2StreamFrame)) {
+      return;
+    }
+    this.alloc = ctx.alloc();
+    Http2StreamFrame frame = (Http2StreamFrame) msg;
+    Request operation = requestRegistry.lookup(frame.getStreamId());
+    try {
+      if (operation == null) {
+        if (client) {
+          // For clients an operation must already exist in the registry
+          throw new IllegalStateException("Response operation must already be bound");
+        } else {
+          operation = serverStart(ctx, frame);
+          if (operation == null) {
+            // Unknown operation, refuse the stream
+            sendRstStream(ctx, frame.getStreamId(), Http2Error.REFUSED_STREAM);
+          }
+        }
+      } else {
+        // Consume the frame
+        progress(client ? operation.getResponse() : operation, frame);
+      }
+    } catch (Throwable e) {
+      closeWithInternalError(operation, e);
+      sendRstStream(ctx, frame.getStreamId(), Http2Error.INTERNAL_ERROR);
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the request and its associated response with an internal error and removes the
+   * request from the registry. Does nothing when {@code request} is null.
+   */
+  private void closeWithInternalError(Request request, Throwable e) {
+    if (request != null) {
+      Status status = new Status(Code.INTERNAL, e);
+      request.close(status);
+      request.getResponse().close(status);
+      requestRegistry.remove(request.getId());
+    }
+  }
+
+  /**
+   * Writes the HTTP/2 RST Stream frame to the remote endpoint, indicating a stream failure.
+   */
+  private void sendRstStream(ChannelHandlerContext ctx, int streamId, Http2Error error) {
+    DefaultHttp2RstStreamFrame frame = new DefaultHttp2RstStreamFrame.Builder()
+        .setStreamId(streamId).setErrorCode(error.getCode()).build();
+    ctx.writeAndFlush(frame);
+  }
+
+  /**
+   * Start the Request operation on the server.
+   *
+   * @return the registered request, or {@code null} when the frame is not a headers frame,
+   *     does not carry the protorpc content type, has no usable path, or the session
+   *     declines the operation
+   */
+  private Request serverStart(ChannelHandlerContext ctx, Http2StreamFrame frame) {
+    if (!(frame instanceof Http2HeadersFrame)) {
+      // TODO(user): Better error detail to client here
+      return null;
+    }
+    Http2HeadersFrame headers = (Http2HeadersFrame) frame;
+    if (!Http2Session.PROTORPC.equals(headers.getHeaders().get("content-type"))) {
+      return null;
+    }
+    // Use Path to specify the operation
+    String operationName =
+        normalizeOperationName(headers.getHeaders().get(Http2Headers.HttpName.PATH.value()));
+    if (operationName == null) {
+      return null;
+    }
+    // Create the operation and bind a HTTP2 response operation
+    Request op = session.startRequest(operationName,
+        Http2Response.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096)));
+    if (op == null) {
+      return null;
+    }
+    requestRegistry.register(op);
+    // Immediately deframe the remaining headers in the frame
+    progressHeaders(op, (Http2HeadersFrame) frame);
+    return op;
+  }
+
+  /**
+   * Strips the first character of the path to obtain the operation name.
+   *
+   * @return the operation name, or {@code null} when the path is missing or empty so that
+   *     the caller refuses the stream instead of failing with an exception
+   */
+  // TODO(user): This needs proper namespacing support, this is currently just a hack
+  private static String normalizeOperationName(String path) {
+    if (path == null || path.isEmpty()) {
+      return null;
+    }
+    return path.substring(1);
+  }
+
+  /**
+   * Consume a received frame
+   */
+  private void progress(Operation operation, Http2StreamFrame frame) {
+    if (frame instanceof Http2HeadersFrame) {
+      progressHeaders(operation, (Http2HeadersFrame) frame);
+    } else if (frame instanceof Http2DataFrame) {
+      progressPayload(operation, (Http2DataFrame) frame);
+    } else if (frame instanceof Http2RstStreamFrame) {
+      // Cancel
+      operation.close(null);
+      finish(operation);
+    } else {
+      // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
+      operation.close(null);
+      finish(operation);
+    }
+  }
+
+  /**
+   * Consume headers in the frame. Any header starting with ':' is considered reserved
+   */
+  private void progressHeaders(Operation operation, Http2HeadersFrame frame) {
+    // TODO(user): Currently we do not do anything with HTTP2 headers
+    if (frame.isEndOfStream()) {
+      finish(operation);
+    }
+  }
+
+  /**
+   * Hands a copy of the data frame's content to the operation's deframer and finishes the
+   * operation when the frame ends the stream. The frame is always released.
+   */
+  private void progressPayload(Operation operation, Http2DataFrame frame) {
+    try {
+      // Check before copying so that no copy is made only to be dropped.
+      if (operation == null) {
+        return;
+      }
+
+      // Copy the data buffer.
+      // TODO(user): Need to decide whether to use pooling or not.
+      ByteBuf dataCopy = frame.content().copy();
+
+      ByteBufDeframer deframer = getOrCreateDeframer(operation);
+      deframer.deframe(dataCopy, operation);
+      if (frame.isEndOfStream()) {
+        finish(operation);
+      }
+
+    } finally {
+      frame.release();
+    }
+  }
+
+  /**
+   * Called when a HTTP2 stream is closed.
+   */
+  private void finish(Operation operation) {
+    disposeDeframer(operation);
+    requestRegistry.remove(operation.getId());
+    if (operation.getPhase() != Phase.CLOSED) {
+      operation.close(Status.OK);
+    }
+  }
+
+  /**
+   * Returns the deframer stashed on the operation, creating and stashing one on first use.
+   */
+  public ByteBufDeframer getOrCreateDeframer(Operation operation) {
+    ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
+    if (deframer == null) {
+      deframer = new ByteBufDeframer(alloc);
+      operation.put(ByteBufDeframer.class, deframer);
+    }
+    return deframer;
+  }
+
+  /**
+   * Removes the deframer stashed on the operation, if any, and disposes it.
+   */
+  public void disposeDeframer(Operation operation) {
+    ByteBufDeframer deframer = operation.remove(ByteBufDeframer.class);
+    if (deframer != null) {
+      deframer.dispose();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java
new file mode 100644
index 0000000..deec966
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java
@@ -0,0 +1,64 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.AbstractOperation;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.Transport;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2DataFrame;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Base implementation of {@link Operation} that writes HTTP2 frames
+ */
+abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
+
+  protected final Framer framer;
+  private final Channel channel;
+
+  Http2Operation(int streamId, Channel channel, Framer framer) {
+    super(streamId);
+    this.channel = channel;
+    this.framer = framer;
+  }
+
+  @Override
+  public Operation addContext(String type, InputStream message, Phase nextPhase) {
+    super.addContext(type, message, nextPhase);
+    framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
+    return this;
+  }
+
+  @Override
+  public Operation addPayload(InputStream payload, Phase nextPhase) {
+    super.addPayload(payload, nextPhase);
+    framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
+    return this;
+  }
+
+  /**
+   * Wraps the frame in an HTTP2 data frame and writes it to the channel. Only the frame that
+   * ends the last message of a closed operation is marked end-of-stream and triggers closing
+   * the framer; every earlier frame is written synchronously. Failures close this operation
+   * with an INTERNAL status.
+   */
+  @Override
+  public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+    // A closing message may span several frames; only its final frame ends the stream.
+    boolean lastFrame = getPhase() == Phase.CLOSED && endOfMessage;
+    DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame.Builder().setStreamId(getId())
+        .setContent(Unpooled.wrappedBuffer(frame)).setEndOfStream(lastFrame).build();
+    try {
+      ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
+      if (!lastFrame) {
+        // Sync for all except the last frame to prevent buffer corruption.
+        channelFuture.get();
+      }
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to the caller after reporting the failure.
+      Thread.currentThread().interrupt();
+      close(new Status(Transport.Code.INTERNAL, e));
+    } catch (Exception e) {
+      close(new Status(Transport.Code.INTERNAL, e));
+    } finally {
+      if (lastFrame) {
+        framer.close();
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
new file mode 100644
index 0000000..f62449d
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java
@@ -0,0 +1,55 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.Request;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.transport.Framer;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http2.draft10.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.draft10.Http2Headers;
+import io.netty.handler.codec.http2.draft10.frame.DefaultHttp2HeadersFrame;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * An HTTP2 based implementation of {@link Request}. Constructing one writes the headers
+ * frame for the operation to the channel.
+ */
+class Http2Request extends Http2Operation implements Request {
+
+  // TODO(user): Inject this
+  private static final String HOST_NAME = resolveLocalHostName();
+
+  private final Response response;
+
+  /**
+   * Creates the request and writes its headers frame, using the response's id as the
+   * stream id.
+   */
+  public Http2Request(Response response, Channel channel, String operationName, Framer framer) {
+    super(response.getId(), channel, framer);
+    channel.write(createHeadersFrame(response.getId(), operationName));
+    this.response = response;
+  }
+
+  @Override
+  public Response getResponse() {
+    return response;
+  }
+
+  /** Returns the local host name, falling back to "localhost" when it cannot be resolved. */
+  private static String resolveLocalHostName() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException uhe) {
+      return "localhost";
+    }
+  }
+
+  /** Builds the POST headers frame that addresses {@code operationName} on stream {@code id}. */
+  private static DefaultHttp2HeadersFrame createHeadersFrame(int id, String operationName) {
+    String path = "/" + operationName;
+    Http2Headers headers = DefaultHttp2Headers.newBuilder()
+        .setMethod("POST")
+        .setPath(path)
+        .setAuthority(HOST_NAME)
+        .setScheme("https")
+        .add("content-type", Http2Session.PROTORPC)
+        .build();
+    DefaultHttp2HeadersFrame headersFrame =
+        new DefaultHttp2HeadersFrame.Builder().setStreamId(id).setHeaders(headers).build();
+    return headersFrame;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java
new file mode 100644
index 0000000..ee8b2fa
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java
@@ -0,0 +1,42 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+
+import io.netty.channel.Channel;
+
+/**
+ * An HTTP2 based implementation of a {@link Response}.
+ */
+class Http2Response extends Http2Operation implements Response {
+
+  private Http2Response(int id, Channel channel, Framer framer) {
+    super(id, channel, framer);
+  }
+
+  /**
+   * Returns a builder whose no-argument {@code build()} creates a response for the given
+   * stream id; building with an explicit id is not supported.
+   */
+  public static ResponseBuilder builder(final int id, final Channel channel, final Framer framer) {
+    return new ResponseBuilder() {
+      @Override
+      public Response build() {
+        return new Http2Response(id, channel, framer);
+      }
+
+      @Override
+      public Response build(int ignoredId) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Closes the response and, the first time it is closed, writes the status through the
+   * framer.
+   */
+  @Override
+  public Operation close(Status status) {
+    boolean wasOpen = getPhase() != Phase.CLOSED;
+    super.close(status);
+    if (wasOpen) {
+      framer.writeStatus(status, true, this);
+    }
+    return this;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java
new file mode 100644
index 0000000..265a908
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Server.java
@@ -0,0 +1,69 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Session;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.draft10.frame.Http2FrameCodec;
+
+/**
+ * Simple server connection startup that attaches a {@link Session} implementation to a connection.
+ */
+public class Http2Server implements Runnable {
+  private final int port;
+  private final Session session;
+  private final RequestRegistry operations;
+  // Written by the thread executing run() and read by stop(), hence volatile.
+  private volatile Channel channel;
+
+  public Http2Server(int port, Session session, RequestRegistry operations) {
+    this.port = port;
+    this.session = session;
+    this.operations = operations;
+  }
+
+  /**
+   * Binds the server to the configured port and blocks until the server channel is closed.
+   * Both event loop groups are shut down before returning. Failures are printed to standard
+   * error; when the wait is interrupted the thread's interrupt flag is restored.
+   */
+  @Override
+  public void run() {
+    EventLoopGroup bossGroup = new NioEventLoopGroup();
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    try {
+      ServerBootstrap b = new ServerBootstrap();
+      // TODO(user): Evaluate use of pooled allocator
+      b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+      b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+              ch.pipeline().addLast(new Http2FrameCodec(), new Http2Codec(session, operations));
+            }
+          }).option(ChannelOption.SO_BACKLOG, 128)
+          .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+      // Bind and start to accept incoming connections.
+      ChannelFuture f = b.bind(port).sync();
+
+      // Wait until the server socket is closed.
+      channel = f.channel();
+      channel.closeFuture().sync();
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to whoever owns this thread.
+      Thread.currentThread().interrupt();
+      e.printStackTrace();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      workerGroup.shutdownGracefully();
+      bossGroup.shutdownGracefully();
+    }
+  }
+
+  /**
+   * Closes the server channel and waits for the close to complete. Does nothing when the
+   * server channel has not been bound yet.
+   */
+  public void stop() throws Exception {
+    Channel current = channel;
+    if (current != null) {
+      current.close().get();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
new file mode 100644
index 0000000..225bb1b
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java
@@ -0,0 +1,44 @@
+package com.google.net.stubby.http2.netty;
+
+import com.google.net.stubby.Request;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.transport.MessageFramer;
+
+import io.netty.channel.Channel;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Client-side {@link Session} that starts each {@link Request} on a fresh stream of the
+ * given channel.
+ */
+class Http2Session implements Session {
+
+  public static final String PROTORPC = "application/protorpc";
+
+  private final Channel channel;
+  private final RequestRegistry requestRegistry;
+  // Clients are odd numbers starting at 3. A value of 1 is reserved for the upgrade protocol.
+  private final AtomicInteger streamId = new AtomicInteger(3);
+
+  public Http2Session(Channel channel, RequestRegistry requestRegistry) {
+    this.channel = channel;
+    this.requestRegistry = requestRegistry;
+  }
+
+  /**
+   * Builds the response for the next stream id, creates the request that writes to the
+   * channel and registers it before returning it.
+   */
+  @Override
+  public Request startRequest(String operationName, Response.ResponseBuilder response) {
+    // Stream ids advance in steps of two.
+    int id = streamId.getAndAdd(2);
+    Response boundResponse = response.build(id);
+    MessageFramer framer = new MessageFramer(4096);
+    Request request = new Http2Request(boundResponse, channel, operationName, framer);
+    requestRegistry.register(request);
+    return request;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java b/core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java
new file mode 100644
index 0000000..5863e8c
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java
@@ -0,0 +1,53 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.transport.Deframer;
+import com.google.net.stubby.transport.TransportFrameUtil;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteOrder;
+
+/**
+ * Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call
+ */
+public class ByteBufDeframer extends Deframer<ByteBuf> {
+
+ private final CompositeByteBuf buffer;
+
+ public ByteBufDeframer() {
+ buffer = Unpooled.compositeBuffer();
+ }
+
+ @Override
+ protected DataInputStream prefix(ByteBuf frame) throws IOException {
+ buffer.addComponent(frame);
+ buffer.writerIndex(buffer.writerIndex() + frame.writerIndex() - frame.readerIndex());
+ return new DataInputStream(new ByteBufInputStream(buffer));
+ }
+
+ @Override
+ protected int consolidate() {
+ buffer.consolidate();
+ return buffer.readableBytes();
+ }
+
+ @Override
+ protected ByteBuf decompress(ByteBuf frame) throws IOException {
+ frame = frame.order(ByteOrder.BIG_ENDIAN);
+ int compressionType = frame.readUnsignedByte();
+ int frameLength = frame.readUnsignedMedium();
+ if (frameLength != frame.readableBytes()) {
+ throw new IllegalArgumentException("GRPC and buffer lengths misaligned. Frame length="
+ + frameLength + ", readableBytes=" + frame.readableBytes());
+ }
+ if (TransportFrameUtil.isNotCompressed(compressionType)) {
+ return frame;
+ }
+ throw new IOException("Unknown compression type " + compressionType);
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java
new file mode 100644
index 0000000..1b6f8cb
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java
@@ -0,0 +1,100 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.common.base.Throwables;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Session;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.spdy.SpdyFrameCodec;
+import io.netty.handler.codec.spdy.SpdyVersion;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * Simple client connection startup that creates a {@link SpdySession} for use
+ * with protocol bindings.
+ */
+public class SpdyClient {
+  private final String host;
+  private final int port;
+  private final RequestRegistry requestRegistry;
+  private ChannelFuture channelFuture;
+  private final SSLEngine sslEngine;
+
+  /** Creates a client that connects without TLS. */
+  public SpdyClient(String host, int port, RequestRegistry requestRegistry) {
+    this(host, port, requestRegistry, null);
+  }
+
+  /**
+   * @param sslEngine engine used to secure the connection, or {@code null} for plaintext;
+   *     a non-null engine is switched to client mode
+   */
+  public SpdyClient(String host, int port, RequestRegistry requestRegistry,
+      @Nullable SSLEngine sslEngine) {
+    this.host = host;
+    this.port = port;
+    this.requestRegistry = requestRegistry;
+    this.sslEngine = sslEngine;
+    // TODO(user): NPN support
+    if (sslEngine != null) {
+      sslEngine.setUseClientMode(true);
+    }
+  }
+
+  /**
+   * Connects to the configured host and port, blocks until the connection attempt has
+   * finished and returns a session bound to the new channel. The worker event loop group is
+   * shut down when the channel closes or when startup fails.
+   *
+   * @throws RuntimeException if connecting fails; when the wait is interrupted the thread's
+   *     interrupt flag is restored before the exception is propagated
+   */
+  public Session startAndWait() {
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    try {
+      Bootstrap b = new Bootstrap();
+      b.group(workerGroup);
+      b.channel(NioSocketChannel.class);
+      b.option(ChannelOption.SO_KEEPALIVE, true);
+      // TODO(user): Evaluate use of pooled allocator
+      b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+      b.handler(new ChannelInitializer<SocketChannel>() {
+        @Override
+        public void initChannel(SocketChannel ch) throws Exception {
+          if (sslEngine != null) {
+            // Assume TLS when using SSL
+            ch.pipeline().addLast(new SslHandler(sslEngine, false));
+          }
+          ch.pipeline().addLast(
+              new SpdyFrameCodec(SpdyVersion.SPDY_3_1),
+              new SpdyCodec(requestRegistry));
+        }
+      });
+      // Start the client.
+      channelFuture = b.connect(host, port);
+      // Wait for the connection
+      channelFuture.sync();
+      ChannelFuture closeFuture = channelFuture.channel().closeFuture();
+      closeFuture.addListener(new WorkerCleanupListener(workerGroup));
+      return new SpdySession(channelFuture.channel(), requestRegistry);
+    } catch (Throwable t) {
+      workerGroup.shutdownGracefully();
+      if (t instanceof InterruptedException) {
+        // propagate() wraps the exception, so keep the interruption visible to callers.
+        Thread.currentThread().interrupt();
+      }
+      throw Throwables.propagate(t);
+    }
+  }
+
+  /** Shuts the worker event loop group down once the observed future completes. */
+  private static class WorkerCleanupListener
+      implements GenericFutureListener<io.netty.util.concurrent.Future<Void>> {
+    private final EventLoopGroup workerGroup;
+
+    public WorkerCleanupListener(EventLoopGroup workerGroup) {
+      this.workerGroup = workerGroup;
+    }
+
+    @Override
+    public void operationComplete(io.netty.util.concurrent.Future<Void> future) throws Exception {
+      workerGroup.shutdownGracefully();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java
new file mode 100644
index 0000000..bd45c0a
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java
@@ -0,0 +1,199 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Operation.Phase;
+import com.google.net.stubby.Request;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.MessageFramer;
+import com.google.net.stubby.transport.Transport;
+import com.google.net.stubby.transport.Transport.Code;
+
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.spdy.DefaultSpdyRstStreamFrame;
+import io.netty.handler.codec.spdy.SpdyDataFrame;
+import io.netty.handler.codec.spdy.SpdyHeaders;
+import io.netty.handler.codec.spdy.SpdyHeadersFrame;
+import io.netty.handler.codec.spdy.SpdyRstStreamFrame;
+import io.netty.handler.codec.spdy.SpdyStreamFrame;
+import io.netty.handler.codec.spdy.SpdyStreamStatus;
+import io.netty.handler.codec.spdy.SpdySynStreamFrame;
+
+/**
+ * Codec used by clients and servers to interpret SPDY frames in the context of an ongoing
+ * request-response dialog.
+ *
+ * <p>Incoming stream frames are matched to a {@link Request} in the {@link RequestRegistry} by
+ * stream id. On the server an unknown stream id starts a new request through the
+ * {@link Session}; on the client an unknown stream id is treated as an error.
+ */
+public class SpdyCodec extends ChannelHandlerAdapter {
+
+  /** True when this codec was created through the client constructor. */
+  private final boolean client;
+  private final RequestRegistry requestRegistry;
+  /** Session that receives new requests on the server; {@code null} on the client. */
+  private final Session session;
+
+  /**
+   * Constructor used by servers, takes a session which will receive operation events.
+   */
+  public SpdyCodec(Session session, RequestRegistry requestRegistry) {
+    this.client = false;
+    this.session = session;
+    this.requestRegistry = requestRegistry;
+  }
+
+  /**
+   * Constructor used by clients to send operations to a remote server.
+   */
+  public SpdyCodec(RequestRegistry requestRegistry) {
+    this.client = true;
+    this.session = null;
+    this.requestRegistry = requestRegistry;
+  }
+
+  /**
+   * Aborts every request still in the registry when the channel goes inactive, then forwards
+   * the event.
+   */
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    // Abort any active requests.
+    requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED));
+
+    super.channelInactive(ctx);
+  }
+
+  /**
+   * Dispatches a received SPDY stream frame to the operation bound to its stream id.
+   *
+   * <p>Messages that are not {@link SpdyStreamFrame}s are ignored. Any failure closes the
+   * affected request with an internal error, resets the stream and is rethrown.
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    // NOTE(review): ignored messages and consumed data frames are neither released nor
+    // forwarded here - confirm whether reference counting needs to be handled.
+    if (!(msg instanceof SpdyStreamFrame)) {
+      return;
+    }
+    SpdyStreamFrame frame = (SpdyStreamFrame) msg;
+    Request operation = requestRegistry.lookup(frame.getStreamId());
+    try {
+      if (operation == null) {
+        if (client) {
+          // For clients an operation must already exist in the registry
+          throw new IllegalStateException("Response operation must already be bound");
+        } else {
+          operation = serverStart(ctx, frame);
+          if (operation == null) {
+            // Unknown operation, refuse the stream
+            sendRstStream(ctx, frame.getStreamId(), SpdyStreamStatus.REFUSED_STREAM);
+          }
+        }
+      } else {
+        // Consume the frame. Clients receive on the response, servers on the request.
+        progress(client ? operation.getResponse() : operation, frame);
+      }
+    } catch (Throwable e) {
+      closeWithInternalError(operation, e);
+      sendRstStream(ctx, frame.getStreamId(), SpdyStreamStatus.INTERNAL_ERROR);
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the request and its associated response with an internal error and removes the
+   * request from the registry. Does nothing when {@code request} is null.
+   */
+  private void closeWithInternalError(Request request, Throwable e) {
+    if (request != null) {
+      Status status = new Status(Code.INTERNAL, e);
+      request.close(status);
+      request.getResponse().close(status);
+      requestRegistry.remove(request.getId());
+    }
+  }
+
+  /**
+   * Writes the Spdy RST Stream frame to the remote endpoint, indicating a stream failure.
+   */
+  private void sendRstStream(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
+    DefaultSpdyRstStreamFrame frame = new DefaultSpdyRstStreamFrame(streamId, status.getCode());
+    ctx.writeAndFlush(frame);
+  }
+
+  /**
+   * Start the Request operation on the server.
+   *
+   * @return the newly registered request, or {@code null} when the frame is not a SYN_STREAM,
+   *     has the wrong content type, carries no usable path, or the session declines it
+   */
+  private Request serverStart(ChannelHandlerContext ctx, SpdyStreamFrame frame) {
+    if (!(frame instanceof SpdySynStreamFrame)) {
+      // TODO(user): Better error detail to client here
+      return null;
+    }
+    SpdySynStreamFrame headers = (SpdySynStreamFrame) frame;
+    if (!SpdySession.PROTORPC.equals(headers.headers().get("content-type"))) {
+      return null;
+    }
+    // Use Path to specify the operation
+    String operationName =
+        normalizeOperationName(headers.headers().get(SpdyHeaders.HttpNames.PATH));
+    if (operationName == null) {
+      return null;
+    }
+    // Create the operation and bind a SPDY response operation
+    Request op = session.startRequest(operationName,
+        SpdyResponse.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096)));
+    if (op == null) {
+      return null;
+    }
+    requestRegistry.register(op);
+    // Immediately deframe the remaining headers in the frame
+    progressHeaders(op, (SpdyHeadersFrame) frame);
+    return op;
+  }
+
+  /**
+   * Derives the operation name from the request path by dropping its first character.
+   *
+   * @return the operation name, or {@code null} when the path is missing or empty so that the
+   *     caller refuses the stream instead of failing with an exception
+   */
+  // TODO(user): This needs proper namespacing support, this is currently just a hack
+  private static String normalizeOperationName(String path) {
+    if (path == null || path.isEmpty()) {
+      return null;
+    }
+    return path.substring(1);
+  }
+
+  /**
+   * Consume a received frame. Header and data frames are processed; any other frame type
+   * closes the operation.
+   */
+  private void progress(Operation operation, SpdyStreamFrame frame) {
+    if (frame instanceof SpdyHeadersFrame) {
+      progressHeaders(operation, (SpdyHeadersFrame) frame);
+    } else if (frame instanceof SpdyDataFrame) {
+      progressPayload(operation, (SpdyDataFrame) frame);
+    } else if (frame instanceof SpdyRstStreamFrame) {
+      // Cancel
+      operation.close(null);
+      finish(operation);
+    } else {
+      // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS
+      operation.close(null);
+      finish(operation);
+    }
+  }
+
+  /**
+   * Consume headers in the frame. Any header starting with ':' is considered reserved.
+   */
+  private void progressHeaders(Operation operation, SpdyHeadersFrame frame) {
+    // TODO(user): Currently we do not do anything with SPDY headers
+    if (frame.isLast()) {
+      finish(operation);
+    }
+  }
+
+  /**
+   * Feeds the content of a data frame to the operation's deframer, creating the deframer on
+   * first use, and finishes the operation when the frame is the last one on the stream.
+   */
+  private void progressPayload(Operation operation, SpdyDataFrame frame) {
+    if (operation == null) {
+      return;
+    }
+    // The deframer is stored on the operation so it survives across frames.
+    ByteBufDeframer deframer = operation.get(ByteBufDeframer.class);
+    if (deframer == null) {
+      deframer = new ByteBufDeframer();
+      operation.put(ByteBufDeframer.class, deframer);
+    }
+    deframer.deframe(frame.content(), operation);
+    if (frame.isLast()) {
+      finish(operation);
+    }
+  }
+
+  /**
+   * Called when a SPDY stream is closed. Removes the registry entry for the operation's id and
+   * closes the operation with {@link Status#OK} unless it is already closed.
+   */
+  private void finish(Operation operation) {
+    requestRegistry.remove(operation.getId());
+    if (operation.getPhase() != Phase.CLOSED) {
+      operation.close(Status.OK);
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java
new file mode 100644
index 0000000..bcfbd8e
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java
@@ -0,0 +1,68 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.AbstractOperation;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.Transport;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.handler.codec.spdy.DefaultSpdyDataFrame;
+import io.netty.handler.codec.spdy.SpdyHeadersFrame;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Base implementation of {@link Operation} that writes SPDY frames to a Netty channel.
+ *
+ * <p>Context and payload are passed through the {@link Framer}, which calls back into
+ * {@link #deliverFrame} with each encoded frame.
+ */
+abstract class SpdyOperation extends AbstractOperation implements Framer.Sink {
+
+  protected final Framer framer;
+  private final Channel channel;
+
+  /**
+   * Creates the operation with the stream id of {@code headersFrame} and writes that frame to
+   * the channel. The frame is written without a flush here.
+   */
+  SpdyOperation(SpdyHeadersFrame headersFrame, Channel channel, Framer framer) {
+    super(headersFrame.getStreamId());
+    this.channel = channel;
+    this.framer = framer;
+    channel.write(headersFrame);
+  }
+
+  @Override
+  public Operation addContext(String type, InputStream message, Phase nextPhase) {
+    super.addContext(type, message, nextPhase);
+    framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
+    return this;
+  }
+
+  @Override
+  public Operation addPayload(InputStream payload, Phase nextPhase) {
+    super.addPayload(payload, nextPhase);
+    framer.writePayload(payload, getPhase() == Phase.CLOSED, this);
+    return this;
+  }
+
+  /**
+   * Writes one encoded frame to the channel as a SPDY data frame.
+   *
+   * <p>The frame is marked as the last on the stream only when the operation is already closed
+   * and {@code endOfMessage} is set; in that case the framer is closed afterwards. A failed
+   * write closes the operation with an internal error.
+   */
+  @Override
+  public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+    boolean closed = getPhase() == Phase.CLOSED;
+    DefaultSpdyDataFrame dataFrame = new DefaultSpdyDataFrame(getId(),
+        Unpooled.wrappedBuffer(frame));
+    boolean streamClosed = closed && endOfMessage;
+    dataFrame.setLast(streamClosed);
+    try {
+      ChannelFuture channelFuture = channel.writeAndFlush(dataFrame);
+      if (!streamClosed) {
+        // Sync for all except the last frame to prevent buffer corruption.
+        channelFuture.get();
+      }
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to the caller instead of silently consuming it.
+      Thread.currentThread().interrupt();
+      close(new Status(Transport.Code.INTERNAL, e));
+    } catch (Exception e) {
+      close(new Status(Transport.Code.INTERNAL, e));
+    } finally {
+      if (streamClosed) {
+        framer.close();
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java
new file mode 100644
index 0000000..4d67f21
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java
@@ -0,0 +1,55 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.Request;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.transport.Framer;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.spdy.DefaultSpdySynStreamFrame;
+import io.netty.handler.codec.spdy.SpdyHeaders;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * A SPDY based implementation of {@link Request}. Constructing an instance builds the
+ * SYN_STREAM frame for the operation and hands it to {@link SpdyOperation} for writing.
+ */
+class SpdyRequest extends SpdyOperation implements Request {
+
+  // TODO(user): Inject this
+  private static final String HOST_NAME = resolveLocalHostName();
+
+  private final Response response;
+
+  public SpdyRequest(Response response, Channel channel, String operationName,
+      Framer framer) {
+    super(createHeadersFrame(response.getId(), operationName), channel, framer);
+    this.response = response;
+  }
+
+  /** Returns the local host name, or "localhost" when it cannot be determined. */
+  private static String resolveLocalHostName() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException uhe) {
+      return "localhost";
+    }
+  }
+
+  /**
+   * Builds the SYN_STREAM frame announcing {@code operationName} on stream {@code id}. The
+   * operation name is sent as the request path with a leading slash.
+   */
+  private static DefaultSpdySynStreamFrame createHeadersFrame(int id, String operationName) {
+    DefaultSpdySynStreamFrame synStream = new DefaultSpdySynStreamFrame(id, 0, (byte) 0);
+    SpdyHeaders headers = synStream.headers();
+    headers.add(SpdyHeaders.HttpNames.METHOD, "POST");
+    // TODO(user) Convert operation names to URIs
+    headers.add(SpdyHeaders.HttpNames.PATH, "/" + operationName);
+    headers.add(SpdyHeaders.HttpNames.VERSION, "HTTP/1.1");
+    headers.add(SpdyHeaders.HttpNames.HOST, HOST_NAME);
+    headers.add(SpdyHeaders.HttpNames.SCHEME, "https");
+    headers.add("content-type", SpdySession.PROTORPC);
+    return synStream;
+  }
+
+  @Override
+  public Response getResponse() {
+    return response;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java
new file mode 100644
index 0000000..3c9bc3c
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java
@@ -0,0 +1,53 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.spdy.DefaultSpdySynReplyFrame;
+import io.netty.handler.codec.spdy.SpdyHeaders;
+
+/**
+ * A SPDY based implementation of a {@link Response}. Constructing an instance builds a
+ * SYN_REPLY frame for the stream and hands it to {@link SpdyOperation} for writing.
+ */
+class SpdyResponse extends SpdyOperation implements Response {
+
+  private SpdyResponse(int id, Channel channel, Framer framer) {
+    super(createSynReply(id), channel, framer);
+  }
+
+  /**
+   * Returns a builder that creates a response for the fixed stream {@code id}. Only the
+   * no-argument {@code build()} is supported; {@code build(int)} throws.
+   */
+  public static ResponseBuilder builder(final int id, final Channel channel,
+      final Framer framer) {
+    return new ResponseBuilder() {
+      @Override
+      public Response build() {
+        return new SpdyResponse(id, channel, framer);
+      }
+
+      @Override
+      public Response build(int ignoredId) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /** Creates the SYN_REPLY frame for stream {@code id} with a "200" status header. */
+  public static DefaultSpdySynReplyFrame createSynReply(int id) {
+    DefaultSpdySynReplyFrame reply = new DefaultSpdySynReplyFrame(id);
+    // TODO(user): Need to review status code handling
+    reply.headers().add(SpdyHeaders.HttpNames.STATUS, "200");
+    return reply;
+  }
+
+  /**
+   * Closes the response and, if it was not already closed beforehand, writes the status
+   * through the framer.
+   */
+  @Override
+  public Operation close(Status status) {
+    // Sample the phase first: super.close() is what moves the operation to CLOSED.
+    boolean wasOpen = getPhase() != Phase.CLOSED;
+    super.close(status);
+    if (wasOpen) {
+      framer.writeStatus(status, true, this);
+    }
+    return this;
+  }
+
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java
new file mode 100644
index 0000000..9b566f0
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java
@@ -0,0 +1,75 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Session;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.spdy.SpdyFrameCodec;
+import io.netty.handler.codec.spdy.SpdyVersion;
+
+/**
+ * Simple server connection startup that attaches a {@link Session} implementation to
+ * a connection.
+ *
+ * <p>{@link #run()} blocks until the server channel is closed; {@link #stop()} is meant to be
+ * called from another thread to close it.
+ */
+public class SpdyServer implements Runnable {
+  private final int port;
+  private final Session session;
+  private final RequestRegistry operations;
+  // Written by the thread executing run() and read by stop(); volatile makes the bound
+  // channel visible across threads.
+  private volatile Channel channel;
+
+  public SpdyServer(int port, Session session, RequestRegistry operations) {
+    this.port = port;
+    this.session = session;
+    this.operations = operations;
+  }
+
+  /**
+   * Binds to the configured port and blocks until the server channel is closed. Both event
+   * loop groups are shut down on exit, whether the server stopped normally or failed.
+   */
+  @Override
+  public void run() {
+    EventLoopGroup bossGroup = new NioEventLoopGroup();
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    try {
+      ServerBootstrap b = new ServerBootstrap();
+      // TODO(user): Evaluate use of pooled allocator
+      b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+      b.group(bossGroup, workerGroup)
+          .channel(NioServerSocketChannel.class)
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+              ch.pipeline().addLast(
+                  new SpdyFrameCodec(SpdyVersion.SPDY_3_1),
+                  new SpdyCodec(session, operations));
+            }
+          })
+          .option(ChannelOption.SO_BACKLOG, 128)
+          .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+      // Bind and start to accept incoming connections.
+      ChannelFuture f = b.bind(port).sync();
+
+      // Wait until the server socket is closed.
+      channel = f.channel();
+      channel.closeFuture().sync();
+    } catch (InterruptedException e) {
+      // Restore the flag so the owner of this thread can observe the interruption.
+      Thread.currentThread().interrupt();
+      e.printStackTrace();
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      workerGroup.shutdownGracefully();
+      bossGroup.shutdownGracefully();
+    }
+  }
+
+  /**
+   * Closes the server channel and waits for the close to complete. Does nothing if the
+   * server has not bound its channel yet.
+   */
+  public void stop() throws Exception {
+    if (channel != null) {
+      channel.close().get();
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java
new file mode 100644
index 0000000..25a7781
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java
@@ -0,0 +1,46 @@
+package com.google.net.stubby.spdy.netty;
+
+import com.google.net.stubby.Request;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.transport.MessageFramer;
+
+import io.netty.channel.Channel;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An implementation of {@link Session} that can be used by clients to start
+ * a {@link Request}.
+ */
+public class SpdySession implements Session {
+
+  public static final String PROTORPC = "application/protorpc";
+
+  private final Channel channel;
+  private final boolean clientSession;
+  private final RequestRegistry requestRegistry;
+  /** Counter from which stream ids are derived; see {@link #getNextStreamId()}. */
+  private final AtomicInteger sessionId;
+
+  public SpdySession(Channel channel, RequestRegistry requestRegistry) {
+    this.channel = channel;
+    this.clientSession = true;
+    this.requestRegistry = requestRegistry;
+    // Clients are odd numbers starting at 1, servers are even numbers starting at 2
+    sessionId = new AtomicInteger(1);
+  }
+
+  /**
+   * Returns the next stream id: 1, 3, 5, ... for a client session and 2, 4, 6, ... otherwise.
+   */
+  private int getNextStreamId() {
+    return (sessionId.getAndIncrement() * 2) + (clientSession ? -1 : 0);
+  }
+
+  /**
+   * Creates a {@link SpdyRequest} on a fresh stream id, builds the response for that id and
+   * registers the request before returning it.
+   */
+  @Override
+  public Request startRequest(String operationName, Response.ResponseBuilder response) {
+    int nextStreamId = getNextStreamId();
+    Request operation = new SpdyRequest(response.build(nextStreamId), channel, operationName,
+        new MessageFramer(4096));
+    requestRegistry.register(operation);
+    return operation;
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/Headers.java b/core/src/main/java/com/google/net/stubby/spdy/okhttp/Headers.java
new file mode 100644
index 0000000..f5e176c
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/okhttp/Headers.java
@@ -0,0 +1,32 @@
+package com.google.net.stubby.spdy.okhttp;
+
+import com.google.common.collect.Lists;
+
+import com.squareup.okhttp.internal.spdy.Header;
+
+import java.util.List;
+
+/**
+ * Constants for request/response headers.
+ */
+public class Headers {
+ public static final Header SCHEME_HEADER = new Header(Header.TARGET_SCHEME, "https");
+ public static final Header CONTENT_TYPE_HEADER =
+ new Header("content-type", "application/protorpc");
+ public static final Header RESPONSE_STATUS_OK = new Header(Header.RESPONSE_STATUS, "200");
+
+ public static List<Header> createRequestHeaders(String operationName) {
+ List<Header> headers = Lists.newArrayListWithCapacity(6);
+ headers.add(new Header(Header.TARGET_PATH, operationName));
+ headers.add(SCHEME_HEADER);
+ headers.add(CONTENT_TYPE_HEADER);
+ return headers;
+ }
+
+ public static List<Header> createResponseHeaders() {
+ // TODO(user): Need to review status code handling
+ List<Header> headers = Lists.newArrayListWithCapacity(6);
+ headers.add(RESPONSE_STATUS_OK);
+ return headers;
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/OkHttpSession.java b/core/src/main/java/com/google/net/stubby/spdy/okhttp/OkHttpSession.java
new file mode 100644
index 0000000..8ae40bb
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/okhttp/OkHttpSession.java
@@ -0,0 +1,342 @@
+package com.google.net.stubby.spdy.okhttp;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Operation.Phase;
+import com.google.net.stubby.Request;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Session;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.InputStreamDeframer;
+import com.google.net.stubby.transport.MessageFramer;
+import com.google.net.stubby.transport.Transport;
+import com.google.net.stubby.transport.Transport.Code;
+
+import com.squareup.okhttp.Protocol;
+import com.squareup.okhttp.internal.spdy.ErrorCode;
+import com.squareup.okhttp.internal.spdy.FrameReader;
+import com.squareup.okhttp.internal.spdy.FrameWriter;
+import com.squareup.okhttp.internal.spdy.Header;
+import com.squareup.okhttp.internal.spdy.HeadersMode;
+import com.squareup.okhttp.internal.spdy.Http20Draft10;
+import com.squareup.okhttp.internal.spdy.Settings;
+import com.squareup.okhttp.internal.spdy.Spdy3;
+import com.squareup.okhttp.internal.spdy.Variant;
+
+import okio.BufferedSink;
+import okio.BufferedSource;
+import okio.ByteString;
+import okio.Okio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic implementation of {@link Session} using OkHttp.
+ *
+ * <p>A session wraps one socket. Creating it submits a {@code FrameHandler} to the supplied
+ * executor; that handler reads frames until the socket closes and dispatches them to the
+ * operations held in the {@link RequestRegistry}. A session created without a server
+ * {@link Session} acts as a client.
+ */
+public class OkHttpSession implements Session {
+
+  /** Status reported to an operation for each stream error code received from the peer. */
+  private static final ImmutableMap<ErrorCode, Status> ERROR_CODE_TO_STATUS = ImmutableMap
+      .<ErrorCode, Status>builder()
+      .put(ErrorCode.NO_ERROR, Status.OK)
+      .put(ErrorCode.PROTOCOL_ERROR, new Status(Transport.Code.INTERNAL, "Protocol error"))
+      .put(ErrorCode.INVALID_STREAM, new Status(Transport.Code.INTERNAL, "Invalid stream"))
+      .put(ErrorCode.UNSUPPORTED_VERSION,
+          new Status(Transport.Code.INTERNAL, "Unsupported version"))
+      .put(ErrorCode.STREAM_IN_USE, new Status(Transport.Code.INTERNAL, "Stream in use"))
+      .put(ErrorCode.STREAM_ALREADY_CLOSED,
+          new Status(Transport.Code.INTERNAL, "Stream already closed"))
+      .put(ErrorCode.INTERNAL_ERROR, new Status(Transport.Code.INTERNAL, "Internal error"))
+      .put(ErrorCode.FLOW_CONTROL_ERROR, new Status(Transport.Code.INTERNAL, "Flow control error"))
+      .put(ErrorCode.STREAM_CLOSED, new Status(Transport.Code.INTERNAL, "Stream closed"))
+      .put(ErrorCode.FRAME_TOO_LARGE, new Status(Transport.Code.INTERNAL, "Frame too large"))
+      .put(ErrorCode.REFUSED_STREAM, new Status(Transport.Code.INTERNAL, "Refused stream"))
+      .put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled"))
+      .put(ErrorCode.COMPRESSION_ERROR, new Status(Transport.Code.INTERNAL, "Compression error"))
+      .put(ErrorCode.INVALID_CREDENTIALS,
+          new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials"))
+      .build();
+
+  /**
+   * Creates a client session on {@code socket} and starts reading frames on {@code executor}.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the socket streams cannot be
+   *     opened
+   */
+  public static Session startClient(Protocol protocol, Socket socket,
+      RequestRegistry requestRegistry, Executor executor) {
+    try {
+      return new OkHttpSession(protocol, socket, requestRegistry, executor);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Creates a server session on {@code socket} that starts incoming requests on
+   * {@code server}, and starts reading frames on {@code executor}.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the socket streams cannot be
+   *     opened
+   */
+  public static Session startServer(Protocol protocol, Socket socket, Session server,
+      RequestRegistry requestRegistry, Executor executor) {
+    try {
+      return new OkHttpSession(protocol, socket, server, requestRegistry, executor);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private final FrameReader frameReader;
+  private final FrameWriter frameWriter;
+  /** Counter from which stream ids are derived; see {@link #getNextStreamId()}. */
+  private final AtomicInteger sessionId;
+  /** Session that receives incoming requests; {@code null} for a client session. */
+  private final Session serverSession;
+  private final RequestRegistry requestRegistry;
+  private final CountingInputStream countingInputStream;
+  private final CountingOutputStream countingOutputStream;
+
+  /**
+   * Construct a client-side session
+   */
+  private OkHttpSession(Protocol protocol, Socket socket, RequestRegistry requestRegistry,
+      Executor executor) throws IOException {
+    this(protocol, socket, null, requestRegistry, executor);
+  }
+
+  /**
+   * Construct a server-side session, or a client-side one when {@code server} is null.
+   */
+  private OkHttpSession(Protocol protocol, Socket socket, Session server,
+      RequestRegistry requestRegistry, Executor executor) throws IOException {
+    Variant variant = getProtocolVariant(protocol);
+    // TODO(user): use Okio.buffer(Socket)
+    countingInputStream = new CountingInputStream(socket.getInputStream());
+    countingOutputStream = new CountingOutputStream(socket.getOutputStream());
+
+    BufferedSource source = Okio.buffer(Okio.source(countingInputStream));
+    BufferedSink sink = Okio.buffer(Okio.sink(countingOutputStream));
+    frameReader = variant.newReader(source, true);
+    frameWriter = variant.newWriter(sink, true);
+
+    sessionId = new AtomicInteger(1);
+    this.serverSession = server;
+    this.requestRegistry = requestRegistry;
+    executor.execute(new FrameHandler());
+  }
+
+  /** Returns the number of bytes read from and written to the socket so far. */
+  @Override
+  public String toString() {
+    return "in=" + countingInputStream.getCount() + ";out=" + countingOutputStream.getCount();
+  }
+
+  /**
+   * Maps the requested protocol to its framing implementation.
+   *
+   * @throws IllegalArgumentException for any protocol other than HTTP_2 or SPDY_3
+   */
+  private Variant getProtocolVariant(Protocol protocol) {
+    switch (protocol) {
+      case HTTP_2:
+        return new Http20Draft10();
+      case SPDY_3:
+        return new Spdy3();
+      default:
+        throw new IllegalArgumentException("Unsupported protocol: " + protocol);
+    }
+  }
+
+  /** Returns the next stream id: 3, 5, 7, ... for clients and 2, 4, 6, ... for servers. */
+  private int getNextStreamId() {
+    // Client initiated streams are odd, server initiated ones are even
+    // We start clients at 3 to avoid conflicting with HTTP negotiation
+    return (sessionId.getAndIncrement() * 2) + (isClient() ? 1 : 0);
+  }
+
+  private boolean isClient() {
+    return serverSession == null;
+  }
+
+  /**
+   * Starts a request on a fresh stream id. The created request registers itself with the
+   * registry and writes its SYN_STREAM frame during construction.
+   */
+  @Override
+  public Request startRequest(String operationName, Response.ResponseBuilder responseBuilder) {
+    int nextStreamId = getNextStreamId();
+    Response response = responseBuilder.build(nextStreamId);
+    SpdyRequest spdyRequest = new SpdyRequest(frameWriter, operationName, response, requestRegistry,
+        new MessageFramer(4096));
+    return spdyRequest;
+  }
+
+  /**
+   * Close and remove any requests that still reside in the registry.
+   */
+  private void closeAllRequests(Status status) {
+    for (Integer id : requestRegistry.getAllRequests()) {
+      Request request = requestRegistry.remove(id);
+      if (request != null && request.getPhase() != Phase.CLOSED) {
+        request.close(status);
+      }
+    }
+  }
+
+  /**
+   * Runnable which reads frames and dispatches them to in flight calls
+   */
+  private class FrameHandler implements FrameReader.Handler, Runnable {
+
+    private FrameHandler() {}
+
+    /**
+     * Reads frames until the reader reports the end of the stream. An I/O failure closes every
+     * registered request with an internal error. The thread name is changed for the duration
+     * of the loop and restored afterwards.
+     */
+    @Override
+    public void run() {
+      String threadName = Thread.currentThread().getName();
+      Thread.currentThread().setName(isClient() ? "OkHttpClientSession" : "OkHttpServerSession");
+      try {
+        // Read until the underlying socket closes.
+        // NOTE(review): requests still registered after a clean end of stream are left
+        // untouched here - confirm whether they should be closed as well.
+        while (frameReader.nextFrame(this)) {
+        }
+      } catch (IOException ioe) {
+        ioe.printStackTrace();
+        closeAllRequests(new Status(Code.INTERNAL, ioe.getMessage()));
+      } finally {
+        // Restore the original thread name.
+        Thread.currentThread().setName(threadName);
+      }
+    }
+
+    /**
+     * Lookup the operation bound to the specified stream id. Clients receive on the response
+     * of the registered request, servers on the request itself.
+     *
+     * @return the receiving operation, or {@code null} if no request is registered for the id
+     */
+    private Operation getOperation(int streamId) {
+      Request request = requestRegistry.lookup(streamId);
+      if (request == null) {
+        return null;
+      }
+      if (isClient()) {
+        return request.getResponse();
+      }
+      return request;
+    }
+
+
+    /**
+     * Handle a SPDY DATA frame
+     */
+    @Override
+    public void data(boolean inFinished, int streamId, BufferedSource in, int length)
+        throws IOException {
+      final Operation op = getOperation(streamId);
+      if (op == null) {
+        frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM);
+        // The payload is still pending on the source; discard it so the reader stays aligned
+        // with the start of the next frame.
+        in.skip(length);
+        return;
+      }
+      // The deframer is stored on the operation so it survives across frames.
+      InputStreamDeframer deframer = op.get(InputStreamDeframer.class);
+      if (deframer == null) {
+        deframer = new InputStreamDeframer();
+        op.put(InputStreamDeframer.class, deframer);
+      }
+
+      // Wait until the frame is complete.
+      in.require(length);
+
+      deframer.deframe(ByteStreams.limit(in.inputStream(), length), op);
+      if (inFinished) {
+        finish(streamId);
+        // finish() may already have closed this operation (on the server it is the request
+        // itself), so only close it if it is still open.
+        if (op.getPhase() != Phase.CLOSED) {
+          op.close(Status.OK);
+        }
+      }
+    }
+
+    /**
+     * Called when a SPDY stream is closed.
+     */
+    private void finish(int streamId) {
+      Request request = requestRegistry.remove(streamId);
+      if (request != null && request.getPhase() != Phase.CLOSED) {
+        request.close(Status.OK);
+      }
+    }
+
+    /**
+     * Handle a SPDY HEADER or SYN_STREAM frame
+     */
+    @Override
+    public void headers(boolean arg0,
+        boolean inFinished,
+        int streamId,
+        int associatedStreamId,
+        int priority,
+        List<Header> headers,
+        HeadersMode headersMode) {
+      Operation op = getOperation(streamId);
+
+      // Start an Operation for SYN_STREAM. Only a server session has somewhere to start it.
+      if (op == null && !isClient() && (headersMode == HeadersMode.SPDY_SYN_STREAM
+          || headersMode == HeadersMode.HTTP_20_HEADERS)) {
+        for (Header header : headers) {
+          if (header.name.equals(Header.TARGET_PATH)) {
+            Request request = serverSession.startRequest(header.value.utf8(),
+                SpdyResponse.builder(streamId, frameWriter, new MessageFramer(4096)));
+            if (request != null) {
+              requestRegistry.register(request);
+              op = request;
+            }
+            break;
+          }
+        }
+      }
+      if (op == null) {
+        return;
+      }
+      // TODO(user): Do we do anything with non-reserved header here? We could just
+      // pass them as context to the operation?
+      if (inFinished) {
+        finish(streamId);
+      }
+    }
+
+    /**
+     * Handle a stream reset: closes the receiving operation with the status mapped from
+     * {@code errorCode} and then finishes the stream.
+     */
+    @Override
+    public void rstStream(int streamId, ErrorCode errorCode) {
+      try {
+        Operation op = getOperation(streamId);
+        if (op == null) {
+          return;
+        }
+        Status status = ERROR_CODE_TO_STATUS.get(errorCode);
+        if (status == null) {
+          // Never hand a null status to the operation if a code is missing from the table.
+          status = new Status(Code.INTERNAL, "Unmapped error code: " + errorCode);
+        }
+        op.close(status);
+      } finally {
+        finish(streamId);
+      }
+    }
+
+    @Override
+    public void settings(boolean clearPrevious, Settings settings) {
+      // not impl
+    }
+
+    @Override
+    public void ping(boolean reply, int payload1, int payload2) {
+      // noop
+    }
+
+    @Override
+    public void ackSettings() {
+      // fixme
+    }
+
+    @Override
+    public void goAway(int arg0, ErrorCode arg1, ByteString arg2) {
+      // fixme
+    }
+
+    @Override
+    public void pushPromise(int arg0, int arg1, List<Header> arg2) throws IOException {
+      // fixme
+    }
+
+    @Override
+    public void windowUpdate(int arg0, long arg1) {
+      // noop
+    }
+
+    @Override
+    public void priority(int streamId, int priority) {
+      // noop
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyOperation.java b/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyOperation.java
new file mode 100644
index 0000000..f1cbe65
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyOperation.java
@@ -0,0 +1,65 @@
+package com.google.net.stubby.spdy.okhttp;
+
+import com.google.common.io.ByteBuffers;
+import com.google.net.stubby.AbstractOperation;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.Transport;
+
+import com.squareup.okhttp.internal.spdy.FrameWriter;
+
+import okio.Buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Base implementation of {@link Operation} that writes SPDY frames through an OkHttp
+ * {@link FrameWriter}.
+ *
+ * <p>Context and payload are passed through the {@link Framer}, which calls back into
+ * {@link #deliverFrame} with each encoded frame.
+ */
+abstract class SpdyOperation extends AbstractOperation implements Framer.Sink {
+
+  protected final Framer framer;
+  private final FrameWriter frameWriter;
+
+  SpdyOperation(int id, FrameWriter frameWriter, Framer framer) {
+    super(id);
+    this.frameWriter = frameWriter;
+    this.framer = framer;
+  }
+
+  @Override
+  public Operation addContext(String type, InputStream message, Phase nextPhase) {
+    super.addContext(type, message, nextPhase);
+    boolean nowClosed = getPhase() == Phase.CLOSED;
+    framer.writeContext(type, message, nowClosed, this);
+    return this;
+  }
+
+  @Override
+  public Operation addPayload(InputStream payload, Phase nextPhase) {
+    super.addPayload(payload, nextPhase);
+    boolean nowClosed = getPhase() == Phase.CLOSED;
+    framer.writePayload(payload, nowClosed, this);
+    return this;
+  }
+
+  /**
+   * Writes one encoded frame as a data frame and flushes the writer.
+   *
+   * <p>The frame ends the stream only when the operation was already closed on entry and
+   * {@code endOfMessage} is set; in that case the framer is closed afterwards. An I/O failure
+   * closes the operation with an internal error.
+   */
+  @Override
+  public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+    // Sampled once up front so a close() triggered by a failed write cannot change the decision.
+    final boolean lastFrame = (getPhase() == Phase.CLOSED) && endOfMessage;
+    try {
+      // Read the data into a buffer.
+      // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
+      Buffer copy = new Buffer().readFrom(ByteBuffers.newConsumingInputStream(frame));
+
+      // Write the data to the remote endpoint.
+      frameWriter.data(lastFrame, getId(), copy);
+      frameWriter.flush();
+    } catch (IOException ioe) {
+      close(new Status(Transport.Code.INTERNAL, ioe));
+    } finally {
+      if (lastFrame) {
+        framer.close();
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyRequest.java b/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyRequest.java
new file mode 100644
index 0000000..198d614
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyRequest.java
@@ -0,0 +1,47 @@
+package com.google.net.stubby.spdy.okhttp;
+
+import com.google.net.stubby.Request;
+import com.google.net.stubby.RequestRegistry;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.Transport;
+
+import com.squareup.okhttp.internal.spdy.FrameWriter;
+
+import java.io.IOException;
+
+/**
+ * A SPDY based implementation of {@link Request}.
+ */
+public class SpdyRequest extends SpdyOperation implements Request {
+  private final Response response;
+
+  /**
+   * Creates a request that uses the id of {@code response}, registers it with
+   * {@code requestRegistry} and then calls {@code frameWriter.synStream} with the request
+   * headers created for {@code operationName}.
+   *
+   * <p>An {@link IOException} thrown while doing so is not propagated; the request is closed
+   * with an {@code UNKNOWN} status that wraps it.
+   */
+  public SpdyRequest(FrameWriter frameWriter, String operationName,
+      Response response, RequestRegistry requestRegistry,
+      Framer framer) {
+    super(response.getId(), frameWriter, framer);
+    this.response = response;
+    try {
+      requestRegistry.register(this);
+      frameWriter.synStream(
+          false, false, getId(), 0, 0, 0, Headers.createRequestHeaders(operationName));
+    } catch (IOException e) {
+      close(new Status(Transport.Code.UNKNOWN, e));
+    }
+  }
+
+  /** Returns the response this request was constructed with. */
+  @Override
+  public Response getResponse() {
+    return response;
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyResponse.java b/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyResponse.java
new file mode 100644
index 0000000..3d85ca1
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyResponse.java
@@ -0,0 +1,60 @@
+package com.google.net.stubby.spdy.okhttp;
+
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Response;
+import com.google.net.stubby.Status;
+import com.google.net.stubby.transport.Framer;
+import com.google.net.stubby.transport.Transport;
+
+import com.squareup.okhttp.internal.spdy.FrameWriter;
+
+import java.io.IOException;
+
+/**
+ * A SPDY based implementation of a {@link Response}.
+ */
+public class SpdyResponse extends SpdyOperation implements Response {
+
+  /**
+   * Returns a builder whose {@code build()} creates a response with the given id, frame writer
+   * and framer. The builder's {@code build(int)} variant is not supported and always throws
+   * {@link UnsupportedOperationException}.
+   */
+  public static ResponseBuilder builder(final int id, final FrameWriter framewriter,
+      final Framer framer) {
+    return new ResponseBuilder() {
+      @Override
+      public Response build() {
+        return new SpdyResponse(id, framewriter, framer);
+      }
+
+      @Override
+      public Response build(int id) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  private SpdyResponse(int id, FrameWriter frameWriter, Framer framer) {
+    super(id, frameWriter, framer);
+    try {
+      frameWriter.synStream(false, false, getId(), 0, 0, 0, Headers.createResponseHeaders());
+    } catch (IOException e) {
+      close(new Status(Transport.Code.INTERNAL, e));
+    }
+  }
+
+  /**
+   * Closes the operation and, unless it was already in the closed phase when this method was
+   * called, writes {@code status} through the framer with a flush requested.
+   */
+  @Override
+  public Operation close(Status status) {
+    final boolean wasOpen = getPhase() != Phase.CLOSED;
+    super.close(status);
+    if (wasOpen) {
+      framer.writeStatus(status, true, this);
+    }
+    return this;
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java b/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java
new file mode 100644
index 0000000..3cb292e
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java
@@ -0,0 +1,354 @@
+package com.google.net.stubby.transport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.DeferredInputStream;
+import com.google.net.stubby.transport.Framer.Sink;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.zip.Deflater;
+
+/**
+ * Compression framer for SPDY and HTTP/2 transport frames, for use in both compression and
+ * non-compression scenarios. Receives message-stream as input. It is able to change compression
+ * configuration on-the-fly, but will not actually begin using the new configuration until the next
+ * full frame.
+ */
+class CompressionFramer {
+  /**
+   * Compression level to indicate using this class's default level. Note that this value is
+   * allowed to conflict with Deflater.DEFAULT_COMPRESSION, in which case this class's default
+   * prevails.
+   */
+  public static final int DEFAULT_COMPRESSION_LEVEL = -1;
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+  /**
+   * Size of the GRPC compression frame header which consists of:
+   * 1 byte for the compression type,
+   * 3 bytes for the length of the compression frame.
+   */
+  @VisibleForTesting
+  static final int HEADER_LENGTH = 4;
+  /**
+   * Number of frame bytes to reserve to allow for zlib overhead. This does not include data-length
+   * dependent overheads and compression latency (delay between providing data to zlib and output of
+   * the compressed data).
+   *
+   * <p>References:
+   * deflate framing: http://www.gzip.org/zlib/rfc-deflate.html
+   * (note that bit-packing is little-endian (section 3.1.1) whereas description of sequences
+   * is big-endian, so bits appear reversed),
+   * zlib framing: http://tools.ietf.org/html/rfc1950,
+   * details on flush behavior: http://www.zlib.net/manual.html
+   */
+  @VisibleForTesting
+  static final int MARGIN
+      = 5 /* deflate current block overhead, assuming no compression:
+             block type (1) + len (2) + nlen (2) */
+      + 5 /* deflate flush; adds an empty block after current:
+             00 (not end; no compression) 00 00 (len) FF FF (nlen) */
+      + 5 /* deflate flush; some versions of zlib output two empty blocks on some flushes */
+      + 5 /* deflate finish; adds empty block to mark end, since we commonly flush before finish:
+             03 (end; fixed Huffman + 5 bits of end of block) 00 (last 3 bits + padding),
+             or if compression level is 0: 01 (end; no compression) 00 00 (len) FF FF (nlen) */
+      + 2 /* zlib header; CMF (1) + FLG (1) */ + 4 /* zlib ADLER32 (4) */
+      + 5 /* additional safety for good measure */;
+
+  private static final Logger log = Logger.getLogger(CompressionFramer.class.getName());
+
+  /**
+   * Bytes of frame being constructed. {@code position() == 0} when no frame in progress.
+   */
+  private final ByteBuffer bytebuf;
+  /** Number of frame bytes it is acceptable to leave unused when compressing. */
+  private final int sufficient;
+  private Deflater deflater;
+  /** Number of bytes written to deflater since last deflate sync. */
+  private int writtenSinceSync;
+  /** Number of bytes read from deflater since last deflate sync. */
+  private int readSinceSync;
+  /**
+   * Whether the current frame is actually being compressed. If {@code bytebuf.position() == 0},
+   * then this value has no meaning.
+   */
+  private boolean usingCompression;
+  /**
+   * Whether compression is requested. This does not imply we are compressing the current frame
+   * (see {@link #usingCompression}), or that we will even compress the next frame (see {@link
+   * #compressionUnsupported}).
+   */
+  private boolean allowCompression;
+  /** Whether compression is impossible with the current configuration and platform. */
+  private final boolean compressionUnsupported;
+  /**
+   * Compression level to set on the Deflater, where {@code DEFAULT_COMPRESSION_LEVEL} implies this
+   * class's default.
+   */
+  private int compressionLevel = DEFAULT_COMPRESSION_LEVEL;
+  private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
+
+  /**
+   * Since compression tries to form full frames, if compression is working well then it will
+   * consecutively compress smaller amounts of input data in order to not exceed the frame size. For
+   * example, if the data is getting 50% compression and a maximum frame size of 128, then it will
+   * encode roughly 128 bytes which leaves 64, so we encode 64, 32, 16, 8, 4, 2, 1, 1.
+   * {@code sufficient} cuts off the long tail and says that at some point the frame is "good
+   * enough" to stop. Choosing a value of {@code 0} is not outrageous.
+   *
+   * @param maxFrameSize maximum number of bytes allowed for output frames
+   * @param allowCompression whether frames should be compressed
+   * @param sufficient number of frame bytes it is acceptable to leave unused when compressing
+   */
+  public CompressionFramer(int maxFrameSize, boolean allowCompression, int sufficient) {
+    this.allowCompression = allowCompression;
+    int maxSufficient = maxFrameSize - HEADER_LENGTH - MARGIN
+        - 1 /* to force at least one byte of data */;
+    boolean compressionUnsupported = false;
+    if (maxSufficient < 0) {
+      compressionUnsupported = true;
+      log.log(Level.INFO, "Frame not large enough for compression");
+    } else if (maxSufficient < sufficient) {
+      log.log(Level.INFO, "Compression sufficient reduced to {0} from {1} to fit in frame size {2}",
+          new Object[] {maxSufficient, sufficient, maxFrameSize});
+      sufficient = maxSufficient;
+    }
+    this.sufficient = sufficient;
+    // TODO(user): Benchmark before switching to direct buffers
+    bytebuf = ByteBuffer.allocate(maxFrameSize);
+    if (!bytebuf.hasArray()) {
+      compressionUnsupported = true;
+      log.log(Level.INFO, "Byte buffer doesn't support array(), which is required for compression");
+    }
+    this.compressionUnsupported = compressionUnsupported;
+  }
+
+  /**
+   * Sets whether compression is encouraged.
+   */
+  public void setAllowCompression(boolean allow) {
+    this.allowCompression = allow;
+  }
+
+  /**
+   * Set the preferred compression level for when compression is enabled.
+   *
+   * @param level the preferred compression level (0-9), or {@code DEFAULT_COMPRESSION_LEVEL} to use
+   *     this class's default
+   * @see java.util.zip.Deflater#setLevel
+   */
+  public void setCompressionLevel(int level) {
+    Preconditions.checkArgument(level == DEFAULT_COMPRESSION_LEVEL
+        || (level >= Deflater.NO_COMPRESSION && level <= Deflater.BEST_COMPRESSION),
+        "invalid compression level");
+    this.compressionLevel = level;
+  }
+
+  /**
+   * Ensures state and buffers are initialized for writing data to a frame. Callers should be very
+   * aware this method may modify {@code usingCompression}.
+   */
+  private void checkInitFrame() {
+    if (bytebuf.position() != 0) {
+      return;
+    }
+    bytebuf.position(HEADER_LENGTH);
+    usingCompression = compressionUnsupported ? false : allowCompression;
+    if (usingCompression) {
+      if (deflater == null) {
+        deflater = new Deflater();
+      } else {
+        deflater.reset();
+      }
+      deflater.setLevel(compressionLevel == DEFAULT_COMPRESSION_LEVEL
+          ? Deflater.DEFAULT_COMPRESSION : compressionLevel);
+      writtenSinceSync = 0;
+      readSinceSync = 0;
+    }
+  }
+
+  /**
+   * Frame contents of {@code message}, flushing to {@code sink} as necessary.
+   *
+   * @return the number of bytes of {@code message} that were framed
+   * @throws IOException if reading {@code message} fails
+   */
+  public int write(InputStream message, Sink sink) throws IOException {
+    checkInitFrame();
+    if (!usingCompression && bytebuf.hasArray() && bytebuf.remaining() == 0) {
+      // The frame in progress is full. Deliver it and initialize the next one right away so that
+      // the header bytes are reserved again and the compression decision is re-evaluated.
+      commitToSink(sink, false);
+      checkInitFrame();
+    }
+    if (!usingCompression && bytebuf.hasArray()) {
+      int available = message.available();
+      if (available <= bytebuf.remaining()) {
+        // When InputStream is DeferredProtoInputStream, this is zero-copy because bytebuf is large
+        // enough for the proto to be serialized directly into it.
+        int read = ByteStreams.read(message,
+            bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(), bytebuf.remaining());
+        bytebuf.position(bytebuf.position() + read);
+        if (read != available) {
+          throw new RuntimeException("message.available() did not follow our semantics of always "
+              + "returning the number of remaining bytes");
+        }
+        return read;
+      }
+    }
+    outputStreamAdapter.setSink(sink);
+    try {
+      if (message instanceof DeferredInputStream) {
+        return ((DeferredInputStream) message).flushTo(outputStreamAdapter);
+      } else {
+        // This could be optimized when compression is off, but we expect performance-critical code
+        // to provide a DeferredInputStream.
+        return (int) ByteStreams.copy(message, outputStreamAdapter);
+      }
+    } finally {
+      outputStreamAdapter.setSink(null);
+    }
+  }
+
+  /**
+   * Frame contents of {@code b} between {@code off} (inclusive) and {@code off + len} (exclusive),
+   * flushing to {@code sink} as necessary.
+   */
+  public void write(byte[] b, int off, int len, Sink sink) {
+    while (len > 0) {
+      checkInitFrame();
+      if (!usingCompression) {
+        if (bytebuf.remaining() == 0) {
+          commitToSink(sink, false);
+          continue;
+        }
+        int toWrite = Math.min(len, bytebuf.remaining());
+        bytebuf.put(b, off, toWrite);
+        off += toWrite;
+        len -= toWrite;
+      } else {
+        if (bytebuf.remaining() <= MARGIN + sufficient) {
+          commitToSink(sink, false);
+          continue;
+        }
+        // Amount of memory that is guaranteed not to be consumed, including in-flight data in zlib.
+        int safeCapacity = bytebuf.remaining() - MARGIN
+            - (writtenSinceSync - readSinceSync) - dataLengthDependentOverhead(writtenSinceSync);
+        if (safeCapacity <= 0) {
+          while (deflatePut(deflater, bytebuf, Deflater.SYNC_FLUSH) != 0) {}
+          writtenSinceSync = 0;
+          readSinceSync = 0;
+          continue;
+        }
+        int toWrite = Math.min(len, safeCapacity - dataLengthDependentOverhead(safeCapacity));
+        deflater.setInput(b, off, toWrite);
+        writtenSinceSync += toWrite;
+        while (!deflater.needsInput()) {
+          readSinceSync += deflatePut(deflater, bytebuf, Deflater.NO_FLUSH);
+        }
+        // Clear internal references of byte[] b.
+        deflater.setInput(EMPTY_BYTE_ARRAY);
+        off += toWrite;
+        len -= toWrite;
+      }
+    }
+  }
+
+  /**
+   * When data is uncompressable, there are 5B of overhead per deflate block, which is generally
+   * 16 KiB for zlib, but the format supports up to 32 KiB. One block's overhead is already
+   * accounted for in MARGIN. We use 1B/2KiB to circumvent dealing with rounding errors. Note that
+   * 1B/2KiB is not enough to support 8 KiB blocks due to rounding errors.
+   */
+  private static int dataLengthDependentOverhead(int length) {
+    return length / 2048;
+  }
+
+  /**
+   * Deflates pending input of {@code deflater} into the free space of {@code bytebuf} and advances
+   * the buffer position past the bytes produced.
+   *
+   * @return the number of bytes added to {@code bytebuf}
+   * @throws AssertionError if {@code bytebuf} has no free space left
+   */
+  private static int deflatePut(Deflater deflater, ByteBuffer bytebuf, int flush) {
+    if (bytebuf.remaining() == 0) {
+      throw new AssertionError("Compressed data exceeded frame size");
+    }
+    int deflateBytes = deflater.deflate(bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(),
+        bytebuf.remaining(), flush);
+    bytebuf.position(bytebuf.position() + deflateBytes);
+    return deflateBytes;
+  }
+
+  /**
+   * Marks the end of a message. The frame in progress is committed to {@code sink} only when it
+   * is full or, when compressing, when no more than {@code MARGIN + sufficient} bytes are left
+   * in it; otherwise it stays open and later writes continue in the same frame.
+   */
+  public void endOfMessage(Sink sink) {
+    if ((!usingCompression && bytebuf.remaining() == 0)
+        || (usingCompression && bytebuf.remaining() <= MARGIN + sufficient)) {
+      commitToSink(sink, true);
+    }
+  }
+
+  /** Commits the frame in progress, if there is one, to {@code sink}. */
+  public void flush(Sink sink) {
+    if (bytebuf.position() == 0) {
+      return;
+    }
+    commitToSink(sink, true);
+  }
+
+  /**
+   * Writes compression frame to sink. It does not initialize the next frame, so {@link
+   * #checkInitFrame()} is necessary if other frames are to follow.
+   */
+  private void commitToSink(Sink sink, boolean endOfMessage) {
+    if (usingCompression) {
+      deflater.finish();
+      while (!deflater.finished()) {
+        deflatePut(deflater, bytebuf, Deflater.NO_FLUSH);
+      }
+      if (endOfMessage) {
+        deflater.end();
+        deflater = null;
+      }
+    }
+    int frameFlag = usingCompression
+        ? TransportFrameUtil.FLATE_FLAG : TransportFrameUtil.NO_COMPRESS_FLAG;
+    // Header = 1b flag | 3b length of GRPC frame
+    int header = (frameFlag << 24) | (bytebuf.position() - HEADER_LENGTH);
+    bytebuf.putInt(0, header);
+    bytebuf.flip();
+    sink.deliverFrame(bytebuf, endOfMessage);
+    bytebuf.clear();
+  }
+
+  /** OutputStream whose writes are framed by the enclosing framer and sent to the set sink. */
+  private class OutputStreamAdapter extends OutputStream {
+    private Sink sink;
+    private final byte[] singleByte = new byte[1];
+
+    @Override
+    public void write(int b) {
+      singleByte[0] = (byte) b;
+      write(singleByte, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+      CompressionFramer.this.write(b, off, len, sink);
+    }
+
+    public void setSink(Sink sink) {
+      this.sink = sink;
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/transport/Deframer.java b/core/src/main/java/com/google/net/stubby/transport/Deframer.java
new file mode 100644
index 0000000..5721c46
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/transport/Deframer.java
@@ -0,0 +1,151 @@
+package com.google.net.stubby.transport;
+
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Operation;
+import com.google.net.stubby.Status;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Base implementation that joins a sequence of framed GRPC data produced by a {@link Framer},
+ * reconstructs their messages and hands them off to a receiving {@link Operation}.
+ *
+ * @param <F> the type in which the transport provides its frames
+ */
+// TODO(user): Either make this an interface or convert Framer -> AbstractFramer for consistency
+public abstract class Deframer<F> {
+
+  /**
+   * Unset frame length
+   */
+  private static final int LENGTH_NOT_SET = -1;
+
+  /** Whether the flags of the GRPC frame currently being decoded have been read. */
+  private boolean inFrame;
+  /** Flags of the GRPC frame currently being decoded; only meaningful while in a frame. */
+  private byte currentFlags;
+  /** Length of the GRPC frame currently being decoded, or {@code LENGTH_NOT_SET}. */
+  private int currentLength = LENGTH_NOT_SET;
+
+  public Deframer() {}
+
+  /**
+   * Consume a frame of bytes provided by the transport. Note that transport framing is not
+   * aligned on GRPC frame boundaries so this code needs to do bounds checking and buffering
+   * across transport frame boundaries.
+   *
+   * <p>Every GRPC frame that is completely available is dispatched to {@code target}: payloads
+   * via {@code addPayload}, context values via {@code addContext} and statuses via {@code close}.
+   * If an {@link IOException} occurs, {@code target} is closed with an {@code UNKNOWN} status
+   * and that status is thrown as a runtime exception.
+   *
+   * @param frame the transport frame to decode
+   * @param target the operation that receives the decoded messages
+   * @return the number of unconsumed bytes remaining in the buffer
+   */
+  public int deframe(F frame, Operation target) {
+    try {
+      frame = decompress(frame);
+      DataInputStream grpcStream = prefix(frame);
+      // Loop until no more GRPC frames can be fully decoded
+      while (true) {
+        if (!inFrame) {
+          // Not in frame so attempt to read flags
+          if (!ensure(grpcStream, GrpcFramingUtil.FRAME_TYPE_LENGTH)) {
+            return consolidate();
+          }
+          currentFlags = grpcStream.readByte();
+          inFrame = true;
+        }
+        if (currentLength == LENGTH_NOT_SET) {
+          // Read the frame length
+          if (!ensure(grpcStream, GrpcFramingUtil.FRAME_LENGTH)) {
+            return consolidate();
+          }
+          currentLength = grpcStream.readInt();
+        }
+        // Ensure that the entire frame length is available to read
+        InputStream framedChunk = ensureMessage(grpcStream, currentLength);
+        if (framedChunk == null) {
+          // Insufficient bytes available
+          return consolidate();
+        }
+        try {
+          if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
+            // Report payload to the receiving operation
+            target.addPayload(framedChunk, Operation.Phase.PAYLOAD);
+          } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
+            // Not clear if using proto encoding here is of any benefit.
+            // Using ContextValue.parseFrom requires copying out of the framed chunk
+            // Writing a custom parser would have to do varint handling and potentially
+            // deal with out-of-order tags etc.
+            Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
+            target.addContext(contextValue.getKey(),
+                contextValue.getValue().newInput(),
+                target.getPhase());
+          } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
+            int status = framedChunk.read() << 8 | framedChunk.read();
+            Transport.Code code = Transport.Code.valueOf(status);
+            // TODO(user): Resolve what to do with remainder of framedChunk
+            if (code == null) {
+              // Log for unknown code
+              target.close(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
+            } else {
+              target.close(new Status(code));
+            }
+          }
+        } finally {
+          // Whatever its type, this frame is done. Without the reset the next frame would be
+          // decoded using this frame's flags and length.
+          currentLength = LENGTH_NOT_SET;
+          inFrame = false;
+        }
+        if (grpcStream.available() == 0) {
+          // We've processed all the data so consolidate the underlying buffers
+          return consolidate();
+        }
+      }
+    } catch (IOException ioe) {
+      Status status = new Status(Transport.Code.UNKNOWN, ioe);
+      target.close(status);
+      throw status.asRuntimeException();
+    }
+  }
+
+  /**
+   * Return a stream view over the current buffer prefixed to the input frame
+   */
+  protected abstract DataInputStream prefix(F frame) throws IOException;
+
+  /**
+   * Consolidate the underlying buffers and return the number of buffered bytes remaining
+   */
+  protected abstract int consolidate() throws IOException;
+
+  /**
+   * Decompress the raw frame buffer prior to prefixing it.
+   */
+  protected abstract F decompress(F frame) throws IOException;
+
+  /**
+   * Return whether at least {@code len} bytes are available to read from {@code input}.
+   */
+  private boolean ensure(InputStream input, int len) throws IOException {
+    return (input.available() >= len);
+  }
+
+  /**
+   * Return a message of {@code len} bytes that can be read from the buffer, or {@code null} if
+   * fewer than {@code len} bytes are currently available.
+   */
+  private InputStream ensureMessage(InputStream input, int len)
+      throws IOException {
+    if (input.available() < len) {
+      return null;
+    }
+    return ByteStreams.limit(input, len);
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/transport/Framer.java b/core/src/main/java/com/google/net/stubby/transport/Framer.java
new file mode 100644
index 0000000..2ef0365
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/transport/Framer.java
@@ -0,0 +1,69 @@
+package com.google.net.stubby.transport;
+
+import com.google.net.stubby.Status;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Implementations produce the GRPC byte sequence and then split it over multiple frames to be
+ * delivered via the transport layer which implements {@link Framer.Sink}.
+ */
+public interface Framer {
+
+  /**
+   * Sink implemented by the transport layer to receive frames and forward them to their
+   * destination.
+   */
+  interface Sink {
+    /**
+     * Deliver a frame via the transport.
+     *
+     * @param frame the contents of the frame to deliver
+     * @param endOfMessage whether the frame is the last one for the current GRPC message
+     */
+    void deliverFrame(ByteBuffer frame, boolean endOfMessage);
+  }
+
+  /**
+   * Write out a Context-Value message. {@code message} will be completely consumed.
+   * {@code message.available()} must return the number of remaining bytes to be read.
+   *
+   * @param type the key of the context value
+   * @param message the value of the context value
+   * @param flush whether to flush buffered data to {@code sink} after writing the message
+   * @param sink the receiver of the frames that are produced
+   */
+  void writeContext(String type, InputStream message, boolean flush, Sink sink);
+
+  /**
+   * Write out a Payload message. {@code payload} will be completely consumed.
+   * {@code payload.available()} must return the number of remaining bytes to be read.
+   *
+   * @param payload the payload bytes
+   * @param flush whether to flush buffered data to {@code sink} after writing the message
+   * @param sink the receiver of the frames that are produced
+   */
+  void writePayload(InputStream payload, boolean flush, Sink sink);
+
+  /**
+   * Write out a Status message.
+   *
+   * @param status the status to send
+   * @param flush whether to flush buffered data to {@code sink} after writing the message
+   * @param sink the receiver of the frames that are produced
+   */
+  void writeStatus(Status status, boolean flush, Sink sink);
+
+  /**
+   * Flush any buffered data in the framer to the sink.
+   *
+   * @param sink the receiver of the frames that are produced
+   */
+  void flush(Sink sink);
+
+  /**
+   * Close the framer and release any buffers.
+   */
+  void close();
+}
diff --git a/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java b/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java
new file mode 100644
index 0000000..e793aad
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java
@@ -0,0 +1,165 @@
+package com.google.net.stubby.transport;
+
+import com.google.common.io.ByteStreams;
+import com.google.net.stubby.Operation;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Deframer that expects the input frames to be provided as {@link InputStream} instances
+ * which accurately report their size using {@link java.io.InputStream#available()}.
+ */
+public class InputStreamDeframer extends Deframer<InputStream> {
+
+  private final InputStreamDeframer.PrefixingInputStream prefixingInputStream;
+
+  public InputStreamDeframer() {
+    prefixingInputStream = new PrefixingInputStream(4096);
+  }
+
+  /**
+   * Deframing a single input stream that contains multiple GRPC frames
+   */
+  @Override
+  public int deframe(InputStream frame, Operation target) {
+    try {
+      int read = 0;
+      while (frame.available() > 0) {
+        read += super.deframe(frame, target);
+      }
+      return read;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  protected DataInputStream prefix(InputStream frame) throws IOException {
+    prefixingInputStream.consolidate();
+    prefixingInputStream.prefix(frame);
+    return new DataInputStream(prefixingInputStream);
+  }
+
+  @Override
+  protected int consolidate() throws IOException {
+    prefixingInputStream.consolidate();
+    return prefixingInputStream.available();
+  }
+
+  /**
+   * Reads the transport frame header from {@code frame} (one byte of compression type followed
+   * by a three byte length, most significant byte first) and returns a stream over the frame
+   * contents, inflating them if the frame is flate compressed.
+   *
+   * @throws IOException if the compression type is not recognized
+   */
+  @Override
+  protected InputStream decompress(InputStream frame) throws IOException {
+    int compressionType = frame.read();
+    int frameLength = frame.read() << 16 | frame.read() << 8 | frame.read();
+    InputStream raw = ByteStreams.limit(frame, frameLength);
+    if (TransportFrameUtil.isNotCompressed(compressionType)) {
+      return raw;
+    } else if (TransportFrameUtil.isFlateCompressed(compressionType)) {
+      return new InflaterInputStream(raw);
+    }
+    throw new IOException("Unknown compression type " + compressionType);
+  }
+
+  /**
+   * InputStream that prefixes another input stream with a fixed buffer.
+   */
+  private static class PrefixingInputStream extends InputStream {
+
+    private InputStream suffix;
+    private byte[] buffer;
+    private int bufferIndex;
+    private int maxRetainedBuffer;
+
+    private PrefixingInputStream(int maxRetainedBuffer) {
+      // TODO(user): Implement support for this.
+      this.maxRetainedBuffer = maxRetainedBuffer;
+    }
+
+    void prefix(InputStream suffix) {
+      this.suffix = suffix;
+    }
+
+    /**
+     * Moves whatever is still unread in the suffix into the buffer, behind the buffered bytes
+     * that have not been read yet, and detaches the suffix.
+     */
+    void consolidate() throws IOException {
+      int remainingSuffix = suffix == null ? 0 : suffix.available();
+      if (remainingSuffix == 0) {
+        // No suffix so clear
+        suffix = null;
+        return;
+      }
+      byte[] tail = ByteStreams.toByteArray(suffix);
+      suffix = null;
+      int unread = buffer == null ? 0 : buffer.length - bufferIndex;
+      if (unread == 0) {
+        buffer = tail;
+      } else {
+        // Keep the unread buffered bytes in front of the suffix bytes to preserve stream order.
+        byte[] merged = new byte[unread + tail.length];
+        System.arraycopy(buffer, bufferIndex, merged, 0, unread);
+        System.arraycopy(tail, 0, merged, unread, tail.length);
+        buffer = merged;
+      }
+      bufferIndex = 0;
+    }
+
+    /**
+     * Reads up to {@code len} bytes, taking them from the buffer first and then from the suffix.
+     * Returns {@code -1} only when no byte could be read because the end has been reached.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      if (len == 0) {
+        return 0;
+      }
+      int read = readFromBuffer(b, off, len);
+      if (read == len || suffix == null) {
+        return read == 0 ? -1 : read;
+      }
+      int fromSuffix = suffix.read(b, off + read, len - read);
+      if (fromSuffix < 0) {
+        return read == 0 ? -1 : read;
+      }
+      return read + fromSuffix;
+    }
+
+    private int readFromBuffer(byte[] b, int off, int len) {
+      if (buffer == null) {
+        return 0;
+      }
+      len = Math.min(buffer.length - bufferIndex, len);
+      System.arraycopy(buffer, bufferIndex, b, off, len);
+      bufferIndex += len;
+      return len;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (buffer == null || bufferIndex == buffer.length) {
+        return suffix == null ? -1 : suffix.read();
+      }
+      // Mask to 0-255: a negative result would be taken for end of stream by callers.
+      return buffer[bufferIndex++] & 0xFF;
+    }
+
+    @Override
+    public int available() throws IOException {
+      int available = buffer != null ? buffer.length - bufferIndex : 0;
+      if (suffix != null) {
+        available += suffix.available();
+      }
+      return available;
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java b/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java
new file mode 100644
index 0000000..057d7d6
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java
@@ -0,0 +1,208 @@
+package com.google.net.stubby.transport;
+
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Status;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * Default {@link Framer} implementation.
+ */
+public class MessageFramer implements Framer {
+
+  /**
+   * Size of the GRPC message frame header which consists of
+   * 1 byte for the type (payload, context, status)
+   * 4 bytes for the length of the message
+   */
+  private static final int MESSAGE_HEADER_SIZE = 5;
+
+  /**
+   * UTF-8 charset which is used for key name encoding in context values
+   */
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  /**
+   * Precomputed protobuf tags for ContextValue
+   */
+  private static final byte[] VALUE_TAG;
+  private static final byte[] KEY_TAG;
+
+
+  static {
+    // Initialize constants for serializing context-value in a protobuf compatible manner
+    try {
+      byte[] buf = new byte[8];
+      CodedOutputStream coded = CodedOutputStream.newInstance(buf);
+      coded.writeTag(Transport.ContextValue.KEY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+      coded.flush();
+      KEY_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
+      coded = CodedOutputStream.newInstance(buf);
+      coded.writeTag(Transport.ContextValue.VALUE_FIELD_NUMBER,
+          WireFormat.WIRETYPE_LENGTH_DELIMITED);
+      coded.flush();
+      VALUE_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
+    } catch (IOException ioe) {
+      // Unrecoverable
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /** Underlying framer; set to {@code null} by {@link #close()}. */
+  private CompressionFramer framer;
+  /** Reusable buffer for assembling message headers and protobuf field prefixes. */
+  private final ByteBuffer scratch = ByteBuffer.allocate(16);
+
+  /**
+   * Creates a framer that emits frames of at most {@code maxFrameSize} bytes, with compression
+   * initially disabled.
+   */
+  public MessageFramer(int maxFrameSize) {
+    // TODO(user): maxFrameSize should probably come from a 'Platform' class
+    framer = new CompressionFramer(maxFrameSize, false, maxFrameSize / 16);
+  }
+
+  /**
+   * Sets whether compression is encouraged.
+   */
+  public void setAllowCompression(boolean enable) {
+    framer.setAllowCompression(enable);
+  }
+
+  /**
+   * Set the preferred compression level for when compression is enabled.
+   * @param level the preferred compression level, or {@code -1} to use the framing default
+   * @see java.util.zip.Deflater#setLevel
+   */
+  public void setCompressionLevel(int level) {
+    framer.setCompressionLevel(level);
+  }
+
+  /**
+   * Writes a payload frame: the frame type, the length reported by {@code message.available()}
+   * and then the message bytes.
+   *
+   * @throws RuntimeException if the number of bytes written for {@code message} differs from
+   *     {@code message.available()}, or if reading {@code message} fails
+   */
+  @Override
+  public void writePayload(InputStream message, boolean flush, Sink sink) {
+    try {
+      scratch.clear();
+      scratch.put(GrpcFramingUtil.PAYLOAD_FRAME);
+      int messageLength = message.available();
+      scratch.putInt(messageLength);
+      framer.write(scratch.array(), 0, scratch.position(), sink);
+      if (messageLength != framer.write(message, sink)) {
+        throw new RuntimeException("InputStream's available() was inaccurate");
+      }
+      framer.endOfMessage(sink);
+      // A sink may close this framer while a frame is delivered to it, hence the null check.
+      if (flush && framer != null) {
+        framer.flush(sink);
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+
+  /**
+   * Writes a context-value frame whose body is {@code key} and the contents of {@code message}
+   * encoded as the two length-delimited protobuf fields of {@code Transport.ContextValue}.
+   *
+   * @throws RuntimeException if the number of bytes written for {@code message} differs from
+   *     {@code message.available()}, or if reading {@code message} fails
+   */
+  @Override
+  public void writeContext(String key, InputStream message, boolean flush, Sink sink) {
+    try {
+      scratch.clear();
+      scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME);
+      byte[] keyBytes = key.getBytes(UTF_8);
+      int lenKeyPrefix = KEY_TAG.length +
+          CodedOutputStream.computeRawVarint32Size(keyBytes.length);
+      int messageLen = message.available();
+      int lenValPrefix = VALUE_TAG.length + CodedOutputStream.computeRawVarint32Size(messageLen);
+      int totalLen = lenKeyPrefix + keyBytes.length + lenValPrefix + messageLen;
+      scratch.putInt(totalLen);
+      framer.write(scratch.array(), 0, scratch.position(), sink);
+
+      // Write key
+      scratch.clear();
+      scratch.put(KEY_TAG);
+      writeRawVarInt32(keyBytes.length, scratch);
+      framer.write(scratch.array(), 0, scratch.position(), sink);
+      framer.write(keyBytes, 0, keyBytes.length, sink);
+
+      // Write value
+      scratch.clear();
+      scratch.put(VALUE_TAG);
+      writeRawVarInt32(messageLen, scratch);
+      framer.write(scratch.array(), 0, scratch.position(), sink);
+      if (messageLen != framer.write(message, sink)) {
+        throw new RuntimeException("InputStream's available() was inaccurate");
+      }
+      framer.endOfMessage(sink);
+      // A sink may close this framer while a frame is delivered to it, hence the null check.
+      if (flush && framer != null) {
+        framer.flush(sink);
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Writes a status frame whose two byte body is the numeric value of the status code.
+   */
+  @Override
+  public void writeStatus(Status status, boolean flush, Sink sink) {
+    // Use the protobuf number rather than ordinal(): the receiving side maps the value back with
+    // Transport.Code.valueOf(int), which looks codes up by number, not by declaration order.
+    short code = (short) status.getCode().getNumber();
+    scratch.clear();
+    scratch.put(GrpcFramingUtil.STATUS_FRAME);
+    int length = 2;
+    scratch.putInt(length);
+    scratch.putShort(code);
+    framer.write(scratch.array(), 0, scratch.position(), sink);
+    framer.endOfMessage(sink);
+    // A sink may close this framer while a frame is delivered to it, hence the null check.
+    if (flush && framer != null) {
+      framer.flush(sink);
+    }
+  }
+
+  @Override
+  public void flush(Sink sink) {
+    framer.flush(sink);
+  }
+
+  @Override
+  public void close() {
+    // TODO(user): Returning buffer to a pool would go here
+    framer = null;
+  }
+
+  /**
+   * Write a raw VarInt32 to the buffer
+   */
+  private static void writeRawVarInt32(int value, ByteBuffer bytebuf) {
+    while (true) {
+      if ((value & ~0x7F) == 0) {
+        bytebuf.put((byte) value);
+        return;
+      } else {
+        bytebuf.put((byte) ((value & 0x7F) | 0x80));
+        value >>>= 7;
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java b/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java
new file mode 100644
index 0000000..da73aff
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java
@@ -0,0 +1,23 @@
+package com.google.net.stubby.transport;
+
+/**
+ * Helpers for interpreting the flag byte that leads each transport frame.
+ *
+ * <p>The lowest-order three bits of that byte identify how the frame contents are compressed.
+ */
+public class TransportFrameUtil {
+  // Compression modes (lowest order 3 bits of frame flags)
+  public static final byte NO_COMPRESS_FLAG = 0x0;
+  public static final byte FLATE_FLAG = 0x1;
+  public static final byte COMPRESSION_FLAG_MASK = 0x7;
+
+  /** Returns true when the compression bits of {@code b} mark the frame as uncompressed. */
+  public static boolean isNotCompressed(int b) {
+    return NO_COMPRESS_FLAG == (COMPRESSION_FLAG_MASK & b);
+  }
+
+  /** Returns true when the compression bits of {@code b} mark the frame as flate-compressed. */
+  public static boolean isFlateCompressed(int b) {
+    return FLATE_FLAG == (COMPRESSION_FLAG_MASK & b);
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java b/core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java
new file mode 100644
index 0000000..1ef6507
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java
@@ -0,0 +1,99 @@
+package com.google.net.stubby.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Bytes;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.Deflater;
+import java.util.zip.InflaterInputStream;
+
+/** Unit tests for {@link CompressionFramer}. */
+@RunWith(JUnit4.class)
+public class CompressionFramerTest {
+  private final int maxFrameSize = 1024;
+  private final int sufficient = 8;
+  private final CompressionFramer framer = new CompressionFramer(maxFrameSize, true, sufficient);
+  private final CapturingSink sink = new CapturingSink();
+
+  @Test
+  public void testGoodCompression() {
+    byte[] payload = new byte[1000];
+    framer.setCompressionLevel(Deflater.BEST_COMPRESSION);
+    framer.write(payload, 0, payload.length, sink);
+    framer.endOfMessage(sink);
+    framer.flush(sink);
+    // An all-zero payload should collapse into one small flate frame that inflates back intact.
+    assertEquals(1, sink.frames.size());
+    byte[] frame = sink.frames.get(0);
+    assertEquals(TransportFrameUtil.FLATE_FLAG, frame[0]);
+    assertTrue(decodeFrameLength(frame) < 30);
+    assertArrayEquals(payload, decompress(frame));
+  }
+
+  @Test
+  public void testPoorCompression() {
+    byte[] payload = new byte[3 * maxFrameSize / 2];
+    new Random(1).nextBytes(payload);
+    framer.setCompressionLevel(Deflater.DEFAULT_COMPRESSION);
+    framer.write(payload, 0, payload.length, sink);
+    framer.endOfMessage(sink);
+    framer.flush(sink);
+    // Random bytes barely compress: expect two frames, with the first filled close to the limit.
+    assertEquals(2, sink.frames.size());
+    assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(0)[0]);
+    assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(1)[0]);
+    assertTrue(decodeFrameLength(sink.frames.get(0)) <= maxFrameSize);
+    assertTrue(decodeFrameLength(sink.frames.get(0))
+        >= maxFrameSize - CompressionFramer.HEADER_LENGTH - CompressionFramer.MARGIN - sufficient);
+    assertArrayEquals(payload, decompress(sink.frames));
+  }
+
+  /** Reads the 24-bit big-endian length stored in bytes 1..3 of a frame. */
+  private static int decodeFrameLength(byte[] frame) {
+    return ((frame[1] & 0xFF) << 16) | ((frame[2] & 0xFF) << 8) | (frame[3] & 0xFF);
+  }
+
+  private static byte[] decompress(byte[] frame) {
+    try {
+      return ByteStreams.toByteArray(new InflaterInputStream(new ByteArrayInputStream(frame,
+          CompressionFramer.HEADER_LENGTH, frame.length - CompressionFramer.HEADER_LENGTH)));
+    } catch (IOException ex) {
+      throw new AssertionError(ex); // Keep the I/O failure as the cause instead of dropping it.
+    }
+  }
+
+  private static byte[] decompress(List<byte[]> frames) {
+    byte[][] bytes = new byte[frames.size()][];
+    for (int i = 0; i < frames.size(); i++) {
+      bytes[i] = decompress(frames.get(i));
+    }
+    return Bytes.concat(bytes);
+  }
+
+  /** Sink that copies out each delivered frame after checking its length field against its size. */
+  private static class CapturingSink implements Framer.Sink {
+    public final List<byte[]> frames = Lists.newArrayList();
+
+    @Override
+    public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+      byte[] frameBytes = new byte[frame.remaining()];
+      frame.get(frameBytes);
+      assertEquals(frameBytes.length - CompressionFramer.HEADER_LENGTH,
+          decodeFrameLength(frameBytes));
+      frames.add(frameBytes);
+    }
+  }
+}
diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java b/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java
new file mode 100644
index 0000000..fae878c
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java
@@ -0,0 +1,126 @@
+package com.google.net.stubby.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.io.ByteBuffers;
+import com.google.common.primitives.Bytes;
+import com.google.net.stubby.GrpcFramingUtil;
+import com.google.net.stubby.Status;
+import com.google.protobuf.ByteString;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Tests for {@link MessageFramer}
+ */
+@RunWith(JUnit4.class)
+public class MessageFramerTest {
+
+  public static final int TRANSPORT_FRAME_SIZE = 57;
+  /** Number of identical messages each test pushes through the framer. */
+  private static final int MESSAGE_COUNT = 1000;
+
+  /** Payload frames must deframe to a tag byte, a 4-byte length, then the payload bytes. */
+  @Test
+  public void testPayload() throws Exception {
+    MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE);
+    byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+    byte[] unframedStream =
+        Bytes.concat(
+            new byte[]{GrpcFramingUtil.PAYLOAD_FRAME},
+            new byte[]{0, 0, 0, (byte) payload.length},
+            payload);
+    CapturingSink sink = new CapturingSink();
+    for (int i = 0; i < MESSAGE_COUNT; i++) {
+      framer.writePayload(new ByteArrayInputStream(payload), (i % 17 == 11), sink);
+      if ((i + 1) % 13 == 0) {
+        // Test flushing periodically
+        framer.flush(sink);
+      }
+    }
+    framer.flush(sink);
+    assertRepeated(unframedStream, sink.deframedStream);
+  }
+
+  /** Context frames must hold the key and value as a serialized ContextValue message. */
+  @Test
+  public void testContext() throws Exception {
+    MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE);
+    byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
+    byte[] contextValue = Transport.ContextValue.newBuilder()
+        .setKey("somekey")
+        .setValue(ByteString.copyFrom(payload))
+        .build().toByteArray();
+    byte[] unframedStream =
+        Bytes.concat(
+            new byte[]{GrpcFramingUtil.CONTEXT_VALUE_FRAME},
+            new byte[]{0, 0,
+                (byte) (contextValue.length >> 8 & 0xff),
+                (byte) (contextValue.length & 0xff)},
+            contextValue);
+    CapturingSink sink = new CapturingSink();
+    for (int i = 0; i < MESSAGE_COUNT; i++) {
+      framer.writeContext("somekey", new ByteArrayInputStream(payload), (i % 17 == 11), sink);
+      if ((i + 1) % 13 == 0) {
+        framer.flush(sink);
+      }
+    }
+    framer.flush(sink);
+    assertRepeated(unframedStream, sink.deframedStream);
+  }
+
+  /** Status frames must deframe to a tag byte, a length of 2, then the 2-byte status code. */
+  @Test
+  public void testStatus() throws Exception {
+    MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE);
+    byte[] unframedStream = Bytes.concat(
+        new byte[]{GrpcFramingUtil.STATUS_FRAME},
+        new byte[]{0, 0, 0, 2},  // Len is 2 bytes
+        new byte[]{0, 13});  // Internal==13
+    CapturingSink sink = new CapturingSink();
+    for (int i = 0; i < MESSAGE_COUNT; i++) {
+      framer.writeStatus(new Status(Transport.Code.INTERNAL), (i % 17 == 11), sink);
+      if ((i + 1) % 13 == 0) {
+        framer.flush(sink);
+      }
+    }
+    framer.flush(sink);
+    assertRepeated(unframedStream, sink.deframedStream);
+  }
+
+  /** Asserts {@code actual} is exactly MESSAGE_COUNT back-to-back copies of {@code unit}. */
+  private static void assertRepeated(byte[] unit, byte[] actual) {
+    assertEquals(unit.length * MESSAGE_COUNT, actual.length);
+    for (int i = 0; i < MESSAGE_COUNT; i++) {
+      int from = i * unit.length;
+      assertArrayEquals(unit, Arrays.copyOfRange(actual, from, from + unit.length));
+    }
+  }
+
+  /** Sink that validates each frame header and accumulates the frame bodies in order. */
+  static class CapturingSink implements Framer.Sink {
+
+    byte[] deframedStream = new byte[0];
+
+    @Override
+    public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
+      // Frame must not exceed dictated transport frame size
+      assertTrue(frame.remaining() <= TRANSPORT_FRAME_SIZE);
+      // Frame must contain compression flag & 24 bit length
+      int header = frame.getInt();
+      byte flag = (byte) (header >>> 24);
+      int length = header & 0xFFFFFF;
+      assertTrue(TransportFrameUtil.isNotCompressed(flag));
+      assertEquals(frame.remaining(), length);
+      deframedStream = Bytes.concat(deframedStream, ByteBuffers.extractBytes(frame));
+    }
+  }
+}