core: integrate instrumentation-java (Census) tracing (#2938)
Main implementation is in CensusTracingModule.
Also a few fix-ups in the stats implementation CensusStatsModule:
- Change header key name from grpc-census-bin to grpc-tags-bin
- Server does not fail on header parse errors. Uses the default instead.
Protect Census-based stats and tracing with static flags: `GrpcUtil.enableCensusStats` and `GrpcUtil.enableCensusTracing`. They keep those features disabled by default until they, especially their wire formats, are stabilized.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index b257ffd..c97fffc 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -39,6 +39,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.instrumentation.stats.Stats;
import com.google.instrumentation.stats.StatsContextFactory;
+import com.google.instrumentation.trace.Tracing;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
@@ -287,19 +288,25 @@
nameResolverFactory = NameResolverProvider.asFactory();
}
- List<ClientInterceptor> interceptors = this.interceptors;
- if (recordsStats()) {
+ List<ClientInterceptor> effectiveInterceptors =
+ new ArrayList<ClientInterceptor>(this.interceptors);
+ if (GrpcUtil.enableCensusStats && recordsStats()) {
StatsContextFactory statsCtxFactory =
this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
if (statsCtxFactory != null) {
- interceptors = new ArrayList<ClientInterceptor>(this.interceptors);
- CensusStreamTracerModule census =
- new CensusStreamTracerModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
+ CensusStatsModule censusStats =
+ new CensusStatsModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
// First interceptor runs last (see ClientInterceptors.intercept()), so that no
// other interceptor can override the tracer factory we set in CallOptions.
- interceptors.add(0, census.getClientInterceptor());
+ effectiveInterceptors.add(0, censusStats.getClientInterceptor());
}
}
+ if (GrpcUtil.enableCensusTracing) {
+ CensusTracingModule censusTracing =
+ new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
+ effectiveInterceptors.add(0, censusTracing.getClientInterceptor());
+ }
+
return new ManagedChannelImpl(
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
@@ -316,7 +323,7 @@
GrpcUtil.STOPWATCH_SUPPLIER,
idleTimeoutMillis,
userAgent,
- interceptors);
+ effectiveInterceptors);
}
/**
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 649b6e5..fbeb147 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -38,6 +38,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.instrumentation.stats.Stats;
import com.google.instrumentation.stats.StatsContextFactory;
+import com.google.instrumentation.trace.Tracing;
import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
@@ -172,14 +173,22 @@
public ServerImpl build() {
ArrayList<ServerStreamTracer.Factory> tracerFactories =
new ArrayList<ServerStreamTracer.Factory>();
- StatsContextFactory statsFactory =
- this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
- if (statsFactory != null) {
- CensusStreamTracerModule census =
- new CensusStreamTracerModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER);
- tracerFactories.add(census.getServerTracerFactory());
+ if (GrpcUtil.enableCensusStats) {
+ StatsContextFactory statsFactory =
+ this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
+ if (statsFactory != null) {
+ CensusStatsModule censusStats =
+ new CensusStatsModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER);
+ tracerFactories.add(censusStats.getServerTracerFactory());
+ }
+ }
+ if (GrpcUtil.enableCensusTracing) {
+ CensusTracingModule censusTracing =
+ new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
+ tracerFactories.add(censusTracing.getServerTracerFactory());
}
tracerFactories.addAll(streamTracerFactories);
+
io.grpc.internal.InternalServer transportServer =
buildTransportServer(Collections.unmodifiableList(tracerFactories));
ServerImpl server = new ServerImpl(getExecutorPool(),
diff --git a/core/src/main/java/io/grpc/internal/CensusStreamTracerModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
similarity index 89%
rename from core/src/main/java/io/grpc/internal/CensusStreamTracerModule.java
rename to core/src/main/java/io/grpc/internal/CensusStatsModule.java
index a91cec4..ac05e88 100644
--- a/core/src/main/java/io/grpc/internal/CensusStreamTracerModule.java
+++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
@@ -63,19 +63,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
- * Provides factories for {@link StreamTracer} that records tracing and metrics to Census.
+ * Provides factories for {@link StreamTracer} that records stats to Census.
*
* <p>On the client-side, a factory is created for each call, because ClientCall starts earlier than
* the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's
* the factory that reports the summary to Census.
*
- * <p>On the server-side, a tracer is created for each call, because ServerStream starts earlier
- * than the ServerCall. Therefore, it's the tracer that reports the summary to Census.
+ * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
+ * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and
+ * it's the tracer that reports the summary to Census.
*/
-final class CensusStreamTracerModule {
+final class CensusStatsModule {
+ private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName());
private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1);
private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer();
@@ -86,16 +90,17 @@
private final StatsContextFactory statsCtxFactory;
private final Supplier<Stopwatch> stopwatchSupplier;
- private final Metadata.Key<StatsContext> statsHeader;
- private final CensusClientInterceptor clientInterceptor = new CensusClientInterceptor();
+ @VisibleForTesting
+ final Metadata.Key<StatsContext> statsHeader;
+ private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
- CensusStreamTracerModule(
+ CensusStatsModule(
final StatsContextFactory statsCtxFactory, Supplier<Stopwatch> stopwatchSupplier) {
this.statsCtxFactory = checkNotNull(statsCtxFactory, "statsCtxFactory");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.statsHeader =
- Metadata.Key.of("grpc-census-bin", new Metadata.BinaryMarshaller<StatsContext>() {
+ Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller<StatsContext>() {
@Override
public byte[] toBytes(StatsContext context) {
// TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to
@@ -113,8 +118,9 @@
public StatsContext parseBytes(byte[] serialized) {
try {
return statsCtxFactory.deserialize(new ByteArrayInputStream(serialized));
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ logger.log(Level.FINE, "Failed to parse stats header", e);
+ return statsCtxFactory.getDefault();
}
}
});
@@ -192,7 +198,9 @@
checkState(streamTracer.compareAndSet(null, tracer),
"Are you creating multiple streams per call? This class doesn't yet support this case.");
headers.discardAll(statsHeader);
- headers.put(statsHeader, parentCtx);
+ if (parentCtx != statsCtxFactory.getDefault()) {
+ headers.put(statsHeader, parentCtx);
+ }
return tracer;
}
@@ -245,10 +253,10 @@
private final AtomicLong outboundUncompressedSize = new AtomicLong();
private final AtomicLong inboundUncompressedSize = new AtomicLong();
- ServerTracer(String fullMethodName, @Nullable StatsContext parentCtx) {
+ ServerTracer(String fullMethodName, StatsContext parentCtx) {
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+ this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.stopwatch = stopwatchSupplier.get().start();
- this.parentCtx = parentCtx;
}
@Override
@@ -308,11 +316,10 @@
@Override
public <ReqT, RespT> Context filterContext(Context context) {
- if (parentCtx != null) {
+ if (parentCtx != statsCtxFactory.getDefault()) {
return context.withValue(STATS_CONTEXT_KEY, parentCtx);
- } else {
- return context;
}
+ return context;
}
}
@@ -320,14 +327,18 @@
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
StatsContext parentCtx = headers.get(statsHeader);
+ if (parentCtx == null) {
+ parentCtx = statsCtxFactory.getDefault();
+ }
return new ServerTracer(fullMethodName, parentCtx);
}
}
- private class CensusClientInterceptor implements ClientInterceptor {
+ private class StatsClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+ // New RPCs on client-side inherit the stats context from the current Context.
StatsContext parentCtx = STATS_CONTEXT_KEY.get();
if (parentCtx == null) {
parentCtx = statsCtxFactory.getDefault();
diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java
new file mode 100644
index 0000000..4668033
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.instrumentation.trace.ContextUtils.CONTEXT_SPAN_KEY;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.instrumentation.trace.BinaryPropagationHandler;
+import com.google.instrumentation.trace.EndSpanOptions;
+import com.google.instrumentation.trace.Span;
+import com.google.instrumentation.trace.SpanContext;
+import com.google.instrumentation.trace.Status;
+import com.google.instrumentation.trace.Tracer;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientStreamTracer;
+import io.grpc.Context;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerStreamTracer;
+import io.grpc.StreamTracer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+/**
+ * Provides factories for {@link StreamTracer} that records traces to Census.
+ *
+ * <p>On the client-side, a factory is created for each call, because ClientCall starts earlier than
+ * the ClientStream, and in some cases may even not create a ClientStream at all. Therefore, it's
+ * the factory that reports the summary to Census.
+ *
+ * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
+ * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and
+ * it's the tracer that reports the summary to Census.
+ */
+final class CensusTracingModule {
+ private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
+ // TODO(zhangkun83): record NetworkEvent to Span for each message
+ private static final ClientStreamTracer noopClientTracer = new ClientStreamTracer() {};
+
+ private final Tracer censusTracer;
+ private final BinaryPropagationHandler censusTracingPropagationHandler;
+ @VisibleForTesting
+ final Metadata.Key<SpanContext> tracingHeader;
+ private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
+ private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
+
+ CensusTracingModule(
+ Tracer censusTracer, final BinaryPropagationHandler censusTracingPropagationHandler) {
+ this.censusTracer = checkNotNull(censusTracer, "censusTracer");
+ this.censusTracingPropagationHandler =
+ checkNotNull(censusTracingPropagationHandler, "censusTracingPropagationHandler");
+ this.tracingHeader =
+ Metadata.Key.of("grpc-trace-bin", new Metadata.BinaryMarshaller<SpanContext>() {
+ @Override
+ public byte[] toBytes(SpanContext context) {
+ return censusTracingPropagationHandler.toBinaryValue(context);
+ }
+
+ @Override
+ public SpanContext parseBytes(byte[] serialized) {
+ try {
+ return censusTracingPropagationHandler.fromBinaryValue(serialized);
+ } catch (Exception e) {
+ logger.log(Level.FINE, "Failed to parse tracing header", e);
+ return SpanContext.INVALID;
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates a {@link ClientCallTracer} for a new call.
+ */
+ @VisibleForTesting
+ ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, String fullMethodName) {
+ return new ClientCallTracer(parentSpan, fullMethodName);
+ }
+
+ /**
+ * Returns the server tracer factory.
+ */
+ ServerStreamTracer.Factory getServerTracerFactory() {
+ return serverTracerFactory;
+ }
+
+ /**
+ * Returns the client interceptor that facilitates Census-based stats reporting.
+ */
+ ClientInterceptor getClientInterceptor() {
+ return clientInterceptor;
+ }
+
+ private static String makeSpanName(String prefix, String fullMethodName) {
+ return prefix + "." + fullMethodName.replace('/', '.');
+ }
+
+ @VisibleForTesting
+ static Status convertStatus(io.grpc.Status grpcStatus) {
+ Status status;
+ switch (grpcStatus.getCode()) {
+ case OK:
+ status = Status.OK;
+ break;
+ case CANCELLED:
+ status = Status.CANCELLED;
+ break;
+ case UNKNOWN:
+ status = Status.UNKNOWN;
+ break;
+ case INVALID_ARGUMENT:
+ status = Status.INVALID_ARGUMENT;
+ break;
+ case DEADLINE_EXCEEDED:
+ status = Status.DEADLINE_EXCEEDED;
+ break;
+ case NOT_FOUND:
+ status = Status.NOT_FOUND;
+ break;
+ case ALREADY_EXISTS:
+ status = Status.ALREADY_EXISTS;
+ break;
+ case PERMISSION_DENIED:
+ status = Status.PERMISSION_DENIED;
+ break;
+ case RESOURCE_EXHAUSTED:
+ status = Status.RESOURCE_EXHAUSTED;
+ break;
+ case FAILED_PRECONDITION:
+ status = Status.FAILED_PRECONDITION;
+ break;
+ case ABORTED:
+ status = Status.ABORTED;
+ break;
+ case OUT_OF_RANGE:
+ status = Status.OUT_OF_RANGE;
+ break;
+ case UNIMPLEMENTED:
+ status = Status.UNIMPLEMENTED;
+ break;
+ case INTERNAL:
+ status = Status.INTERNAL;
+ break;
+ case UNAVAILABLE:
+ status = Status.UNAVAILABLE;
+ break;
+ case DATA_LOSS:
+ status = Status.DATA_LOSS;
+ break;
+ case UNAUTHENTICATED:
+ status = Status.UNAUTHENTICATED;
+ break;
+ default:
+ throw new AssertionError("Unhandled status code " + grpcStatus.getCode());
+ }
+ if (grpcStatus.getDescription() != null) {
+ status = status.withDescription(grpcStatus.getDescription());
+ }
+ return status;
+ }
+
+ private static EndSpanOptions createEndSpanOptions(io.grpc.Status status) {
+ return EndSpanOptions.builder().setStatus(convertStatus(status)).build();
+ }
+
+ @VisibleForTesting
+ final class ClientCallTracer extends ClientStreamTracer.Factory {
+
+ private final String fullMethodName;
+ private final AtomicBoolean callEnded = new AtomicBoolean(false);
+ private final Span span;
+
+ ClientCallTracer(@Nullable Span parentSpan, String fullMethodName) {
+ this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+ this.span =
+ censusTracer.spanBuilder(parentSpan, makeSpanName("Sent", fullMethodName)).startSpan();
+ }
+
+ @Override
+ public ClientStreamTracer newClientStreamTracer(Metadata headers) {
+ headers.discardAll(tracingHeader);
+ headers.put(tracingHeader, span.getContext());
+ return noopClientTracer;
+ }
+
+ /**
+ * Record a finished call and mark the current time as the end time.
+ *
+ * <p>Can be called from any thread without synchronization. Calling it the second time or more
+ * is a no-op.
+ */
+ void callEnded(io.grpc.Status status) {
+ if (!callEnded.compareAndSet(false, true)) {
+ return;
+ }
+ span.end(createEndSpanOptions(status));
+ }
+ }
+
+ private final class ServerTracer extends ServerStreamTracer {
+ private final String fullMethodName;
+ private final Span span;
+ private final AtomicBoolean streamClosed = new AtomicBoolean(false);
+
+ ServerTracer(String fullMethodName, @Nullable SpanContext remoteSpan) {
+ this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+ this.span =
+ censusTracer.spanBuilderWithRemoteParent(
+ remoteSpan, makeSpanName("Recv", fullMethodName))
+ .startSpan();
+ }
+
+ /**
+ * Record a finished stream and mark the current time as the end time.
+ *
+ * <p>Can be called from any thread without synchronization. Calling it the second time or more
+ * is a no-op.
+ */
+ @Override
+ public void streamClosed(io.grpc.Status status) {
+ if (!streamClosed.compareAndSet(false, true)) {
+ return;
+ }
+ span.end(createEndSpanOptions(status));
+ }
+
+ @Override
+ public <ReqT, RespT> Context filterContext(Context context) {
+ return context.withValue(CONTEXT_SPAN_KEY, span);
+ }
+ }
+
+ private final class ServerTracerFactory extends ServerStreamTracer.Factory {
+ @SuppressWarnings("ReferenceEquality")
+ @Override
+ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
+ SpanContext remoteSpan = headers.get(tracingHeader);
+ if (remoteSpan == SpanContext.INVALID) {
+ remoteSpan = null;
+ }
+ return new ServerTracer(fullMethodName, remoteSpan);
+ }
+ }
+
+ private class TracingClientInterceptor implements ClientInterceptor {
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+ // New RPCs on client-side inherit the tracing context from the current Context.
+ Span parentSpan = CONTEXT_SPAN_KEY.get();
+ final ClientCallTracer tracerFactory =
+ newClientCallTracer(parentSpan, method.getFullMethodName());
+ ClientCall<ReqT, RespT> call =
+ next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
+ return new SimpleForwardingClientCall<ReqT, RespT>(call) {
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ delegate().start(
+ new SimpleForwardingClientCallListener<RespT>(responseListener) {
+ @Override
+ public void onClose(io.grpc.Status status, Metadata trailers) {
+ tracerFactory.callEnded(status);
+ super.onClose(status, trailers);
+ }
+ },
+ headers);
+ }
+ };
+ }
+ }
+}
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 0ffd05b..4da7be6 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -203,6 +203,20 @@
public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
/**
+ * Whether the channel builder and server builder will try to load and use Census stats library.
+ * Delete this and assume always on once Census stats has been fully tested and its wire-format is
+ * stabilized.
+ */
+ public static boolean enableCensusStats;
+
+ /**
+ * Whether the channel builder and server builder will try to load and use Census tracing library.
+ * Delete this and assume always on once Census stats has been fully tested and its wire-format is
+ * stabilized.
+ */
+ public static boolean enableCensusTracing;
+
+ /**
* Maps HTTP error response status codes to transport codes, as defined in <a
* href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md">
* http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is
diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
new file mode 100644
index 0000000..23e3a00
--- /dev/null
+++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
@@ -0,0 +1,728 @@
+/*
+ * Copyright 2017, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.instrumentation.stats.RpcConstants;
+import com.google.instrumentation.stats.StatsContext;
+import com.google.instrumentation.stats.TagValue;
+import com.google.instrumentation.trace.Annotation;
+import com.google.instrumentation.trace.AttributeValue;
+import com.google.instrumentation.trace.BinaryPropagationHandler;
+import com.google.instrumentation.trace.ContextUtils;
+import com.google.instrumentation.trace.EndSpanOptions;
+import com.google.instrumentation.trace.Link;
+import com.google.instrumentation.trace.NetworkEvent;
+import com.google.instrumentation.trace.Span;
+import com.google.instrumentation.trace.SpanContext;
+import com.google.instrumentation.trace.SpanFactory;
+import com.google.instrumentation.trace.SpanId;
+import com.google.instrumentation.trace.StartSpanOptions;
+import com.google.instrumentation.trace.TraceId;
+import com.google.instrumentation.trace.TraceOptions;
+import com.google.instrumentation.trace.Tracer;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors;
+import io.grpc.ClientStreamTracer;
+import io.grpc.Context;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.ServerStreamTracer;
+import io.grpc.Status;
+import io.grpc.internal.testing.StatsTestUtils;
+import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
+import io.grpc.testing.GrpcServerRule;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Test for {@link CensusStatsModule} and {@link CensusTracingModule}.
+ */
+@RunWith(JUnit4.class)
+public class CensusModulesTest {
+ private static final CallOptions.Key<String> CUSTOM_OPTION =
+ CallOptions.Key.of("option1", "default");
+ private static final CallOptions CALL_OPTIONS =
+ CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
+
+ private static class StringInputStream extends InputStream {
+ final String string;
+
+ StringInputStream(String string) {
+ this.string = string;
+ }
+
+ @Override
+ public int read() {
+ // InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is
+ // passed to the InProcess server and consumed by MARSHALLER.parse().
+ throw new UnsupportedOperationException("Should not be called");
+ }
+ }
+
+ private static final MethodDescriptor.Marshaller<String> MARSHALLER =
+ new MethodDescriptor.Marshaller<String>() {
+ @Override
+ public InputStream stream(String value) {
+ return new StringInputStream(value);
+ }
+
+ @Override
+ public String parse(InputStream stream) {
+ return ((StringInputStream) stream).string;
+ }
+ };
+
+ private final MethodDescriptor<String, String> method = MethodDescriptor.create(
+ MethodDescriptor.MethodType.UNKNOWN, "package1.service2/method3",
+ MARSHALLER, MARSHALLER);
+ private final FakeClock fakeClock = new FakeClock();
+ private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
+ private final Random random = new Random(0);
+ private final SpanContext fakeClientSpanContext =
+ SpanContext.create(
+ TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+ TraceOptions.builder().build());
+ private final SpanContext fakeClientParentSpanContext =
+ SpanContext.create(
+ TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+ TraceOptions.builder().build());
+ private final SpanContext fakeServerSpanContext =
+ SpanContext.create(
+ TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+ TraceOptions.builder().build());
+ private final SpanContext fakeServerParentSpanContext =
+ SpanContext.create(
+ TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+ TraceOptions.builder().build());
+ private final Span fakeClientSpan = new FakeSpan(fakeClientSpanContext);
+ private final Span fakeServerSpan = new FakeSpan(fakeServerSpanContext);
+ private final Span fakeClientParentSpan = new FakeSpan(fakeClientParentSpanContext);
+ private final Span fakeServerParentSpan = new FakeSpan(fakeServerParentSpanContext);
+ private final Span spyClientSpan = spy(fakeClientSpan);
+ private final Span spyServerSpan = spy(fakeServerSpan);
+ private final byte[] binarySpanContext = new byte[]{3, 1, 5};
+
+ @Rule
+ public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
+
+ @Mock
+ private AccessibleSpanFactory mockSpanFactory;
+ @Mock
+ private BinaryPropagationHandler mockTracingPropagationHandler;
+ @Mock
+ private ClientCall.Listener<String> mockClientCallListener;
+ @Mock
+ private ServerCall.Listener<String> mockServerCallListener;
+ @Captor
+ private ArgumentCaptor<CallOptions> callOptionsCaptor;
+ @Captor
+ private ArgumentCaptor<ClientCall.Listener<String>> clientCallListenerCaptor;
+ @Captor
+ private ArgumentCaptor<Status> statusCaptor;
+
+ private Tracer tracer;
+ private CensusStatsModule censusStats;
+ private CensusTracingModule censusTracing;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockSpanFactory.startSpan(any(Span.class), anyString(), any(StartSpanOptions.class)))
+ .thenReturn(spyClientSpan);
+ when(
+ mockSpanFactory.startSpanWithRemoteParent(
+ any(SpanContext.class), anyString(), any(StartSpanOptions.class)))
+ .thenReturn(spyServerSpan);
+ when(mockTracingPropagationHandler.toBinaryValue(any(SpanContext.class)))
+ .thenReturn(binarySpanContext);
+ when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class)))
+ .thenReturn(fakeServerParentSpanContext);
+ tracer = new Tracer(mockSpanFactory) {};
+ censusStats = new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier());
+ censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler);
+ }
+
+ @After
+ public void wrapUp() {
+ assertNull(statsCtxFactory.pollRecord());
+ }
+
+ @Test
+ public void clientInterceptorNoCustomTag() {
+ testClientInterceptors(false);
+ }
+
+ @Test
+ public void clientInterceptorCustomTag() {
+ testClientInterceptors(true);
+ }
+
+ // Test that Census ClientInterceptors uses the StatsContext and Span out of the current Context
+ // to create the ClientCallTracer, and that it intercepts ClientCall.Listener.onClose() to call
+ // ClientCallTracer.callEnded().
+ private void testClientInterceptors(boolean nonDefaultContext) {
+ grpcServerRule.getServiceRegistry().addService(
+ ServerServiceDefinition.builder("package1.service2").addMethod(
+ method, new ServerCallHandler<String, String>() {
+ @Override
+ public ServerCall.Listener<String> startCall(
+ ServerCall<String, String> call, Metadata headers) {
+ call.sendHeaders(new Metadata());
+ call.sendMessage("Hello");
+ call.close(
+ Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata());
+ return mockServerCallListener;
+ }
+ }).build());
+
+ final AtomicReference<CallOptions> capturedCallOptions = new AtomicReference<CallOptions>();
+ ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() {
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+ capturedCallOptions.set(callOptions);
+ return next.newCall(method, callOptions);
+ }
+ };
+ Channel interceptedChannel =
+ ClientInterceptors.intercept(
+ grpcServerRule.getChannel(), callOptionsCaptureInterceptor,
+ censusStats.getClientInterceptor(), censusTracing.getClientInterceptor());
+ ClientCall<String, String> call;
+ if (nonDefaultContext) {
+ Context ctx =
+ Context.ROOT.withValues(
+ CensusStatsModule.STATS_CONTEXT_KEY,
+ statsCtxFactory.getDefault().with(
+ StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")),
+ ContextUtils.CONTEXT_SPAN_KEY,
+ fakeClientParentSpan);
+ Context origCtx = ctx.attach();
+ try {
+ call = interceptedChannel.newCall(method, CALL_OPTIONS);
+ } finally {
+ ctx.detach(origCtx);
+ }
+ } else {
+ assertNull(CensusStatsModule.STATS_CONTEXT_KEY.get());
+ assertNull(ContextUtils.CONTEXT_SPAN_KEY.get());
+ call = interceptedChannel.newCall(method, CALL_OPTIONS);
+ }
+
+ // The interceptor adds tracer factory to CallOptions
+ assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION));
+ assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size());
+ assertTrue(
+ capturedCallOptions.get().getStreamTracerFactories().get(0)
+ instanceof CensusTracingModule.ClientCallTracer);
+ assertTrue(
+ capturedCallOptions.get().getStreamTracerFactories().get(1)
+ instanceof CensusStatsModule.ClientCallTracer);
+
+ // Make the call
+ Metadata headers = new Metadata();
+ call.start(mockClientCallListener, headers);
+ assertNull(statsCtxFactory.pollRecord());
+ if (nonDefaultContext) {
+ verify(mockSpanFactory).startSpan(
+ same(fakeClientParentSpan), eq("Sent.package1.service2.method3"),
+ any(StartSpanOptions.class));
+ } else {
+ verify(mockSpanFactory).startSpan(
+ isNull(Span.class), eq("Sent.package1.service2.method3"), any(StartSpanOptions.class));
+ }
+ verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
+
+ // End the call
+ call.halfClose();
+ call.request(1);
+
+ verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
+ Status status = statusCaptor.getValue();
+ assertEquals(Status.Code.PERMISSION_DENIED, status.getCode());
+ assertEquals("No you don't", status.getDescription());
+
+ // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census.
+ StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+ assertNotNull(record);
+ TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.toString());
+ TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+ assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.toString());
+ if (nonDefaultContext) {
+ TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
+ assertEquals("extra value", extraTag.toString());
+ } else {
+ assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG));
+ }
+ verify(spyClientSpan).end(
+ EndSpanOptions.builder()
+ .setStatus(
+ com.google.instrumentation.trace.Status.PERMISSION_DENIED
+ .withDescription("No you don't"))
+ .build());
+ verify(spyClientSpan, never()).end();
+ }
+
+ @Test
+ public void clientBasicStatsDefaultContext() {
+ CensusStatsModule.ClientCallTracer callTracer =
+ censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName());
+ Metadata headers = new Metadata();
+ ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
+
+ fakeClock.forwardTime(30, MILLISECONDS);
+ tracer.outboundHeaders();
+
+ fakeClock.forwardTime(100, MILLISECONDS);
+ tracer.outboundWireSize(1028);
+ tracer.outboundUncompressedSize(1128);
+
+ fakeClock.forwardTime(16, MILLISECONDS);
+ tracer.inboundWireSize(33);
+ tracer.inboundUncompressedSize(67);
+ tracer.outboundWireSize(99);
+ tracer.outboundUncompressedSize(865);
+
+ fakeClock.forwardTime(24, MILLISECONDS);
+ tracer.inboundWireSize(154);
+ tracer.inboundUncompressedSize(552);
+ tracer.streamClosed(Status.OK);
+ callTracer.callEnded(Status.OK);
+
+ StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+ assertNotNull(record);
+ assertNoServerContent(record);
+ TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.toString());
+ TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+ assertEquals(Status.Code.OK.toString(), statusTag.toString());
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+ assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertEquals(1128 + 865,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
+ assertEquals(67 + 552,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+ assertEquals(30 + 100 + 16 + 24,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+ }
+
+ @Test
+ public void clientBasicTracingDefaultSpan() {
+ CensusTracingModule.ClientCallTracer callTracer =
+ censusTracing.newClientCallTracer(null, method.getFullMethodName());
+ Metadata headers = new Metadata();
+ ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
+ verify(mockSpanFactory).startSpan(
+ isNull(Span.class), eq("Sent.package1.service2.method3"), any(StartSpanOptions.class));
+ verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
+
+ tracer.streamClosed(Status.OK);
+ callTracer.callEnded(Status.OK);
+
+ verify(spyClientSpan).end(
+ EndSpanOptions.builder().setStatus(com.google.instrumentation.trace.Status.OK).build());
+ verifyNoMoreInteractions(mockSpanFactory);
+ }
+
+ @Test
+ public void clientStreamNeverCreatedStillRecordStats() {
+ CensusStatsModule.ClientCallTracer callTracer =
+ censusStats.newClientCallTracer(
+ statsCtxFactory.getDefault(), method.getFullMethodName());
+
+ fakeClock.forwardTime(3000, MILLISECONDS);
+ callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
+
+ StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+ assertNotNull(record);
+ assertNoServerContent(record);
+ TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.toString());
+ TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+ assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+ assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertEquals(0,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
+ assertEquals(0,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+ assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
+ }
+
+ @Test
+ public void clientStreamNeverCreatedStillRecordTracing() {
+ CensusTracingModule.ClientCallTracer callTracer =
+ censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName());
+ verify(mockSpanFactory).startSpan(
+ same(fakeClientParentSpan), eq("Sent.package1.service2.method3"),
+ any(StartSpanOptions.class));
+
+ callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
+ verify(spyClientSpan).end(
+ EndSpanOptions.builder()
+ .setStatus(
+ com.google.instrumentation.trace.Status.DEADLINE_EXCEEDED
+ .withDescription("3 seconds"))
+ .build());
+ verify(spyClientSpan, never()).end();
+ }
+
+ @Test
+ public void statsHeadersPropagateTags() {
+ // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are
+ // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates
+ // the propagation by putting them in the headers.
+ StatsContext clientCtx = statsCtxFactory.getDefault().with(
+ StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897"));
+ CensusStatsModule.ClientCallTracer callTracer =
+ censusStats.newClientCallTracer(clientCtx, method.getFullMethodName());
+ Metadata headers = new Metadata();
+ // This propagates clientCtx to headers
+ callTracer.newClientStreamTracer(headers);
+ assertTrue(headers.containsKey(censusStats.statsHeader));
+
+ ServerStreamTracer serverTracer =
+ censusStats.getServerTracerFactory().newServerStreamTracer(
+ method.getFullMethodName(), headers);
+ // Server tracer deserializes clientCtx from the headers, so that it records stats with the
+ // propagated tags.
+ Context serverContext = serverTracer.filterContext(Context.ROOT);
+ // It also put clientCtx in the Context seen by the call handler
+ assertEquals(clientCtx, CensusStatsModule.STATS_CONTEXT_KEY.get(serverContext));
+
+
+ // Verifies that the server tracer records the status with the propagated tag
+ serverTracer.streamClosed(Status.OK);
+
+ StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord();
+ assertNotNull(serverRecord);
+ assertNoClientContent(serverRecord);
+ TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD);
+ assertEquals(method.getFullMethodName(), serverMethodTag.toString());
+ TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS);
+ assertEquals(Status.Code.OK.toString(), serverStatusTag.toString());
+ assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
+ TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+ assertEquals("extra-tag-value-897", serverPropagatedTag.toString());
+
+ // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to
+ // record stats.
+ callTracer.callEnded(Status.OK);
+
+ StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord();
+ assertNotNull(clientRecord);
+ assertNoServerContent(clientRecord);
+ TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+ assertEquals(method.getFullMethodName(), clientMethodTag.toString());
+ TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS);
+ assertEquals(Status.Code.OK.toString(), clientStatusTag.toString());
+ assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+ TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+ assertEquals("extra-tag-value-897", clientPropagatedTag.toString());
+ }
+
+ @Test
+ public void statsHeadersNotPropagateDefaultContext() {
+ CensusStatsModule.ClientCallTracer callTracer =
+ censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName());
+ Metadata headers = new Metadata();
+ callTracer.newClientStreamTracer(headers);
+ assertFalse(headers.containsKey(censusStats.statsHeader));
+ }
+
+ @Test
+ public void statsHeaderMalformed() {
+ // Construct a malformed header and make sure parsing it will throw
+ byte[] statsHeaderValue = new byte[]{1};
+ Metadata.Key<byte[]> arbitraryStatsHeader =
+ Metadata.Key.of("grpc-tags-bin", Metadata.BINARY_BYTE_MARSHALLER);
+ try {
+ statsCtxFactory.deserialize(new ByteArrayInputStream(statsHeaderValue));
+ fail("Should have thrown");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ // But the header key will return a default context for it
+ Metadata headers = new Metadata();
+ assertNull(headers.get(censusStats.statsHeader));
+ headers.put(arbitraryStatsHeader, statsHeaderValue);
+ assertSame(statsCtxFactory.getDefault(), headers.get(censusStats.statsHeader));
+ }
+
+ @Test
+ public void traceHeadersPropagateSpanContext() throws Exception {
+ CensusTracingModule.ClientCallTracer callTracer =
+ censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName());
+ Metadata headers = new Metadata();
+ callTracer.newClientStreamTracer(headers);
+
+ verify(mockTracingPropagationHandler).toBinaryValue(same(fakeClientSpanContext));
+ verifyNoMoreInteractions(mockTracingPropagationHandler);
+ verify(mockSpanFactory).startSpan(
+ same(fakeClientParentSpan), eq("Sent.package1.service2.method3"),
+ any(StartSpanOptions.class));
+ verifyNoMoreInteractions(mockSpanFactory);
+ assertTrue(headers.containsKey(censusTracing.tracingHeader));
+
+ ServerStreamTracer serverTracer =
+ censusTracing.getServerTracerFactory().newServerStreamTracer(
+ method.getFullMethodName(), headers);
+ verify(mockTracingPropagationHandler).fromBinaryValue(same(binarySpanContext));
+ verify(mockSpanFactory).startSpanWithRemoteParent(
+ same(fakeServerParentSpanContext), eq("Recv.package1.service2.method3"),
+ any(StartSpanOptions.class));
+
+ Context filteredContext = serverTracer.filterContext(Context.ROOT);
+ assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext));
+ }
+
+ @Test
+ public void traceHeaderMalformed() throws Exception {
+ // As comparison, normal header parsing
+ Metadata headers = new Metadata();
+ headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
+ // mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext
+ assertSame(fakeServerParentSpanContext, headers.get(censusTracing.tracingHeader));
+
+ // Make BinaryPropagationHandler always throw when parsing the header
+ when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class)))
+ .thenThrow(new ParseException("Malformed header", 0));
+
+ headers = new Metadata();
+ assertNull(headers.get(censusTracing.tracingHeader));
+ headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
+ assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader));
+ assertNotSame(fakeServerParentSpanContext, SpanContext.INVALID);
+
+ // A null Span is used as the parent in this case
+ censusTracing.getServerTracerFactory().newServerStreamTracer(
+ method.getFullMethodName(), headers);
+ verify(mockSpanFactory).startSpanWithRemoteParent(
+ isNull(SpanContext.class), eq("Recv.package1.service2.method3"),
+ any(StartSpanOptions.class));
+ }
+
+ @Test
+ public void serverBasicStatsNoHeaders() {
+ ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory();
+ ServerStreamTracer tracer =
+ tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
+
+ Context filteredContext = tracer.filterContext(Context.ROOT);
+ assertNull(CensusStatsModule.STATS_CONTEXT_KEY.get(filteredContext));
+
+ tracer.inboundWireSize(34);
+ tracer.inboundUncompressedSize(67);
+
+ fakeClock.forwardTime(100, MILLISECONDS);
+ tracer.outboundWireSize(1028);
+ tracer.outboundUncompressedSize(1128);
+
+ fakeClock.forwardTime(16, MILLISECONDS);
+ tracer.inboundWireSize(154);
+ tracer.inboundUncompressedSize(552);
+ tracer.outboundWireSize(99);
+ tracer.outboundUncompressedSize(865);
+
+ fakeClock.forwardTime(24, MILLISECONDS);
+
+ tracer.streamClosed(Status.CANCELLED);
+
+ StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+ assertNotNull(record);
+ assertNoClientContent(record);
+ TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.toString());
+ TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+ assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT));
+ assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
+ assertEquals(1128 + 865,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
+ assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES));
+ assertEquals(67 + 552,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(100 + 16 + 24,
+ record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY));
+ }
+
+ @Test
+ public void serverBasicTracingNoHeaders() {
+ ServerStreamTracer.Factory tracerFactory = censusTracing.getServerTracerFactory();
+ ServerStreamTracer tracer =
+ tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
+ verifyZeroInteractions(mockTracingPropagationHandler);
+ verify(mockSpanFactory).startSpanWithRemoteParent(
+ isNull(SpanContext.class), eq("Recv.package1.service2.method3"),
+ any(StartSpanOptions.class));
+
+ Context filteredContext = tracer.filterContext(Context.ROOT);
+ assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext));
+
+ verify(spyServerSpan, never()).end(any(EndSpanOptions.class));
+ tracer.streamClosed(Status.CANCELLED);
+
+ verify(spyServerSpan).end(
+ EndSpanOptions.builder()
+ .setStatus(com.google.instrumentation.trace.Status.CANCELLED).build());
+ verify(spyServerSpan, never()).end();
+ }
+
+ @Test
+ public void convertToTracingStatus() {
+ // Without description
+ for (Status.Code grpcCode : Status.Code.values()) {
+ Status grpcStatus = Status.fromCode(grpcCode);
+ com.google.instrumentation.trace.Status tracingStatus =
+ CensusTracingModule.convertStatus(grpcStatus);
+ assertEquals(grpcCode.toString(), tracingStatus.getCanonicalCode().toString());
+ assertNull(tracingStatus.getDescription());
+ }
+
+ // With description
+ for (Status.Code grpcCode : Status.Code.values()) {
+ Status grpcStatus = Status.fromCode(grpcCode).withDescription("This is my description");
+ com.google.instrumentation.trace.Status tracingStatus =
+ CensusTracingModule.convertStatus(grpcStatus);
+ assertEquals(grpcCode.toString(), tracingStatus.getCanonicalCode().toString());
+ assertEquals(grpcStatus.getDescription(), tracingStatus.getDescription());
+ }
+ }
+
+ private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME));
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY));
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
+ assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
+ }
+
+ private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) {
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+ }
+
+ // Promote the visibility of SpanFactory's methods to allow mocking
+ private abstract static class AccessibleSpanFactory extends SpanFactory {
+ @Override
+ public abstract Span startSpan(@Nullable Span parent, String name, StartSpanOptions options);
+
+ @Override
+ public abstract Span startSpanWithRemoteParent(
+ @Nullable SpanContext remoteParent, String name, StartSpanOptions options);
+ }
+
+ private static class FakeSpan extends Span {
+ FakeSpan(SpanContext ctx) {
+ super(ctx, null);
+ }
+
+ @Override
+ public void addAttributes(Map<String, AttributeValue> attributes) {
+ }
+
+ @Override
+ public void addAnnotation(String description, Map<String, AttributeValue> attributes) {
+ }
+
+ @Override
+ public void addAnnotation(Annotation annotation) {
+ }
+
+ @Override
+ public void addNetworkEvent(NetworkEvent networkEvent) {
+ }
+
+ @Override
+ public void addLink(Link link) {
+ }
+
+ @Override
+ public void end(EndSpanOptions options) {
+ }
+ }
+}
diff --git a/core/src/test/java/io/grpc/internal/CensusStreamTracerModuleTest.java b/core/src/test/java/io/grpc/internal/CensusStreamTracerModuleTest.java
deleted file mode 100644
index 764141f..0000000
--- a/core/src/test/java/io/grpc/internal/CensusStreamTracerModuleTest.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Copyright 2017, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.instrumentation.stats.RpcConstants;
-import com.google.instrumentation.stats.StatsContext;
-import com.google.instrumentation.stats.TagValue;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.ClientStreamTracer;
-import io.grpc.Context;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor.MethodType;
-import io.grpc.MethodDescriptor;
-import io.grpc.ServerStreamTracer;
-import io.grpc.Status;
-import io.grpc.internal.CensusStreamTracerModule.ClientCallTracer;
-import io.grpc.internal.testing.StatsTestUtils;
-import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
-import io.grpc.testing.TestMethodDescriptors;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Test for {@link StatsTraceContext}.
- */
-@RunWith(JUnit4.class)
-public class CensusStreamTracerModuleTest {
- private static final CallOptions.Key<String> CUSTOM_OPTION =
- CallOptions.Key.of("option1", "default");
- private static final CallOptions CALL_OPTIONS =
- CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
-
- private final FakeClock fakeClock = new FakeClock();
- private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
- private final CensusStreamTracerModule census =
- new CensusStreamTracerModule(statsCtxFactory, fakeClock.getStopwatchSupplier());
-
- @Mock
- private Channel mockChannel;
- @Mock
- private ClientCall<Void, Void> mockClientCall;
- @Mock
- private ClientCall.Listener<Void> mockClientCallListener;
- @Captor
- private ArgumentCaptor<CallOptions> callOptionsCaptor;
- @Captor
- private ArgumentCaptor<ClientCall.Listener<Void>> clientCallListenerCaptor;
-
- @Before
- @SuppressWarnings("unchecked")
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- when(mockChannel.newCall(any(MethodDescriptor.class), any(CallOptions.class)))
- .thenReturn(mockClientCall);
- }
-
- @After
- public void wrapUp() {
- assertNull(statsCtxFactory.pollRecord());
- }
-
- @Test
- public void clientInterceptorNoCustomTag() {
- testClientInterceptor(false);
- }
-
- @Test
- public void clientInterceptorCustomTag() {
- testClientInterceptor(true);
- }
-
- private void testClientInterceptor(boolean customTag) {
- String methodName = MethodDescriptor.generateFullMethodName("Service1", "method1");
- ClientInterceptor interceptor = census.getClientInterceptor();
- MethodDescriptor<Void, Void> method = MethodDescriptor.<Void, Void>newBuilder()
- .setType(MethodType.UNARY)
- .setFullMethodName(methodName)
- .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
- .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
- .build();
-
- ClientCall<Void, Void> call;
- if (customTag) {
- Context ctx =
- Context.ROOT.withValue(
- CensusStreamTracerModule.STATS_CONTEXT_KEY,
- statsCtxFactory.getDefault().with(
- StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")));
- Context origCtx = ctx.attach();
- try {
- call = interceptor.interceptCall(method, CALL_OPTIONS, mockChannel);
- } finally {
- ctx.detach(origCtx);
- }
- } else {
- assertNull(CensusStreamTracerModule.STATS_CONTEXT_KEY.get());
- call = interceptor.interceptCall(method, CALL_OPTIONS, mockChannel);
- }
-
- // The interceptor adds tracer factory to CallOptions
- verify(mockChannel).newCall(same(method), callOptionsCaptor.capture());
- CallOptions capturedCallOptions = callOptionsCaptor.getValue();
- assertEquals("customvalue", capturedCallOptions.getOption(CUSTOM_OPTION));
- assertEquals(1, capturedCallOptions.getStreamTracerFactories().size());
- assertTrue(capturedCallOptions.getStreamTracerFactories().get(0) instanceof ClientCallTracer);
-
- // Start the call
- Metadata headers = new Metadata();
- call.start(mockClientCallListener, headers);
-
- verify(mockClientCall).start(clientCallListenerCaptor.capture(), same(headers));
- assertNull(statsCtxFactory.pollRecord());
-
- // Then listener receives onClose()
- Status status = Status.CANCELLED.withDescription("I am just doing it");
- Metadata trailers = new Metadata();
- clientCallListenerCaptor.getValue().onClose(status, trailers);
-
- // The intercepting listener will call callEnded() on ClientTracerFactory, which records to
- // Census.
- verify(mockClientCallListener).onClose(same(status), same(trailers));
- StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
- assertNotNull(record);
- TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
- assertEquals(methodName, methodTag.toString());
- TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
- assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
- if (customTag) {
- TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
- assertEquals("extra value", extraTag.toString());
- } else {
- assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG));
- }
- }
-
- @Test
- public void clientBasicStats() {
- String methodName = MethodDescriptor.generateFullMethodName("Service1", "method1");
- ClientCallTracer callTracer =
- census.newClientCallTracer(statsCtxFactory.getDefault(), methodName);
- Metadata headers = new Metadata();
- ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
-
- fakeClock.forwardTime(30, MILLISECONDS);
- tracer.outboundHeaders();
-
- fakeClock.forwardTime(100, MILLISECONDS);
- tracer.outboundWireSize(1028);
- tracer.outboundUncompressedSize(1128);
-
- fakeClock.forwardTime(16, MILLISECONDS);
- tracer.inboundWireSize(33);
- tracer.inboundUncompressedSize(67);
- tracer.outboundWireSize(99);
- tracer.outboundUncompressedSize(865);
-
- fakeClock.forwardTime(24, MILLISECONDS);
- tracer.inboundWireSize(154);
- tracer.inboundUncompressedSize(552);
- tracer.streamClosed(Status.OK);
- callTracer.callEnded(Status.OK);
-
- StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
- assertNotNull(record);
- assertNoServerContent(record);
- TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
- assertEquals(methodName, methodTag.toString());
- TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
- assertEquals(Status.Code.OK.toString(), statusTag.toString());
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
- assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
- assertEquals(1128 + 865,
- record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
- assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
- assertEquals(67 + 552,
- record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
- assertEquals(30 + 100 + 16 + 24,
- record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
- }
-
- @Test
- public void clientStreamNeverCreated() {
- String methodName = MethodDescriptor.generateFullMethodName("Service1", "method2");
- ClientCallTracer callTracer =
- census.newClientCallTracer(statsCtxFactory.getDefault(), methodName);
-
- fakeClock.forwardTime(3000, MILLISECONDS);
- callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
-
- StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
- assertNotNull(record);
- assertNoServerContent(record);
- TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
- assertEquals(methodName, methodTag.toString());
- TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
- assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString());
- assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT));
- assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
- assertEquals(0,
- record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
- assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
- assertEquals(0,
- record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
- assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
- }
-
- @Test
- public void tagPropagation() {
- String methodName = MethodDescriptor.generateFullMethodName("Service1", "method3");
-
- // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are
- // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates
- // the propagation by putting them in the headers.
- StatsContext clientCtx = statsCtxFactory.getDefault().with(
- StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897"));
- ClientCallTracer callTracer = census.newClientCallTracer(clientCtx, methodName);
- Metadata headers = new Metadata();
- // This propagates clientCtx to headers
- callTracer.newClientStreamTracer(headers);
-
- ServerStreamTracer serverTracer =
- census.getServerTracerFactory().newServerStreamTracer(methodName, headers);
- // Server tracer deserializes clientCtx from the headers, so that it records stats with the
- // propagated tags.
- Context serverContext = serverTracer.filterContext(Context.ROOT);
- // It also put clientCtx in the Context seen by the call handler
- assertEquals(clientCtx, CensusStreamTracerModule.STATS_CONTEXT_KEY.get(serverContext));
-
-
- // Verifies that the server tracer records the status with the propagated tag
- serverTracer.streamClosed(Status.OK);
-
- StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord();
- assertNotNull(serverRecord);
- assertNoClientContent(serverRecord);
- TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD);
- assertEquals(methodName, serverMethodTag.toString());
- TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS);
- assertEquals(Status.Code.OK.toString(), serverStatusTag.toString());
- assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
- TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
- assertEquals("extra-tag-value-897", serverPropagatedTag.toString());
-
- // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to
- // record stats.
- callTracer.callEnded(Status.OK);
-
- StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord();
- assertNotNull(clientRecord);
- assertNoServerContent(clientRecord);
- TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD);
- assertEquals(methodName, clientMethodTag.toString());
- TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS);
- assertEquals(Status.Code.OK.toString(), clientStatusTag.toString());
- assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
- TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
- assertEquals("extra-tag-value-897", clientPropagatedTag.toString());
- }
-
- @Test
- public void serverBasicStats() {
- String methodName = MethodDescriptor.generateFullMethodName("Service1", "method4");
-
- ServerStreamTracer.Factory tracerFactory = census.getServerTracerFactory();
- ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(methodName, new Metadata());
-
- tracer.inboundWireSize(34);
- tracer.inboundUncompressedSize(67);
-
- fakeClock.forwardTime(100, MILLISECONDS);
- tracer.outboundWireSize(1028);
- tracer.outboundUncompressedSize(1128);
-
- fakeClock.forwardTime(16, MILLISECONDS);
- tracer.inboundWireSize(154);
- tracer.inboundUncompressedSize(552);
- tracer.outboundWireSize(99);
- tracer.outboundUncompressedSize(865);
-
- fakeClock.forwardTime(24, MILLISECONDS);
- tracer.streamClosed(Status.CANCELLED);
-
- StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
- assertNotNull(record);
- assertNoClientContent(record);
- TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
- assertEquals(methodName, methodTag.toString());
- TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
- assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
- assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT));
- assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
- assertEquals(1128 + 865,
- record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
- assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES));
- assertEquals(67 + 552,
- record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
- assertEquals(100 + 16 + 24,
- record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY));
- }
-
- private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME));
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY));
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
- assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
- }
-
- private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) {
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
- assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
- }
-}