Changing gRPC Java inbound flow control model
The goal is to mirror the token-based approach used by the Reactive
Streams API.
diff --git a/core/src/main/java/com/google/net/stubby/Call.java b/core/src/main/java/com/google/net/stubby/Call.java
index b8f6986..61ba2a8 100644
--- a/core/src/main/java/com/google/net/stubby/Call.java
+++ b/core/src/main/java/com/google/net/stubby/Call.java
@@ -31,9 +31,6 @@
package com.google.net.stubby;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import javax.annotation.Nullable;
/**
* Low-level methods for communicating with a remote server during a single RPC. Unlike normal RPCs,
@@ -69,14 +66,13 @@
* This method is always called, if no headers were received then an empty {@link Metadata}
* is passed.
*/
- public abstract ListenableFuture<Void> onHeaders(Metadata.Headers headers);
+ public abstract void onHeaders(Metadata.Headers headers);
/**
* A response payload has been received. For streaming calls, there may be zero payload
* messages.
*/
- @Nullable
- public abstract ListenableFuture<Void> onPayload(T payload);
+ public abstract void onPayload(T payload);
/**
* The Call has been closed. No further sending or receiving can occur. If {@code status} is
@@ -98,6 +94,22 @@
public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers);
/**
+ * Requests up to the given number of messages from the call to be delivered to
+ * {@link Listener#onPayload(Object)}. No additional messages will be delivered.
+ *
+ * <p>Message delivery is guaranteed to be sequential in the order received. In addition, the
+ * listener methods will not be accessed concurrently. While it is not guaranteed that the same
+ * thread will always be used, it is guaranteed that only a single thread will access the listener
+ * at a time.
+ *
+ * <p>If it is desired to bypass inbound flow control, a very large number of messages can be
+ * specified (e.g. {@link Integer#MAX_VALUE}).
+ *
+ * @param numMessages the requested number of messages to be delivered to the listener.
+ */
+ public abstract void request(int numMessages);
+
+ /**
* Prevent any further processing for this Call. No further messages may be sent or will be
* received. The server is informed of cancellations, but may not stop processing the call.
* Cancellation is permitted even if previously {@code cancel()}ed or {@link #halfClose}d.
diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
index 839d16d..9b5722b 100644
--- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java
@@ -32,13 +32,10 @@
package com.google.net.stubby;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service.Listener;
import com.google.common.util.concurrent.Service.State;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.ClientStream;
import com.google.net.stubby.transport.ClientStreamListener;
import com.google.net.stubby.transport.ClientTransport;
@@ -48,7 +45,6 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -68,6 +64,7 @@
@Override public void flush() {}
@Override public void cancel() {}
@Override public void halfClose() {}
+ @Override public void request(int numMessages) {}
}
private final ClientTransportFactory transportFactory;
@@ -271,6 +268,11 @@
}
@Override
+ public void request(int numMessages) {
+ stream.request(numMessages);
+ }
+
+ @Override
public void cancel() {
// Cancel is called in exception handling cases, so it may be the case that the
// stream was never successfully created.
@@ -311,58 +313,51 @@
private class ClientStreamListenerImpl implements ClientStreamListener {
private final Listener<RespT> observer;
+ private boolean closed;
public ClientStreamListenerImpl(Listener<RespT> observer) {
Preconditions.checkNotNull(observer);
this.observer = observer;
}
- private ListenableFuture<Void> dispatchCallable(
- final Callable<ListenableFuture<Void>> callable) {
- final SettableFuture<Void> ours = SettableFuture.create();
+ @Override
+ public void headersRead(final Metadata.Headers headers) {
callExecutor.execute(new Runnable() {
@Override
public void run() {
try {
- ListenableFuture<Void> theirs = callable.call();
- if (theirs == null) {
- ours.set(null);
- } else {
- Futures.addCallback(theirs, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- ours.set(null);
- }
- @Override
- public void onFailure(Throwable t) {
- ours.setException(t);
- }
- }, MoreExecutors.directExecutor());
+ if (closed) {
+ return;
}
+
+ observer.onHeaders(headers);
} catch (Throwable t) {
- ours.setException(t);
+ cancel();
+ throw Throwables.propagate(t);
}
}
});
- return ours;
}
@Override
- public ListenableFuture<Void> headersRead(final Metadata.Headers headers) {
- return dispatchCallable(new Callable<ListenableFuture<Void>>() {
+ public void messageRead(final InputStream message, final int length) {
+ callExecutor.execute(new Runnable() {
@Override
- public ListenableFuture<Void> call() throws Exception {
- return observer.onHeaders(headers);
- }
- });
- }
+ public void run() {
+ try {
+ if (closed) {
+ return;
+ }
- @Override
- public ListenableFuture<Void> messageRead(final InputStream message, final int length) {
- return dispatchCallable(new Callable<ListenableFuture<Void>>() {
- @Override
- public ListenableFuture<Void> call() {
- return observer.onPayload(method.parseResponse(message));
+ try {
+ observer.onPayload(method.parseResponse(message));
+ } finally {
+ message.close();
+ }
+ } catch (Throwable t) {
+ cancel();
+ throw Throwables.propagate(t);
+ }
}
});
}
@@ -372,6 +367,7 @@
callExecutor.execute(new Runnable() {
@Override
public void run() {
+ closed = true;
observer.onClose(status, trailers);
}
});
diff --git a/core/src/main/java/com/google/net/stubby/ClientInterceptors.java b/core/src/main/java/com/google/net/stubby/ClientInterceptors.java
index 636f903..1e0248b 100644
--- a/core/src/main/java/com/google/net/stubby/ClientInterceptors.java
+++ b/core/src/main/java/com/google/net/stubby/ClientInterceptors.java
@@ -33,7 +33,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.Iterator;
@@ -122,6 +121,11 @@
}
@Override
+ public void request(int numMessages) {
+ this.delegate.request(numMessages);
+ }
+
+ @Override
public void cancel() {
this.delegate.cancel();
}
@@ -150,13 +154,13 @@
}
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
- return delegate.onHeaders(headers);
+ public void onHeaders(Metadata.Headers headers) {
+ delegate.onHeaders(headers);
}
@Override
- public ListenableFuture<Void> onPayload(T payload) {
- return delegate.onPayload(payload);
+ public void onPayload(T payload) {
+ delegate.onPayload(payload);
}
@Override
diff --git a/core/src/main/java/com/google/net/stubby/ServerCall.java b/core/src/main/java/com/google/net/stubby/ServerCall.java
index e925349..8efdb2b 100644
--- a/core/src/main/java/com/google/net/stubby/ServerCall.java
+++ b/core/src/main/java/com/google/net/stubby/ServerCall.java
@@ -31,9 +31,6 @@
package com.google.net.stubby;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import javax.annotation.Nullable;
/**
* Low-level method for communicating with a remote client during a single RPC. Unlike normal RPCs,
@@ -67,8 +64,7 @@
* A request payload has been received. For streaming calls, there may be zero payload
* messages.
*/
- @Nullable
- public abstract ListenableFuture<Void> onPayload(RequestT payload);
+ public abstract void onPayload(RequestT payload);
/**
* The client completed all message sending. However, the call may still be cancelled.
@@ -94,6 +90,14 @@
}
/**
+ * Requests up to the given number of messages from the call to be delivered to
+ * {@link Listener#onPayload(Object)}. No additional messages will be delivered.
+ *
+ * @param numMessages the requested number of messages to be delivered to the listener.
+ */
+ public abstract void request(int numMessages);
+
+ /**
* Send response header metadata prior to sending a response payload. This method may
* only be called once and cannot be called after calls to {@code Stream#sendPayload}
* or {@code #close}.
diff --git a/core/src/main/java/com/google/net/stubby/ServerImpl.java b/core/src/main/java/com/google/net/stubby/ServerImpl.java
index f7cd2e9..df933fe 100644
--- a/core/src/main/java/com/google/net/stubby/ServerImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ServerImpl.java
@@ -34,26 +34,20 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.ServerListener;
import com.google.net.stubby.transport.ServerStream;
import com.google.net.stubby.transport.ServerStreamListener;
import com.google.net.stubby.transport.ServerTransportListener;
+import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-import javax.annotation.Nullable;
-
/**
* Default implementation of {@link Server}, for creation by transports.
*
@@ -299,9 +293,12 @@
private static class NoopListener implements ServerStreamListener {
@Override
- @Nullable
- public ListenableFuture<Void> messageRead(InputStream value, int length) {
- return null;
+ public void messageRead(InputStream value, int length) {
+ try {
+ value.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -349,12 +346,16 @@
}
@Override
- @Nullable
- public ListenableFuture<Void> messageRead(final InputStream message, final int length) {
- return dispatchCallable(new Callable<ListenableFuture<Void>>() {
+ public void messageRead(final InputStream message, final int length) {
+ callExecutor.execute(new Runnable() {
@Override
- public ListenableFuture<Void> call() {
- return getListener().messageRead(message, length);
+ public void run() {
+ try {
+ getListener().messageRead(message, length);
+ } catch (Throwable t) {
+ internalClose(Status.fromThrowable(t), new Metadata.Trailers());
+ throw Throwables.propagate(t);
+ }
}
});
}
@@ -383,36 +384,6 @@
}
});
}
-
- private ListenableFuture<Void> dispatchCallable(
- final Callable<ListenableFuture<Void>> callable) {
- final SettableFuture<Void> ours = SettableFuture.create();
- callExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ListenableFuture<Void> theirs = callable.call();
- if (theirs == null) {
- ours.set(null);
- } else {
- Futures.addCallback(theirs, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- ours.set(null);
- }
- @Override
- public void onFailure(Throwable t) {
- ours.setException(t);
- }
- }, MoreExecutors.directExecutor());
- }
- } catch (Throwable t) {
- ours.setException(t);
- }
- }
- });
- return ours;
- }
}
private class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
@@ -426,6 +397,11 @@
}
@Override
+ public void request(int numMessages) {
+ stream.request(numMessages);
+ }
+
+ @Override
public void sendHeaders(Metadata.Headers headers) {
stream.writeHeaders(headers);
}
@@ -468,13 +444,28 @@
}
@Override
- @Nullable
- public ListenableFuture<Void> messageRead(final InputStream message, int length) {
- return listener.onPayload(methodDef.parseRequest(message));
+ public void messageRead(final InputStream message, int length) {
+ if (cancelled) {
+ return;
+ }
+
+ try {
+ listener.onPayload(methodDef.parseRequest(message));
+ } finally {
+ try {
+ message.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
public void halfClosed() {
+ if (cancelled) {
+ return;
+ }
+
listener.onHalfClose();
}
diff --git a/core/src/main/java/com/google/net/stubby/ServerInterceptors.java b/core/src/main/java/com/google/net/stubby/ServerInterceptors.java
index a17c6ac..8401c09 100644
--- a/core/src/main/java/com/google/net/stubby/ServerInterceptors.java
+++ b/core/src/main/java/com/google/net/stubby/ServerInterceptors.java
@@ -145,6 +145,11 @@
}
@Override
+ public void request(int numMessages) {
+ delegate.request(numMessages);
+ }
+
+ @Override
public void sendHeaders(Metadata.Headers headers) {
delegate.sendHeaders(headers);
}
diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java
index 053b057..3cf17ef 100644
--- a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java
@@ -33,14 +33,11 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -53,7 +50,6 @@
implements ClientStream {
private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
- private static final ListenableFuture<Void> COMPLETED_FUTURE = Futures.immediateFuture(null);
private final ClientStreamListener listener;
private boolean listenerClosed;
@@ -65,17 +61,15 @@
private Runnable closeListenerTask;
- protected AbstractClientStream(ClientStreamListener listener, Executor deframerExecutor) {
- super(deframerExecutor);
+ protected AbstractClientStream(ClientStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener);
}
@Override
- protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
- if (listenerClosed) {
- return COMPLETED_FUTURE;
+ protected void receiveMessage(InputStream is, int length) {
+ if (!listenerClosed) {
+ listener.messageRead(is, length);
}
- return listener.messageRead(is, length);
}
@Override
@@ -114,7 +108,7 @@
new Object[]{id(), headers});
}
inboundPhase(Phase.MESSAGE);
- delayDeframer(listener.headersRead(headers));
+ listener.headersRead(headers);
}
/**
@@ -208,7 +202,7 @@
closeListenerTask = null;
// Determine if the deframer is stalled (i.e. currently has no complete messages to deliver).
- boolean deliveryStalled = !deframer.isDeliveryOutstanding();
+ boolean deliveryStalled = deframer.isStalled();
if (stopDelivery || deliveryStalled) {
// Close the listener immediately.
diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java
index 7dd1b53..ab1c9a6 100644
--- a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java
@@ -32,13 +32,11 @@
package com.google.net.stubby.transport;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -63,8 +61,7 @@
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */
private Metadata.Trailers stashedTrailers;
- protected AbstractServerStream(IdT id, Executor deframerExecutor) {
- super(deframerExecutor);
+ protected AbstractServerStream(IdT id) {
id(id);
}
@@ -73,9 +70,9 @@
}
@Override
- protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
+ protected void receiveMessage(InputStream is, int length) {
inboundPhase(Phase.MESSAGE);
- return listener.messageRead(is, length);
+ listener.messageRead(is, length);
}
@Override
diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java
index bed0372..8959408 100644
--- a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java
@@ -34,15 +34,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
import javax.annotation.Nullable;
@@ -59,15 +53,6 @@
private volatile IdT id;
private final MessageFramer framer;
- private final FutureCallback<Object> deframerErrorCallback = new FutureCallback<Object>() {
- @Override
- public void onSuccess(Object result) {}
-
- @Override
- public void onFailure(Throwable t) {
- deframeFailed(t);
- }
- };
final MessageDeframer deframer;
@@ -81,7 +66,7 @@
*/
private Phase outboundPhase = Phase.HEADERS;
- AbstractStream(Executor deframerExecutor) {
+ AbstractStream() {
MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() {
@Override
public void bytesRead(int numBytes) {
@@ -89,14 +74,8 @@
}
@Override
- public ListenableFuture<Void> messageRead(InputStream input, final int length) {
- ListenableFuture<Void> future = null;
- try {
- future = receiveMessage(input, length);
- return future;
- } finally {
- closeWhenDone(future, input);
- }
+ public void messageRead(InputStream input, final int length) {
+ receiveMessage(input, length);
}
@Override
@@ -117,7 +96,7 @@
};
framer = new MessageFramer(outboundFrameHandler, 4096);
- this.deframer = new MessageDeframer(inboundMessageHandler, deframerExecutor);
+ this.deframer = new MessageDeframer(inboundMessageHandler);
}
/**
@@ -194,7 +173,7 @@
protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream);
/** A message was deframed. */
- protected abstract ListenableFuture<Void> receiveMessage(InputStream is, int length);
+ protected abstract void receiveMessage(InputStream is, int length);
/** Deframer has no pending deliveries. */
protected abstract void inboundDeliveryPaused();
@@ -215,23 +194,25 @@
/**
* Called to parse a received frame and attempt delivery of any completed
- * messages.
+ * messages. Must be called from the transport thread.
*/
protected final void deframe(Buffer frame, boolean endOfStream) {
- ListenableFuture<?> future;
- future = deframer.deframe(frame, endOfStream);
- if (future != null) {
- Futures.addCallback(future, deframerErrorCallback);
+ try {
+ deframer.deframe(frame, endOfStream);
+ } catch (Throwable t) {
+ deframeFailed(t);
}
}
/**
- * Delays delivery from the deframer until the given future completes.
+ * Called to request the given number of messages from the deframer. Must be called
+ * from the transport thread.
*/
- protected final void delayDeframer(ListenableFuture<?> future) {
- ListenableFuture<?> deliveryFuture = deframer.delayProcessing(future);
- if (deliveryFuture != null) {
- Futures.addCallback(deliveryFuture, deframerErrorCallback);
+ protected final void requestMessagesFromDeframer(int numMessages) {
+ try {
+ deframer.request(numMessages);
+ } catch (Throwable t) {
+ deframeFailed(t);
}
}
@@ -272,26 +253,6 @@
}
/**
- * If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
- * the {@link InputStream} is closed immediately.
- */
- private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
- final InputStream input) {
- if (future == null) {
- Closeables.closeQuietly(input);
- return;
- }
-
- // Close the buffer when the future completes.
- future.addListener(new Runnable() {
- @Override
- public void run() {
- Closeables.closeQuietly(input);
- }
- }, MoreExecutors.directExecutor());
- }
-
- /**
* Can the stream receive data from its remote peer.
*/
public boolean canReceive() {
diff --git a/core/src/main/java/com/google/net/stubby/transport/ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/ClientStream.java
index 3187f3c..188fc84 100644
--- a/core/src/main/java/com/google/net/stubby/transport/ClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/ClientStream.java
@@ -49,5 +49,4 @@
* the remote end-point is closed.
*/
void halfClose();
-
}
diff --git a/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java b/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java
index a1f8d98..0ec84ce 100644
--- a/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java
+++ b/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java
@@ -31,12 +31,9 @@
package com.google.net.stubby.transport;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
-import javax.annotation.Nullable;
-
/** An observer of client-side stream events. */
public interface ClientStreamListener extends StreamListener {
/**
@@ -48,11 +45,8 @@
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param headers the fully buffered received headers.
- * @return a processing completion future, or {@code null} to indicate that processing of the
- * headers is immediately complete.
*/
- @Nullable
- ListenableFuture<Void> headersRead(Metadata.Headers headers);
+ void headersRead(Metadata.Headers headers);
/**
* Called when the stream is fully closed. {@link
diff --git a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java
index 682b184..9c8cdb1 100644
--- a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java
@@ -37,7 +37,6 @@
import com.google.net.stubby.Status;
import java.nio.charset.Charset;
-import java.util.concurrent.Executor;
import javax.annotation.Nullable;
@@ -70,8 +69,8 @@
private Charset errorCharset = Charsets.UTF_8;
private boolean contentTypeChecked;
- protected Http2ClientStream(ClientStreamListener listener, Executor deframerExecutor) {
- super(listener, deframerExecutor);
+ protected Http2ClientStream(ClientStreamListener listener) {
+ super(listener);
}
protected void transportHeadersReceived(Metadata.Headers headers) {
diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java b/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java
index 162c025..42b5865 100644
--- a/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java
+++ b/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java
@@ -32,19 +32,13 @@
package com.google.net.stubby.transport;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Status;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
-import java.util.concurrent.Executor;
import java.util.zip.GZIPInputStream;
import javax.annotation.concurrent.NotThreadSafe;
@@ -52,8 +46,8 @@
/**
* Deframer for GRPC frames.
*
- * <p>This class is not thread-safe. All calls to this class must be made in the context of the
- * executor provided during creation. That executor must not allow concurrent execution of tasks.
+ * <p>This class is not thread-safe. All calls to public methods should be made in the transport
+ * thread.
*/
@NotThreadSafe
public class MessageDeframer implements Closeable {
@@ -82,11 +76,8 @@
*
* @param is stream containing the message.
* @param length the length in bytes of the message.
- * @return a future indicating when the application has completed processing the message. The
- * next delivery will not occur until this future completes. If {@code null}, it is assumed that
- * the application has completed processing the message upon returning from the method call.
*/
- ListenableFuture<Void> messageRead(InputStream is, int length);
+ void messageRead(InputStream is, int length);
/**
* Called when end-of-stream has not yet been reached but there are no complete messages
@@ -105,65 +96,67 @@
}
private final Listener listener;
- private final Executor executor;
private final Compression compression;
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
private boolean compressedFlag;
private boolean endOfStream;
- private SettableFuture<?> deliveryOutstanding;
private CompositeBuffer nextFrame;
private CompositeBuffer unprocessed = new CompositeBuffer();
+ private long pendingDeliveries;
+ private boolean deliveryStalled = true;
/**
- * Create a deframer. All calls to this class must be made in the context of the provided
- * executor, which also must not allow concurrent processing of Runnables. Compression will not be
- * supported.
+ * Create a deframer. Compression will not be supported.
*
* @param listener listener for deframer events.
- * @param executor used for internal event processing
*/
- public MessageDeframer(Listener listener, Executor executor) {
- this(listener, executor, Compression.NONE);
+ public MessageDeframer(Listener listener) {
+ this(listener, Compression.NONE);
}
/**
- * Create a deframer. All calls to this class must be made in the context of the provided
- * executor, which also must not allow concurrent processing of Runnables.
+ * Create a deframer.
*
* @param listener listener for deframer events.
- * @param executor used for internal event processing
* @param compression the compression used if a compressed frame is encountered, with NONE meaning
* unsupported
*/
- public MessageDeframer(Listener listener, Executor executor, Compression compression) {
+ public MessageDeframer(Listener listener, Compression compression) {
this.listener = Preconditions.checkNotNull(listener, "sink");
- this.executor = Preconditions.checkNotNull(executor, "executor");
this.compression = Preconditions.checkNotNull(compression, "compression");
}
/**
- * Adds the given data to this deframer and attempts delivery to the sink.
+ * Requests up to the given number of messages from the call to be delivered to
+ * {@link Listener#messageRead(InputStream, int)}. No additional messages will be delivered.
*
- * <p>If returned future is not {@code null}, then it completes when no more deliveries are
- * occuring. Delivering completes if all available deframing input is consumed or if delivery
- * resulted in an exception, in which case this method may throw the exception or the returned
- * future will fail with the throwable. The future is guaranteed to complete within the executor
- * provided during construction.
+ * @param numMessages the requested number of messages to be delivered to the listener.
*/
- public ListenableFuture<?> deframe(Buffer data, boolean endOfStream) {
+ public void request(int numMessages) {
+ Preconditions.checkArgument(numMessages > 0, "numMessages must be > 0");
+ pendingDeliveries += numMessages;
+ deliver();
+ }
+
+ /**
+ * Adds the given data to this deframer and attempts delivery to the sink.
+ */
+ public void deframe(Buffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data");
Preconditions.checkState(!this.endOfStream, "Past end of stream");
unprocessed.addBuffer(data);
// Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream;
+ deliver();
+ }
- if (isDeliveryOutstanding()) {
- // Only allow one outstanding delivery at a time.
- return null;
- }
- return deliver();
+ /**
+ * Indicates whether delivery is currently stalled, pending receipt of more data.
+ */
+ public boolean isStalled() {
+ return deliveryStalled;
}
@Override
@@ -175,83 +168,23 @@
}
/**
- * Indicates whether or not there is currently a delivery outstanding to the application.
+ * Reads and delivers as many messages to the sink as possible.
*/
- public final boolean isDeliveryOutstanding() {
- return deliveryOutstanding != null;
- }
-
- /**
- * Consider {@code future} to be a message currently being processed. Messages will not be
- * delivered until the future completes. The returned future behaves as if it was returned by
- * {@link #deframe(Buffer, boolean)}.
- *
- * @throws IllegalStateException if a message is already being processed
- */
- public ListenableFuture<?> delayProcessing(ListenableFuture<?> future) {
- Preconditions.checkState(!isDeliveryOutstanding(), "Only one delay allowed concurrently");
- if (future == null) {
- return null;
- }
- return delayProcessingInternal(future);
- }
-
- /**
- * May only be called when a delivery is known not to be outstanding. If deliveryOutstanding is
- * non-null, then it will be re-used and this method will return {@code null}.
- */
- private ListenableFuture<?> delayProcessingInternal(ListenableFuture<?> future) {
- Preconditions.checkNotNull(future, "future");
- // Return a separate future so that our callback is guaranteed to complete before any
- // listeners on the returned future.
- ListenableFuture<?> returnFuture = null;
- if (!isDeliveryOutstanding()) {
- returnFuture = deliveryOutstanding = SettableFuture.create();
- }
- Futures.addCallback(future, new FutureCallback<Object>() {
- @Override
- public void onFailure(Throwable t) {
- SettableFuture<?> previousOutstanding = deliveryOutstanding;
- deliveryOutstanding = null;
- previousOutstanding.setException(t);
- }
-
- @Override
- public void onSuccess(Object result) {
- try {
- deliver();
- } catch (Throwable t) {
- if (!isDeliveryOutstanding()) {
- throw Throwables.propagate(t);
- } else {
- onFailure(t);
- }
- }
- }
- }, executor);
- return returnFuture;
- }
-
- /**
- * Reads and delivers as many messages to the sink as possible. May only be called when a delivery
- * is known not to be outstanding.
- */
- private ListenableFuture<?> deliver() {
+ private void deliver() {
// Process the uncompressed bytes.
- while (readRequiredBytes()) {
+ boolean stalled = false;
+ while (pendingDeliveries > 0 && !(stalled = !readRequiredBytes())) {
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
- // Read the body and deliver the message to the sink.
- ListenableFuture<?> processingFuture = processBody();
- if (processingFuture != null) {
- // A future was returned for the completion of processing the delivered
- // message. Once it's done, try to deliver the next message.
- return delayProcessingInternal(processingFuture);
- }
+ // Read the body and deliver the message.
+ processBody();
+ // Since we've delivered a message, decrement the number of pending
+ // deliveries remaining.
+ pendingDeliveries--;
break;
default:
throw new AssertionError("Invalid state: " + state);
@@ -259,25 +192,29 @@
}
if (endOfStream) {
- if (nextFrame.readableBytes() != 0) {
- throw Status.INTERNAL
- .withDescription("Encountered end-of-stream mid-frame")
+ if (!isDataAvailable()) {
+ listener.endOfStream();
+ } else if (stalled) {
+ // We've received the entire stream and have data available but we don't have
+ // enough to read the next frame ... this is bad.
+ throw Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame")
.asRuntimeException();
}
- listener.endOfStream();
}
- // All available messages have processed.
- if (isDeliveryOutstanding()) {
- SettableFuture<?> previousOutstanding = deliveryOutstanding;
- deliveryOutstanding = null;
- previousOutstanding.set(null);
- if (!endOfStream) {
- // Notify that delivery is currently paused.
- listener.deliveryStalled();
- }
+ // Never indicate that we're stalled if we've received all the data for the stream.
+ stalled &= !endOfStream;
+
+ // If we're transitioning to the stalled state, notify the listener.
+ boolean previouslyStalled = deliveryStalled;
+ deliveryStalled = stalled;
+ if (stalled && !previouslyStalled) {
+ listener.deliveryStalled();
}
- return null;
+ }
+
+ private boolean isDataAvailable() {
+ return unprocessed.readableBytes() > 0 || (nextFrame != null && nextFrame.readableBytes() > 0);
}
/**
@@ -335,35 +272,32 @@
* Processes the body of the GRPC compression frame. A single compression frame may contain
* several GRPC messages within it.
*/
- private ListenableFuture<?> processBody() {
- ListenableFuture<?> future;
+ private void processBody() {
if (compressedFlag) {
if (compression == Compression.NONE) {
- throw Status.INTERNAL
- .withDescription("Can't decode compressed frame as compression not configured.")
- .asRuntimeException();
+ throw Status.INTERNAL.withDescription(
+ "Can't decode compressed frame as compression not configured.").asRuntimeException();
} else if (compression == Compression.GZIP) {
// Fully drain frame.
byte[] bytes;
try {
- bytes = ByteStreams.toByteArray(
- new GZIPInputStream(Buffers.openStream(nextFrame, false)));
+ bytes =
+ ByteStreams.toByteArray(new GZIPInputStream(Buffers.openStream(nextFrame, false)));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
- future = listener.messageRead(new ByteArrayInputStream(bytes), bytes.length);
+ listener.messageRead(new ByteArrayInputStream(bytes), bytes.length);
} else {
throw new AssertionError("Unknown compression type");
}
} else {
// Don't close the frame, since the sink is now responsible for the life-cycle.
- future = listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
+ listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
nextFrame = null;
}
// Done with this frame, begin processing the next header.
state = State.HEADER;
requiredLength = HEADER_LENGTH;
- return future;
}
}
diff --git a/core/src/main/java/com/google/net/stubby/transport/Stream.java b/core/src/main/java/com/google/net/stubby/transport/Stream.java
index fd795bc..85c32cd 100644
--- a/core/src/main/java/com/google/net/stubby/transport/Stream.java
+++ b/core/src/main/java/com/google/net/stubby/transport/Stream.java
@@ -42,6 +42,15 @@
*/
public interface Stream {
/**
+ * Requests up to the given number of messages from the call to be delivered to
+ * {@link StreamListener#messageRead(java.io.InputStream, int)}. No additional messages will be
+ * delivered.
+ *
+ * @param numMessages the requested number of messages to be delivered to the listener.
+ */
+ void request(int numMessages);
+
+ /**
* Writes a message payload to the remote end-point. The bytes from the stream are immediate read
* by the Transport. This method will always return immediately and will not wait for the write to
* complete.
diff --git a/core/src/main/java/com/google/net/stubby/transport/StreamListener.java b/core/src/main/java/com/google/net/stubby/transport/StreamListener.java
index 2250e34..a38b51d 100644
--- a/core/src/main/java/com/google/net/stubby/transport/StreamListener.java
+++ b/core/src/main/java/com/google/net/stubby/transport/StreamListener.java
@@ -31,12 +31,8 @@
package com.google.net.stubby.transport;
-import com.google.common.util.concurrent.ListenableFuture;
-
import java.io.InputStream;
-import javax.annotation.Nullable;
-
/**
* An observer of {@link Stream} events. It is guaranteed to only have one concurrent callback at a
* time.
@@ -46,21 +42,12 @@
* Called upon receiving a message from the remote end-point. The {@link InputStream} is
* non-blocking and contains the entire message.
*
- * <p>The method optionally returns a future that can be observed by flow control to determine
- * when the message has been processed by the application. If {@code null} is returned, processing
- * of this message is assumed to be complete upon returning from this method.
- *
- * <p>The {@code message} {@link InputStream} will be closed when the returned future completes.
- * If no future is returned, the stream will be closed immediately after returning from this
- * method.
+ * <p>The provided {@code message} {@link InputStream} must be closed by the listener.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param message the bytes of the message.
* @param length the length of the message {@link InputStream}.
- * @return a processing completion future, or {@code null} to indicate that processing of the
- * message is immediately complete.
*/
- @Nullable
- ListenableFuture<Void> messageRead(InputStream message, int length);
+ void messageRead(InputStream message, int length);
}
diff --git a/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java b/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java
index bbd3fd4..22ce6d1 100644
--- a/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java
+++ b/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java
@@ -130,6 +130,7 @@
public void ordered() {
final List<String> order = new ArrayList<String>();
channel = new Channel() {
+ @SuppressWarnings("unchecked")
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
order.add("channel");
@@ -199,9 +200,9 @@
public void start(Call.Listener<RespT> responseListener, Metadata.Headers headers) {
super.start(new ForwardingListener<RespT>(responseListener) {
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+ public void onHeaders(Metadata.Headers headers) {
examinedHeaders.add(headers);
- return super.onHeaders(headers);
+ super.onHeaders(headers);
}
}, headers);
}
diff --git a/core/src/test/java/com/google/net/stubby/ServerImplTest.java b/core/src/test/java/com/google/net/stubby/ServerImplTest.java
index 9f3b953..9cb6cf4 100644
--- a/core/src/test/java/com/google/net/stubby/ServerImplTest.java
+++ b/core/src/test/java/com/google/net/stubby/ServerImplTest.java
@@ -34,21 +34,18 @@
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.notNull;
import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.ServerStream;
import com.google.net.stubby.transport.ServerStreamListener;
import com.google.net.stubby.transport.ServerTransportListener;
@@ -71,8 +68,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/** Unit tests for {@link ServerImpl}. */
@@ -86,7 +81,9 @@
private Service transportServer = new NoopService();
private ServerImpl server = new ServerImpl(executor, registry)
.setTransportServer(transportServer);
- private ServerStream stream = Mockito.mock(ServerStream.class);
+
+ @Mock
+ private ServerStream stream;
@Mock
private ServerCall.Listener<String> callListener;
@@ -238,9 +235,8 @@
assertNotNull(call);
String order = "Lots of pizza, please";
- ListenableFuture<Void> future = streamListener.messageRead(STRING_MARSHALLER.stream(order), 1);
- future.get();
- verify(callListener).onPayload(order);
+ streamListener.messageRead(STRING_MARSHALLER.stream(order), 1);
+ verify(callListener, timeout(2000)).onPayload(order);
call.sendPayload(314);
ArgumentCaptor<InputStream> inputCaptor = ArgumentCaptor.forClass(InputStream.class);
@@ -297,48 +293,6 @@
verifyNoMoreInteractions(stream);
}
- @Test
- public void futureStatusIsPropagatedToTransport() throws Exception {
- final AtomicReference<ServerCall<Integer>> callReference
- = new AtomicReference<ServerCall<Integer>>();
- registry.addService(ServerServiceDefinition.builder("Waiter")
- .addMethod("serve", STRING_MARSHALLER, INTEGER_MARSHALLER,
- new ServerCallHandler<String, Integer>() {
- @Override
- public ServerCall.Listener<String> startCall(String fullMethodName,
- ServerCall<Integer> call, Metadata.Headers headers) {
- callReference.set(call);
- return callListener;
- }
- }).build());
- ServerTransportListener transportListener = newTransport(server);
-
- ServerStreamListener streamListener
- = transportListener.streamCreated(stream, "/Waiter/serve", new Metadata.Headers());
- assertNotNull(streamListener);
-
- executeBarrier(executor).await();
- ServerCall<Integer> call = callReference.get();
- assertNotNull(call);
-
- String delay = "No, I've not looked over the menu yet";
- SettableFuture<Void> appFuture = SettableFuture.create();
- when(callListener.onPayload(delay)).thenReturn(appFuture);
- ListenableFuture<Void> future = streamListener.messageRead(STRING_MARSHALLER.stream(delay), 1);
- executeBarrier(executor).await();
- verify(callListener).onPayload(delay);
- try {
- future.get(0, TimeUnit.SECONDS);
- fail();
- } catch (TimeoutException ex) {
- // Expected.
- }
-
- appFuture.set(null);
- // Shouldn't throw.
- future.get(0, TimeUnit.SECONDS);
- }
-
private static ServerTransportListener newTransport(ServerImpl server) {
Service transport = new NoopService();
transport.startAsync();
diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java b/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java
index 3365b3c..bab8f59 100644
--- a/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java
+++ b/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java
@@ -32,25 +32,16 @@
package com.google.net.stubby.transport;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Bytes;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.MessageDeframer.Listener;
import org.junit.Test;
@@ -62,7 +53,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPOutputStream;
/**
@@ -71,13 +61,13 @@
@RunWith(JUnit4.class)
public class MessageDeframerTest {
private Listener listener = mock(Listener.class);
- private MessageDeframer deframer =
- new MessageDeframer(listener, MoreExecutors.directExecutor());
+ private MessageDeframer deframer = new MessageDeframer(listener);
private ArgumentCaptor<InputStream> messages = ArgumentCaptor.forClass(InputStream.class);
@Test
public void simplePayload() {
- assertNull(deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14}), false));
+ deframer.request(1);
+ deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14}), false);
verify(listener).messageRead(messages.capture(), eq(2));
assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt());
@@ -86,8 +76,8 @@
@Test
public void smallCombinedPayloads() {
- assertNull(
- deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false));
+ deframer.request(2);
+ deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
verify(listener).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).messageRead(messages.capture(), eq(2));
@@ -98,7 +88,8 @@
@Test
public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
- assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true));
+ deframer.request(1);
+ deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
verify(listener).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).endOfStream();
@@ -108,17 +99,18 @@
@Test
public void endOfStreamShouldNotifyEndOfStream() {
- assertNull(deframer.deframe(buffer(new byte[0]), true));
+ deframer.deframe(buffer(new byte[0]), true);
verify(listener).endOfStream();
verifyNoMoreInteractions(listener);
}
@Test
public void payloadSplitBetweenBuffers() {
- assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false));
+ deframer.request(1);
+ deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- assertNull(deframer.deframe(buffer(new byte[] {2, 6}), false));
+ deframer.deframe(buffer(new byte[] {2, 6}), false);
verify(listener).messageRead(messages.capture(), eq(7));
assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt());
@@ -127,10 +119,12 @@
@Test
public void frameHeaderSplitBetweenBuffers() {
- assertNull(deframer.deframe(buffer(new byte[] {0, 0}), false));
+ deframer.request(1);
+
+ deframer.deframe(buffer(new byte[] {0, 0}), false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
- assertNull(deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false));
+ deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false);
verify(listener).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt());
@@ -139,7 +133,8 @@
@Test
public void emptyPayload() {
- assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false));
+ deframer.request(1);
+ deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false);
verify(listener).messageRead(messages.capture(), eq(0));
assertEquals(Bytes.asList(), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt());
@@ -148,8 +143,9 @@
@Test
public void largerFrameSize() {
- assertNull(deframer.deframe(
- Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false));
+ deframer.request(1);
+ deframer.deframe(
+ Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false);
verify(listener).messageRead(messages.capture(), eq(1000));
assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt());
@@ -157,110 +153,23 @@
}
@Test
- public void payloadCallbackShouldWaitForFutureCompletion() {
- SettableFuture<Void> messageFuture = SettableFuture.create();
- when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
- // Deframe a block with 2 messages.
- ListenableFuture<?> deframeFuture
- = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
- assertNotNull(deframeFuture);
- verify(listener).messageRead(messages.capture(), eq(1));
- assertEquals(Bytes.asList(new byte[]{3}), bytes(messages));
- verify(listener, atLeastOnce()).bytesRead(anyInt());
+ public void endOfStreamCallbackShouldWaitForMessageDelivery() {
+ deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
verifyNoMoreInteractions(listener);
- SettableFuture<Void> messageFuture2 = SettableFuture.create();
- when(listener.messageRead(any(InputStream.class), eq(2))).thenReturn(messageFuture2);
- messageFuture.set(null);
- assertFalse(deframeFuture.isDone());
- verify(listener).messageRead(messages.capture(), eq(2));
- assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages));
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verifyNoMoreInteractions(listener);
-
- messageFuture2.set(null);
- assertTrue(deframeFuture.isDone());
-
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verify(listener).deliveryStalled();
- verifyNoMoreInteractions(listener);
- }
-
- @Test
- public void endOfStreamCallbackShouldWaitForFutureCompletion() {
- SettableFuture<Void> messageFuture = SettableFuture.create();
- when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
- ListenableFuture<?> deframeFuture
- = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
- assertNotNull(deframeFuture);
+ deframer.request(1);
verify(listener).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verifyNoMoreInteractions(listener);
-
- messageFuture.set(null);
- assertTrue(deframeFuture.isDone());
verify(listener).endOfStream();
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
}
@Test
- public void futureShouldPropagateThrownException() throws InterruptedException {
- SettableFuture<Void> messageFuture = SettableFuture.create();
- when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
- ListenableFuture<?> deframeFuture
- = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
- assertNotNull(deframeFuture);
- verify(listener).messageRead(messages.capture(), eq(1));
- assertEquals(Bytes.asList(new byte[]{3}), bytes(messages));
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verifyNoMoreInteractions(listener);
-
- RuntimeException thrownEx = new RuntimeException();
- when(listener.messageRead(any(InputStream.class), eq(2))).thenThrow(thrownEx);
- messageFuture.set(null);
- verify(listener).messageRead(messages.capture(), eq(2));
- assertTrue(deframeFuture.isDone());
- try {
- deframeFuture.get();
- fail("Should have throws ExecutionException");
- } catch (ExecutionException ex) {
- assertEquals(thrownEx, ex.getCause());
- }
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verifyNoMoreInteractions(listener);
- }
-
- @Test
- public void futureFailureShouldStopAndPropagateFailure() throws InterruptedException {
- SettableFuture<Void> messageFuture = SettableFuture.create();
- when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
- ListenableFuture<?> deframeFuture
- = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
- assertNotNull(deframeFuture);
- verify(listener).messageRead(messages.capture(), eq(1));
- assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verifyNoMoreInteractions(listener);
-
- RuntimeException thrownEx = new RuntimeException();
- messageFuture.setException(thrownEx);
- assertTrue(deframeFuture.isDone());
- try {
- deframeFuture.get();
- fail("Should have throws ExecutionException");
- } catch (ExecutionException ex) {
- assertEquals(thrownEx, ex.getCause());
- }
- verify(listener, atLeastOnce()).bytesRead(anyInt());
- verifyNoMoreInteractions(listener);
- }
-
- @Test
public void compressed() {
- deframer = new MessageDeframer(
- listener, MoreExecutors.directExecutor(), MessageDeframer.Compression.GZIP);
+ deframer = new MessageDeframer(listener, MessageDeframer.Compression.GZIP);
+ deframer.request(1);
+
byte[] payload = compress(new byte[1000]);
assertTrue(payload.length < 100);
byte[] header = new byte[] {1, 0, 0, 0, (byte) payload.length};
diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
index f557985..cadb10a 100644
--- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
+++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
@@ -39,7 +39,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.net.stubby.AbstractServerBuilder;
@@ -75,12 +74,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -426,22 +425,18 @@
// Start the call and prepare capture of results.
final List<StreamingOutputCallResponse> results =
Collections.synchronizedList(new ArrayList<StreamingOutputCallResponse>());
- final List<SettableFuture<Void>> processedFutures =
- Collections.synchronizedList(new LinkedList<SettableFuture<Void>>());
final SettableFuture<Void> completionFuture = SettableFuture.create();
+ final AtomicInteger count = new AtomicInteger();
call.start(new Call.Listener<StreamingOutputCallResponse>() {
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
- return null;
+ public void onHeaders(Metadata.Headers headers) {
}
@Override
- public ListenableFuture<Void> onPayload(final StreamingOutputCallResponse payload) {
- SettableFuture<Void> processedFuture = SettableFuture.create();
+ public void onPayload(final StreamingOutputCallResponse payload) {
results.add(payload);
- processedFutures.add(processedFuture);
- return processedFuture;
+ count.incrementAndGet();
}
@Override
@@ -460,17 +455,9 @@
// Slowly set completion on all of the futures.
int expectedResults = responseSizes.size();
- int count = 0;
- while (count < expectedResults) {
- if (!processedFutures.isEmpty()) {
- assertEquals(1, processedFutures.size());
- assertEquals(count + 1, results.size());
- count++;
-
- // Remove and set the first future to allow receipt of additional messages
- // from flow control.
- processedFutures.remove(0).set(null);
- }
+ while (count.get() < expectedResults) {
+ // Allow one more inbound message to be delivered to the application.
+ call.request(1);
// Sleep a bit.
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java
index af91312..d9e3c30 100644
--- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java
+++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java
@@ -51,11 +51,21 @@
private final NettyClientHandler handler;
NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) {
- super(listener, channel.eventLoop());
+ super(listener);
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
}
+ @Override
+ public void request(final int numMessages) {
+ channel.eventLoop().execute(new Runnable() {
+ @Override
+ public void run() {
+ requestMessagesFromDeframer(numMessages);
+ }
+ });
+ }
+
void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
if (endOfStream) {
transportTrailersReceived(Utils.convertTrailers(headers));
diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java
index c4241a8..a5ef06e 100644
--- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java
+++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java
@@ -51,7 +51,7 @@
private final NettyServerHandler handler;
NettyServerStream(Channel channel, int id, NettyServerHandler handler) {
- super(id, channel.eventLoop());
+ super(id);
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
}
@@ -61,6 +61,16 @@
}
@Override
+ public void request(final int numMessages) {
+ channel.eventLoop().execute(new Runnable() {
+ @Override
+ public void run() {
+ requestMessagesFromDeframer(numMessages);
+ }
+ });
+ }
+
+ @Override
protected void inboundDeliveryPaused() {
// Do nothing.
}
diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java
index 112780c..1c49854 100644
--- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java
@@ -45,9 +45,7 @@
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.AbstractStream;
@@ -64,7 +62,6 @@
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
-import org.mockito.Mockito;
import java.io.InputStream;
@@ -202,9 +199,9 @@
verify(listener, never()).closed(any(Status.class), any(Metadata.Trailers.class));
// We are now waiting for 100 bytes of error context on the stream, cancel has not yet been sent
- Mockito.verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
+ verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
stream().transportDataReceived(Unpooled.buffer(100).writeZero(100), false);
- Mockito.verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
+ verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
stream().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false);
// Now verify that cancel is sent and an error is reported to the listener
@@ -226,10 +223,6 @@
@Test
public void deframedDataAfterCancelShouldBeIgnored() throws Exception {
- // Mock the listener to return this future when a message is read.
- final SettableFuture<Void> future = SettableFuture.create();
- when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(future);
-
stream().id(1);
// Receive headers first so that it's a valid GRPC response.
stream().transportHeadersReceived(grpcResponseHeaders(), false);
@@ -238,6 +231,9 @@
stream().transportDataReceived(simpleGrpcFrame(), false);
stream().transportDataReceived(simpleGrpcFrame(), false);
+ // Only allow the first to be delivered.
+ stream().request(1);
+
// Receive error trailers. The server status will not be processed until after all of the
// data frames have been processed. Since cancellation will interrupt message delivery,
// this status will never be processed and the listener will instead only see the
@@ -251,9 +247,8 @@
Metadata.Trailers trailers = Utils.convertTrailers(grpcResponseTrailers(Status.CANCELLED));
stream().transportReportStatus(Status.CANCELLED, true, trailers);
- // Now complete the future to trigger the deframer to fire the next message to the
- // stream.
- future.set(null);
+ // Now allow the delivery of the second.
+ stream().request(1);
// Verify that the listener was only notified of the first message, not the second.
verify(listener).messageRead(any(InputStream.class), anyInt());
diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java
index d749ea0..0bfd8a6 100644
--- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java
@@ -44,6 +44,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -163,6 +164,7 @@
private void inboundDataShouldForwardToStreamListener(boolean endStream) throws Exception {
createStream();
+ stream.request(1);
// Create a data frame and then trigger the handler to read it.
ByteBuf frame = dataFrame(STREAM_ID, endStream);
@@ -180,6 +182,7 @@
@Test
public void clientHalfCloseShouldForwardToStreamListener() throws Exception {
createStream();
+ stream.request(1);
handler.channelRead(ctx, emptyGrpcFrame(STREAM_ID, true));
ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
@@ -202,11 +205,12 @@
@Test
public void streamErrorShouldNotCloseChannel() throws Exception {
createStream();
+ stream.request(1);
// When a DATA frame is read, throw an exception. It will be converted into an
// Http2StreamException.
RuntimeException e = new RuntimeException("Fake Exception");
- when(streamListener.messageRead(any(InputStream.class), anyInt())).thenThrow(e);
+ doThrow(e).when(streamListener).messageRead(any(InputStream.class), anyInt());
// Read a DATA frame to trigger the exception.
handler.channelRead(ctx, emptyGrpcFrame(STREAM_ID, true));
@@ -217,7 +221,7 @@
// Verify the stream was closed.
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(streamListener).closed(captor.capture());
- assertEquals(e, captor.getValue().asException().getCause().getCause());
+ assertEquals(e, captor.getValue().asException().getCause());
assertEquals(Code.INTERNAL, captor.getValue().getCode());
}
@@ -225,7 +229,7 @@
public void connectionErrorShouldCloseChannel() throws Exception {
createStream();
- // Read a DATA frame to trigger the exception.
+ // Read a bad frame to trigger the exception.
handler.channelRead(ctx, badFrame());
// Verify the expected GO_AWAY frame was written.
diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java
index a2605c4..c109a69 100644
--- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java
+++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java
@@ -32,7 +32,6 @@
package com.google.net.stubby.transport.netty;
import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame;
-import static com.google.net.stubby.transport.netty.NettyTestUtil.statusFrame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java
index 249ec63..ad04847 100644
--- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java
+++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java
@@ -35,14 +35,12 @@
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.AbstractStream;
import com.google.net.stubby.transport.StreamListener;
@@ -95,8 +93,6 @@
@Mock
protected ChannelPromise promise;
- protected SettableFuture<Void> processingFuture;
-
protected InputStream input;
protected AbstractStream<Integer> stream;
@@ -114,9 +110,6 @@
when(pipeline.firstContext()).thenReturn(ctx);
when(eventLoop.inEventLoop()).thenReturn(true);
- processingFuture = SettableFuture.create();
- when(listener().messageRead(any(InputStream.class), anyInt())).thenReturn(processingFuture);
-
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -132,6 +125,8 @@
@Test
public void inboundMessageShouldCallListener() throws Exception {
+ stream.request(1);
+
if (stream instanceof NettyServerStream) {
((NettyServerStream) stream).inboundDataReceived(messageFrame(MESSAGE), false);
} else {
@@ -142,10 +137,6 @@
// Verify that inbound flow control window update has been disabled for the stream.
assertEquals(MESSAGE, NettyTestUtil.toString(captor.getValue()));
-
- // Verify that inbound flow control window update has been re-enabled for the stream after
- // the future completes.
- processingFuture.set(null);
}
protected abstract AbstractStream<Integer> createStream();
diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java
index 0c5baa1..59a07c8 100644
--- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java
+++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java
@@ -42,7 +42,6 @@
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
@@ -57,28 +56,11 @@
/**
* Construct a new client stream.
*/
- static OkHttpClientStream newStream(final Executor executor, ClientStreamListener listener,
+ static OkHttpClientStream newStream(ClientStreamListener listener,
AsyncFrameWriter frameWriter,
OkHttpClientTransport transport,
OutboundFlowController outboundFlow) {
- // Create a lock object that can be used by both the executor and methods in the stream
- // to ensure consistent locking behavior.
- final Object executorLock = new Object();
- Executor synchronizingExecutor = new Executor() {
- @Override
- public void execute(final Runnable command) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- synchronized (executorLock) {
- command.run();
- }
- }
- });
- }
- };
- return new OkHttpClientStream(synchronizingExecutor, listener, frameWriter, transport,
- executorLock, outboundFlow);
+ return new OkHttpClientStream(listener, frameWriter, transport, outboundFlow);
}
@GuardedBy("executorLock")
@@ -88,25 +70,28 @@
private final AsyncFrameWriter frameWriter;
private final OutboundFlowController outboundFlow;
private final OkHttpClientTransport transport;
- // Lock used to synchronize with work done on the executor.
- private final Object executorLock;
+ private final Object lock = new Object();
private Object outboundFlowState;
- private OkHttpClientStream(final Executor executor,
- final ClientStreamListener listener,
+ private OkHttpClientStream(ClientStreamListener listener,
AsyncFrameWriter frameWriter,
OkHttpClientTransport transport,
- Object executorLock,
OutboundFlowController outboundFlow) {
- super(listener, executor);
+ super(listener);
this.frameWriter = frameWriter;
this.transport = transport;
- this.executorLock = executorLock;
this.outboundFlow = outboundFlow;
}
+ @Override
+ public void request(final int numMessages) {
+ synchronized (lock) {
+ requestMessagesFromDeframer(numMessages);
+ }
+ }
+
public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
- synchronized (executorLock) {
+ synchronized (lock) {
if (endOfStream) {
transportTrailersReceived(Utils.convertTrailers(headers));
} else {
@@ -120,7 +105,7 @@
* the future listeners (executed by synchronizedExecutor) will not be executed in the same time.
*/
public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
- synchronized (executorLock) {
+ synchronized (lock) {
long length = frame.size();
window -= length;
super.transportDataReceived(new OkHttpBuffer(frame), endOfStream);
@@ -143,7 +128,7 @@
@Override
protected void returnProcessedBytes(int processedBytes) {
- synchronized (executorLock) {
+ synchronized (lock) {
processedWindow -= processedBytes;
if (processedWindow <= WINDOW_UPDATE_THRESHOLD) {
int delta = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE - processedWindow;
@@ -157,7 +142,7 @@
@Override
public void transportReportStatus(Status newStatus, boolean stopDelivery,
Metadata.Trailers trailers) {
- synchronized (executorLock) {
+ synchronized (lock) {
super.transportReportStatus(newStatus, stopDelivery, trailers);
}
}
diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java
index 83ccc80..a48a1f3 100644
--- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java
@@ -166,7 +166,7 @@
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
Metadata.Headers headers,
ClientStreamListener listener) {
- OkHttpClientStream clientStream = OkHttpClientStream.newStream(executor, listener,
+ OkHttpClientStream clientStream = OkHttpClientStream.newStream(listener,
frameWriter, this, outboundFlow);
if (goAway) {
clientStream.transportReportStatus(goAwayStatus, false, new Metadata.Trailers());
diff --git a/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java
index d701537..87f7572 100644
--- a/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java
+++ b/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java
@@ -45,7 +45,6 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Service.State;
import com.google.net.stubby.Metadata;
@@ -137,8 +136,8 @@
public void nextFrameThrowIOException() throws Exception {
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method, new Metadata.Headers(), listener1);
- clientTransport.newStream(method, new Metadata.Headers(), listener2);
+ clientTransport.newStream(method, new Metadata.Headers(), listener1).request(1);
+ clientTransport.newStream(method, new Metadata.Headers(), listener2).request(1);
assertEquals(2, streams.size());
assertTrue(streams.containsKey(3));
assertTrue(streams.containsKey(5));
@@ -158,7 +157,7 @@
final int numMessages = 10;
final String message = "Hello Client";
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, new Metadata.Headers(), listener);
+ clientTransport.newStream(method, new Metadata.Headers(), listener).request(numMessages);
assertTrue(streams.containsKey(3));
frameHandler.headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
assertNotNull(listener.headers);
@@ -179,7 +178,7 @@
@Test
public void invalidInboundHeadersCancelStream() throws Exception {
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, new Metadata.Headers(), listener);
+ clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
assertTrue(streams.containsKey(3));
// Empty headers block without correct content type or status
frameHandler.headers(false, false, 3, 0, new ArrayList<Header>(),
@@ -246,8 +245,8 @@
public void windowUpdate() throws Exception {
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method,new Metadata.Headers(), listener1);
- clientTransport.newStream(method,new Metadata.Headers(), listener2);
+ clientTransport.newStream(method,new Metadata.Headers(), listener1).request(2);
+ clientTransport.newStream(method,new Metadata.Headers(), listener2).request(2);
assertEquals(2, streams.size());
OkHttpClientStream stream1 = streams.get(3);
OkHttpClientStream stream2 = streams.get(5);
@@ -299,7 +298,7 @@
@Test
public void windowUpdateWithInboundFlowControl() throws Exception {
MockStreamListener listener = new MockStreamListener();
- clientTransport.newStream(method, new Metadata.Headers(), listener);
+ clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
OkHttpClientStream stream = streams.get(3);
int messageLength = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE / 2 + 1;
@@ -342,8 +341,8 @@
// start 2 streams.
MockStreamListener listener1 = new MockStreamListener();
MockStreamListener listener2 = new MockStreamListener();
- clientTransport.newStream(method,new Metadata.Headers(), listener1);
- clientTransport.newStream(method,new Metadata.Headers(), listener2);
+ clientTransport.newStream(method,new Metadata.Headers(), listener1).request(1);
+ clientTransport.newStream(method,new Metadata.Headers(), listener2).request(1);
assertEquals(2, streams.size());
// Receive goAway, max good id is 3.
@@ -494,18 +493,16 @@
}
@Override
- public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
+ public void headersRead(Metadata.Headers headers) {
this.headers = headers;
- return null;
}
@Override
- public ListenableFuture<Void> messageRead(InputStream message, int length) {
+ public void messageRead(InputStream message, int length) {
String msg = getContent(message);
if (msg != null) {
messages.add(msg);
}
- return null;
}
@Override
@@ -522,13 +519,18 @@
}
static String getContent(InputStream message) {
- BufferedReader br =
- new BufferedReader(new InputStreamReader(message, UTF_8));
+ BufferedReader br = new BufferedReader(new InputStreamReader(message, UTF_8));
try {
// Only one line message is used in this test.
return br.readLine();
} catch (IOException e) {
return null;
+ } finally {
+ try {
+ message.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
}
diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java
index 279e840..f47a9ea 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/Calls.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java
@@ -160,7 +160,7 @@
ReqT param,
StreamObserver<RespT> responseObserver) {
asyncServerStreamingCall(call, param,
- new StreamObserverToCallListenerAdapter<RespT>(responseObserver));
+ new StreamObserverToCallListenerAdapter<RespT>(call, responseObserver));
}
private static <ReqT, RespT> void asyncServerStreamingCall(
@@ -168,6 +168,7 @@
ReqT param,
Call.Listener<RespT> responseListener) {
call.start(responseListener, new Metadata.Headers());
+ call.request(1);
try {
call.sendPayload(param);
call.halfClose();
@@ -217,10 +218,11 @@
* Execute a duplex-streaming call.
* @return request stream observer.
*/
- public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(
- Call<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
- call.start(new StreamObserverToCallListenerAdapter<RespT>(responseObserver),
+ public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(Call<ReqT, RespT> call,
+ StreamObserver<RespT> responseObserver) {
+ call.start(new StreamObserverToCallListenerAdapter<RespT>(call, responseObserver),
new Metadata.Headers());
+ call.request(1);
return new CallToStreamObserverAdapter<ReqT>(call);
}
@@ -248,22 +250,25 @@
}
}
- private static class StreamObserverToCallListenerAdapter<T> extends Call.Listener<T> {
- private final StreamObserver<T> observer;
+ private static class StreamObserverToCallListenerAdapter<RespT> extends Call.Listener<RespT> {
+ private final Call<?, RespT> call;
+ private final StreamObserver<RespT> observer;
- public StreamObserverToCallListenerAdapter(StreamObserver<T> observer) {
+ public StreamObserverToCallListenerAdapter(Call<?, RespT> call, StreamObserver<RespT> observer) {
+ this.call = call;
this.observer = observer;
}
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
- return null;
+ public void onHeaders(Metadata.Headers headers) {
}
@Override
- public ListenableFuture<Void> onPayload(T payload) {
+ public void onPayload(RespT payload) {
observer.onValue(payload);
- return null;
+
+ // Request delivery of the next inbound message.
+ call.request(1);
}
@Override
@@ -288,18 +293,16 @@
}
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
- return null;
+ public void onHeaders(Metadata.Headers headers) {
}
@Override
- public ListenableFuture<Void> onPayload(RespT value) {
+ public void onPayload(RespT value) {
if (this.value != null) {
throw Status.INTERNAL.withDescription("More than one value received for unary call")
.asRuntimeException();
}
this.value = value;
- return null;
}
@Override
@@ -357,11 +360,13 @@
if (!hasNext()) {
throw new NoSuchElementException();
}
- @SuppressWarnings("unchecked")
- Payload<T> tmp = (Payload<T>) last;
- last = null;
- tmp.processed.set(null);
- return tmp.value;
+ try {
+ @SuppressWarnings("unchecked")
+ T tmp = (T) last;
+ return tmp;
+ } finally {
+ last = null;
+ }
}
@Override
@@ -373,16 +378,13 @@
private boolean done = false;
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
- return null;
+ public void onHeaders(Metadata.Headers headers) {
}
@Override
- public ListenableFuture<Void> onPayload(T value) {
+ public void onPayload(T value) {
Preconditions.checkState(!done, "Call already closed");
- SettableFuture<Void> future = SettableFuture.create();
- buffer.add(new Payload<T>(value, future));
- return future;
+ buffer.add(value);
}
@Override
@@ -397,14 +399,4 @@
}
}
}
-
- private static class Payload<T> {
- public final T value;
- public final SettableFuture<Void> processed;
-
- public Payload(T value, SettableFuture<Void> processed) {
- this.value = value;
- this.processed = processed;
- }
- }
}
diff --git a/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java b/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java
index 2ab7e76..c382aca 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java
@@ -31,7 +31,6 @@
package com.google.net.stubby.stub;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Call;
import com.google.net.stubby.Channel;
import com.google.net.stubby.ClientInterceptor;
@@ -122,9 +121,9 @@
trailersCapture.set(null);
super.start(new ForwardingListener<RespT>(responseListener) {
@Override
- public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
+ public void onHeaders(Metadata.Headers headers) {
headersCapture.set(headers);
- return super.onHeaders(headers);
+ super.onHeaders(headers);
}
@Override
diff --git a/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java b/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java
index 2f4198c..c68ef14 100644
--- a/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java
+++ b/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java
@@ -31,7 +31,6 @@
package com.google.net.stubby.stub;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.ServerCall;
import com.google.net.stubby.ServerCallHandler;
@@ -59,21 +58,24 @@
public ServerCall.Listener<ReqT> startCall(
String fullMethodName, final ServerCall<RespT> call, Metadata.Headers headers) {
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
+ call.request(1);
return new EmptyServerCallListener<ReqT>() {
ReqT request;
@Override
- public ListenableFuture<Void> onPayload(ReqT request) {
+ public void onPayload(ReqT request) {
if (this.request == null) {
// We delay calling method.invoke() until onHalfClose(), because application may call
// close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose().
this.request = request;
+
+ // Request delivery of the next inbound message.
+ call.request(1);
} else {
call.close(
Status.INVALID_ARGUMENT.withDescription(
"More than one request payloads for unary call or server streaming call"),
new Metadata.Trailers());
}
- return null;
}
@Override
@@ -99,17 +101,20 @@
final StreamingRequestMethod<ReqT, RespT> method) {
return new ServerCallHandler<ReqT, RespT>() {
@Override
- public ServerCall.Listener<ReqT> startCall(String fullMethodName, ServerCall<RespT> call,
- Metadata.Headers headers) {
+ public ServerCall.Listener<ReqT> startCall(String fullMethodName,
+ final ServerCall<RespT> call, Metadata.Headers headers) {
+ call.request(1);
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
final StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
return new EmptyServerCallListener<ReqT>() {
boolean halfClosed = false;
@Override
- public ListenableFuture<Void> onPayload(ReqT request) {
+ public void onPayload(ReqT request) {
requestObserver.onValue(request);
- return null;
+
+ // Request delivery of the next inbound message.
+ call.request(1);
}
@Override
@@ -158,6 +163,9 @@
throw Status.CANCELLED.asRuntimeException();
}
call.sendPayload(response);
+
+ // Request delivery of the next inbound message.
+ call.request(1);
}
@Override
@@ -177,8 +185,7 @@
private static class EmptyServerCallListener<ReqT> extends ServerCall.Listener<ReqT> {
@Override
- public ListenableFuture<Void> onPayload(ReqT request) {
- return null;
+ public void onPayload(ReqT request) {
}
@Override