core,netty: correctly count streams that ended due to client cancel (#3683)
Only bump the counter from AbstractServerStream.TransportState, and hole punch
from AbstractServerStream to TransportState when the application calls close.
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
index 0132847..b984337 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
@@ -131,6 +131,10 @@
statsTraceCtx.streamClosed(status);
endOfMessages();
addStatusToTrailers(trailers, status);
+ // Safe to set without synchronization because access is tightly controlled.
+ // closedStatus is only set from here, and is read from a place that has happen-after
+ // guarantees with respect to here.
+ transportState().setClosedStatus(status);
abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
}
}
@@ -178,7 +182,10 @@
return statsTraceCtx;
}
- /** This should only called from the transport thread. */
+ /**
+ * This should only called from the transport thread (except for private interactions with
+ * {@code AbstractServerStream}).
+ */
protected abstract static class TransportState extends AbstractStream.TransportState {
/** Whether listener.closed() has been called. */
private boolean listenerClosed;
@@ -190,6 +197,9 @@
private boolean deframerClosed = false;
private boolean immediateCloseRequested = false;
private Runnable deframerClosedTask;
+ /** The status that the application used to close this stream. */
+ @Nullable
+ private Status closedStatus;
protected TransportState(
int maxMessageSize,
@@ -314,18 +324,34 @@
}
/**
- * Closes the listener if not previously closed and frees resources.
+ * Closes the listener if not previously closed and frees resources. {@code newStatus} is a
+ * status generated by gRPC. It is <b>not</b> the status the stream closed with.
*/
private void closeListener(Status newStatus) {
+ // If newStatus is OK, the application must have already called AbstractServerStream.close()
+ // and the status passed in there was the actual status of the RPC.
+ // If newStatus non-OK, then the RPC ended some other way and the server application did
+ // not initiate the termination.
+ Preconditions.checkState(!newStatus.isOk() || closedStatus != null);
if (!listenerClosed) {
- // If status is OK, close() is guaranteed to be called which should decide the final status
if (!newStatus.isOk()) {
statsTraceCtx.streamClosed(newStatus);
+ transportTracer.reportStreamClosed(false);
+ } else {
+ transportTracer.reportStreamClosed(closedStatus.isOk());
}
listenerClosed = true;
onStreamDeallocated();
listener().closed(newStatus);
}
}
+
+ /**
+ * Stores the {@code Status} that the application used to close this stream.
+ */
+ private void setClosedStatus(Status closeStatus) {
+ Preconditions.checkState(closedStatus == null, "closedStatus can only be set once");
+ closedStatus = closeStatus;
+ }
}
}
diff --git a/core/src/main/java/io/grpc/internal/TransportTracer.java b/core/src/main/java/io/grpc/internal/TransportTracer.java
index 23a5212..8423309 100644
--- a/core/src/main/java/io/grpc/internal/TransportTracer.java
+++ b/core/src/main/java/io/grpc/internal/TransportTracer.java
@@ -17,7 +17,6 @@
package io.grpc.internal;
import com.google.common.base.Preconditions;
-import io.grpc.Status;
import java.util.concurrent.TimeUnit;
/**
@@ -67,8 +66,8 @@
/**
* Reports that a stream closed with the specified Status.
*/
- public void reportStreamClosed(Status status) {
- if (status.isOk()) {
+ public void reportStreamClosed(boolean success) {
+ if (success) {
streamsSucceeded++;
} else {
streamsFailed++;
diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
index 8c7406a..3c7971b 100644
--- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
@@ -95,6 +95,7 @@
ReadableBuffer buffer = mock(ReadableBuffer.class);
// Close the deframer
+ stream.close(Status.OK, new Metadata());
stream.transportState().complete();
// Frame received after deframer closed, should be ignored and not trigger an exception
stream.transportState().inboundDataReceived(buffer, true);
@@ -118,6 +119,7 @@
// Queue bytes in deframer
stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[] {1}), false);
+ stream.close(Status.OK, new Metadata());
stream.transportState().complete();
assertEquals(Status.OK, closedFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
@@ -172,6 +174,7 @@
public void completeWithoutClose() {
stream.transportState().setListener(new ServerStreamListenerBase());
// Test that it doesn't throw an exception
+ stream.close(Status.OK, new Metadata());
stream.transportState().complete();
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 79b1023..370f889 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -602,7 +602,6 @@
}
if (cmd.endOfStream()) {
closeStreamWhenDone(promise, streamId);
- transportTracer.reportStreamClosed(cmd.status());
}
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
}
diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
index b08d6e0..d089d89 100644
--- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
+++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
@@ -1432,7 +1432,7 @@
}
@Test
- public void transportTracer_streamEnded_ok() throws Exception {
+ public void transportTracer_server_streamEnded_ok() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
@@ -1462,7 +1462,7 @@
}
@Test
- public void transportTracer_streamEnded_nonOk() throws Exception {
+ public void transportTracer_server_streamEnded_nonOk() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
@@ -1491,6 +1491,45 @@
}
@Test
+ public void transportTracer_client_streamEnded_nonOk() throws Exception {
+ server.start(serverListener);
+ client = newClientTransport(server);
+ runIfNotNull(client.start(mockClientTransportListener));
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ MockServerTransportListener serverTransportListener =
+ serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ StreamCreation serverStreamCreation =
+ serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (!haveTransportTracer()) {
+ return;
+ }
+
+ TransportTracer.Stats serverBefore =
+ serverTransportListener.transport.getTransportStats().get();
+ assertEquals(0, serverBefore.streamsFailed);
+ assertEquals(0, serverBefore.streamsSucceeded);
+ // TODO(zpencer): uncomment when integrated with client transport
+ // TransportTracer.Stats clientBefore = client.getTransportStats().get();
+ // assertEquals(0, clientBefore.streamsFailed);
+ // assertEquals(0, clientBefore.streamsSucceeded);
+
+ clientStream.cancel(Status.UNKNOWN);
+ // do not validate stats until close() has been called on server
+ assertNotNull(serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+ TransportTracer.Stats serverAfter =
+ serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, serverAfter.streamsFailed);
+ assertEquals(0, serverAfter.streamsSucceeded);
+ // TODO(zpencer): uncomment when integrated with client transport
+ // TransportTracer.Stats clientAfter = client.getTransportStats().get();
+ // assertEquals(1, clientAfter.streamsFailed);
+ // assertEquals(0, clientAfter.streamsSucceeded);
+ }
+
+ @Test
public void transportTracer_receive_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);