core,netty: correctly count streams that ended due to client cancel (#3683)

Only bump the counter from AbstractServerStream.TransportState, and hole punch
from AbstractServerStream to TransportState when the application calls close.
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
index 0132847..b984337 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
@@ -131,6 +131,10 @@
       statsTraceCtx.streamClosed(status);
       endOfMessages();
       addStatusToTrailers(trailers, status);
+      // Safe to set without synchronization because access is tightly controlled.
+      // closedStatus is only set from here, and is read from a place that has happen-after
+      // guarantees with respect to here.
+      transportState().setClosedStatus(status);
       abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
     }
   }
@@ -178,7 +182,10 @@
     return statsTraceCtx;
   }
 
-  /** This should only called from the transport thread. */
+  /**
+   * This should only called from the transport thread (except for private interactions with
+   * {@code AbstractServerStream}).
+   */
   protected abstract static class TransportState extends AbstractStream.TransportState {
     /** Whether listener.closed() has been called. */
     private boolean listenerClosed;
@@ -190,6 +197,9 @@
     private boolean deframerClosed = false;
     private boolean immediateCloseRequested = false;
     private Runnable deframerClosedTask;
+    /** The status that the application used to close this stream. */
+    @Nullable
+    private Status closedStatus;
 
     protected TransportState(
         int maxMessageSize,
@@ -314,18 +324,34 @@
     }
 
     /**
-     * Closes the listener if not previously closed and frees resources.
+     * Closes the listener if not previously closed and frees resources. {@code newStatus} is a
+     * status generated by gRPC. It is <b>not</b> the status the stream closed with.
      */
     private void closeListener(Status newStatus) {
+      // If newStatus is OK, the application must have already called AbstractServerStream.close()
+      // and the status passed in there was the actual status of the RPC.
+      // If newStatus non-OK, then the RPC ended some other way and the server application did
+      // not initiate the termination.
+      Preconditions.checkState(!newStatus.isOk() || closedStatus != null);
       if (!listenerClosed) {
-        // If status is OK, close() is guaranteed to be called which should decide the final status
         if (!newStatus.isOk()) {
           statsTraceCtx.streamClosed(newStatus);
+          transportTracer.reportStreamClosed(false);
+        } else {
+          transportTracer.reportStreamClosed(closedStatus.isOk());
         }
         listenerClosed = true;
         onStreamDeallocated();
         listener().closed(newStatus);
       }
     }
+
+    /**
+     * Stores the {@code Status} that the application used to close this stream.
+     */
+    private void setClosedStatus(Status closeStatus) {
+      Preconditions.checkState(closedStatus == null, "closedStatus can only be set once");
+      closedStatus = closeStatus;
+    }
   }
 }
diff --git a/core/src/main/java/io/grpc/internal/TransportTracer.java b/core/src/main/java/io/grpc/internal/TransportTracer.java
index 23a5212..8423309 100644
--- a/core/src/main/java/io/grpc/internal/TransportTracer.java
+++ b/core/src/main/java/io/grpc/internal/TransportTracer.java
@@ -17,7 +17,6 @@
 package io.grpc.internal;
 
 import com.google.common.base.Preconditions;
-import io.grpc.Status;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -67,8 +66,8 @@
   /**
    * Reports that a stream closed with the specified Status.
    */
-  public void reportStreamClosed(Status status) {
-    if (status.isOk()) {
+  public void reportStreamClosed(boolean success) {
+    if (success) {
       streamsSucceeded++;
     } else {
       streamsFailed++;
diff --git a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
index 8c7406a..3c7971b 100644
--- a/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractServerStreamTest.java
@@ -95,6 +95,7 @@
     ReadableBuffer buffer = mock(ReadableBuffer.class);
 
     // Close the deframer
+    stream.close(Status.OK, new Metadata());
     stream.transportState().complete();
     // Frame received after deframer closed, should be ignored and not trigger an exception
     stream.transportState().inboundDataReceived(buffer, true);
@@ -118,6 +119,7 @@
 
     // Queue bytes in deframer
     stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[] {1}), false);
+    stream.close(Status.OK, new Metadata());
     stream.transportState().complete();
 
     assertEquals(Status.OK, closedFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
@@ -172,6 +174,7 @@
   public void completeWithoutClose() {
     stream.transportState().setListener(new ServerStreamListenerBase());
     // Test that it doesn't throw an exception
+    stream.close(Status.OK, new Metadata());
     stream.transportState().complete();
   }
 
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 79b1023..370f889 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -602,7 +602,6 @@
     }
     if (cmd.endOfStream()) {
       closeStreamWhenDone(promise, streamId);
-      transportTracer.reportStreamClosed(cmd.status());
     }
     encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
   }
diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
index b08d6e0..d089d89 100644
--- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
+++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
@@ -1432,7 +1432,7 @@
   }
 
   @Test
-  public void transportTracer_streamEnded_ok() throws Exception {
+  public void transportTracer_server_streamEnded_ok() throws Exception {
     server.start(serverListener);
     client = newClientTransport(server);
     runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
@@ -1462,7 +1462,7 @@
   }
 
   @Test
-  public void transportTracer_streamEnded_nonOk() throws Exception {
+  public void transportTracer_server_streamEnded_nonOk() throws Exception {
     server.start(serverListener);
     client = newClientTransport(server);
     runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
@@ -1491,6 +1491,45 @@
   }
 
   @Test
+  public void transportTracer_client_streamEnded_nonOk() throws Exception {
+    server.start(serverListener);
+    client = newClientTransport(server);
+    runIfNotNull(client.start(mockClientTransportListener));
+    ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+    ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+    clientStream.start(clientStreamListener);
+    MockServerTransportListener serverTransportListener =
+        serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    StreamCreation serverStreamCreation =
+        serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    if (!haveTransportTracer()) {
+      return;
+    }
+
+    TransportTracer.Stats serverBefore =
+        serverTransportListener.transport.getTransportStats().get();
+    assertEquals(0, serverBefore.streamsFailed);
+    assertEquals(0, serverBefore.streamsSucceeded);
+    // TODO(zpencer): uncomment when integrated with client transport
+    // TransportTracer.Stats clientBefore = client.getTransportStats().get();
+    // assertEquals(0, clientBefore.streamsFailed);
+    // assertEquals(0, clientBefore.streamsSucceeded);
+
+    clientStream.cancel(Status.UNKNOWN);
+    // do not validate stats until close() has been called on server
+    assertNotNull(serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+    TransportTracer.Stats serverAfter =
+        serverTransportListener.transport.getTransportStats().get();
+    assertEquals(1, serverAfter.streamsFailed);
+    assertEquals(0, serverAfter.streamsSucceeded);
+    // TODO(zpencer): uncomment when integrated with client transport
+    // TransportTracer.Stats clientAfter = client.getTransportStats().get();
+    // assertEquals(1, clientAfter.streamsFailed);
+    // assertEquals(0, clientAfter.streamsSucceeded);
+  }
+
+  @Test
   public void transportTracer_receive_msg() throws Exception {
     server.start(serverListener);
     client = newClientTransport(server);