Implementing Netty-based GRPC Client Transport. Server-side will be implemented in a follow-up CL.
Overview:
- NettyClientTransport - the entry point into the client transport. It creates streams as well as the NettyClientHandler.
- NettyClientStream - client stream implementation. It drives the NettyClientHandler by sending it commands. Callbacks on the stream are made directly from the NettyClientHandler in the channel thread context.
- NettyClientHandler - A Netty HTTP/2 handler that acts as a bridge between the NettyClientStreams and Netty's HTTP/2 processing.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=70970028
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
index 06b26e5..6c7387d 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java
@@ -5,6 +5,7 @@
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
+import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -26,6 +27,7 @@
private boolean inFrame;
private byte currentFlags;
private int currentLength = LENGTH_NOT_SET;
+ private boolean statusDelivered;
public Deframer(Framer target) {
this.target = target;
@@ -34,8 +36,12 @@
@Override
public void deliverFrame(F frame, boolean endOfStream) {
int remaining = internalDeliverFrame(frame);
- if (endOfStream && remaining > 0) {
- target.writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
+ if (endOfStream) {
+ if (remaining > 0) {
+ writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame"));
+ } else if (!statusDelivered) {
+ writeStatus(Status.OK);
+ }
}
}
@@ -91,8 +97,8 @@
// deal with out-of-order tags etc.
Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
try {
- target.writeContext(contextValue.getKey(),
- contextValue.getValue().newInput(), currentLength);
+ ByteString value = contextValue.getValue();
+ target.writeContext(contextValue.getKey(), value.newInput(), value.size());
} finally {
currentLength = LENGTH_NOT_SET;
inFrame = false;
@@ -104,10 +110,9 @@
try {
if (code == null) {
// Log for unknown code
- target.writeStatus(
- new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
+ writeStatus(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
} else {
- target.writeStatus(new Status(code));
+ writeStatus(new Status(code));
}
} finally {
currentLength = LENGTH_NOT_SET;
@@ -121,7 +126,7 @@
}
} catch (IOException ioe) {
Status status = new Status(Transport.Code.UNKNOWN, ioe);
- target.writeStatus(status);
+ writeStatus(status);
throw status.asRuntimeException();
}
}
@@ -148,6 +153,11 @@
return (input.available() >= len);
}
+ private void writeStatus(Status status) {
+ target.writeStatus(status);
+ statusDelivered = true;
+ }
+
/**
* Return a message of {@code len} bytes than can be read from the buffer. If sufficient
* bytes are unavailable then buffer the available bytes and return null.